1
0
forked from iarv/contact

Compare commits

...

98 Commits

Author SHA1 Message Date
pdxlocations
f11f7bb9e0 tryfix deps 2026-01-02 14:36:47 -08:00
pdxlocations
ecd2d2d692 fix dependency 2026-01-02 14:28:39 -08:00
pdxlocations
bdae90ecca allow python 3.14 and bump ver. 2026-01-02 14:25:54 -08:00
pdxlocations
56637f806b bump version 2025-12-26 23:32:06 -08:00
pdxlocations
c6abedec75 Merge pull request #237 from pdxlocations:close-interface
close the interface on quit
2025-12-27 02:26:28 -05:00
pdxlocations
6b18809215 close the interface on quit 2025-12-26 23:26:14 -08:00
pdxlocations
b048fe2480 Merge pull request #236 from pdxlocations:notification-sound-delay
wait for all messages to play notif sound
2025-12-27 02:21:45 -05:00
pdxlocations
600fc61ed7 wait for all messages to play notif sound 2025-12-26 23:21:25 -08:00
pdxlocations
fbf5ff6bd3 version bump 2025-12-16 08:55:08 -08:00
pdxlocations
faab1e961f fix nodeinfo keyerror 2025-12-16 08:29:30 -08:00
pdxlocations
255db3aa3c Merge pull request #234 from pdxlocations:dialog-scrolling
scrolling for dialogs
2025-12-16 08:23:14 -08:00
pdxlocations
42717c956f scrolling for dialogs 2025-12-16 07:59:16 -08:00
pdxlocations
ad77eba0d6 Fix formatting for Settings dialogue shortcut 2025-12-15 22:09:54 -08:00
pdxlocations
7d6918c69e Fix formatting of keyboard shortcut for settings 2025-12-15 22:07:36 -08:00
pdxlocations
70646a1214 Fix formatting for Settings dialogue shortcut 2025-12-15 22:06:52 -08:00
pdxlocations
53c1320d87 bump version 2025-12-15 22:04:54 -08:00
pdxlocations
ed9ff60f97 fix single-pane crash 2025-12-15 22:04:14 -08:00
pdxlocations
443df7bf48 Merge pull request #233 from pdxlocations:rm-function-win
Remove Function Window
2025-12-15 21:52:55 -08:00
pdxlocations
d8452e74d5 don't move control window around 2025-12-15 21:52:31 -08:00
pdxlocations
2cefdfb645 update readme 2025-12-15 21:35:40 -08:00
pdxlocations
191d6bad35 remove help/function window 2025-12-15 21:31:57 -08:00
pdxlocations
bf1d0ecea9 Merge pull request #232 from pdxlocations/3.9-compatible
restore 3.9 compatibility
2025-12-15 19:34:46 -08:00
pdxlocations
33904d2785 restore 3.9 compatibility 2025-12-15 19:33:26 -08:00
pdxlocations
b5fd8d74c4 bump version 2025-11-30 22:09:18 -08:00
pdxlocations
c383091a00 bracket and spacing fix 2025-11-30 22:07:04 -08:00
pdxlocations
cc37f9a66b Merge pull request #230 from brightkill/main
Changes to UI
2025-11-30 22:00:48 -08:00
brightkill
41ea441e32 fastfix for timestamp db message loading 2025-11-30 21:03:47 +03:00
brightkill
58fb82fb1b full node info on f5, traceroute additional f4 key, move prompt to bottom, node count 2025-11-30 20:34:44 +03:00
brightkill
dcd39c231f db handler timestamp 2025-11-30 20:34:41 +03:00
brightkill
87bc876c3e add timestamp to messages 2025-11-30 20:34:15 +03:00
pdxlocations
10fc78c869 Merge pull request #227 from Timmoth/patch-1 2025-11-03 11:58:59 -08:00
Tim Jones
9fa66ac80f Added favorite & ignored toggle commands to readme 2025-11-03 19:50:53 +00:00
pdxlocations
974a4af7f4 bump version 2025-10-23 21:19:15 -07:00
pdxlocations
9026c56ebf Merge pull request #226 from pdxlocations:improve-traceroute-timer
Implement cooldown for traceroute command
2025-10-22 08:38:44 -07:00
pdxlocations
26ca9599de Implement cooldown for traceroute command to prevent spamming; update UI state to track last traceroute time 2025-10-22 08:38:16 -07:00
pdxlocations
44b2a3abee bump version 2025-10-22 07:58:29 -07:00
pdxlocations
a26804b8b6 adjust splash 2025-10-22 07:56:59 -07:00
pdxlocations
b225d5fe51 Update .gitignore and launch.json for debugging configurations; adjust splash screen layout 2025-10-22 07:53:51 -07:00
pdxlocations
ea33b78af0 bump version 2025-10-03 22:30:58 -07:00
pdxlocations
c7f3f47ac2 clear srcreen on init 2025-10-03 22:28:11 -07:00
varna9000
8d41a1e060 Add telemetry beautifier (#221)
Co-authored-by: varna9000 <milen@aeroisk.com>
2025-08-26 10:11:47 -07:00
pdxlocations
c6d760650f fix Menu translasion 2025-08-24 00:33:55 -07:00
pdxlocations
3f12eca2ad Fix save dialog logic 2025-08-24 00:30:53 -07:00
pdxlocations
12bc87dd46 add help text 2025-08-24 00:00:42 -07:00
pdxlocations
bd4469f708 small cleanup 2025-08-23 23:57:27 -07:00
pdxlocations
b9a1c9d9a7 add missing help strings 2025-08-23 23:46:03 -07:00
pdxlocations
18d743c599 extract payload details 2025-08-23 01:00:57 -07:00
pdxlocations
c156211df8 parse protobufs (#219) 2025-08-23 00:28:33 -07:00
pdxlocations
888cdb244c bump version 2025-08-22 23:36:22 -07:00
pdxlocations
0c8ca2eb48 Update README with single pane mode instructions
Added information about enabling single pane mode for smaller displays.
2025-08-22 23:16:38 -07:00
pdxlocations
c06017e3f9 Add Single Pane Mode and Support for Smaller Displays (#217)
* init

* shift focus on message send

* fix save check

* focus arrows fix

* fix single-pane crash

* fix packet log crash

* Bonus, redraw settings when new line in packetlog

* refactor

* allow smaller windows

* hide help on small screens
2025-08-22 23:07:31 -07:00
pdxlocations
751a143d0a Merge pull request #216 from jekeam/patch-1 2025-08-13 07:25:44 -07:00
jekeam
f7d203e97a Update README.md [Install for Window] 2025-08-13 14:48:05 +05:00
pdxlocations
de4f813b90 bump version 2025-08-12 21:55:34 -07:00
pdxlocations
e17f7e576f hide cursor 2025-08-12 21:48:21 -07:00
pdxlocations
dccdb00dcd Redraw settings menu on new node 2025-08-12 21:32:38 -07:00
pdxlocations
81fd7a26f5 Merge pull request #215 from pdxlocations:confirm-unsaved-changes
Confirm unsaved Changes
2025-08-08 00:39:17 -07:00
pdxlocations
640955656f fix no config redraw 2025-08-08 00:38:34 -07:00
pdxlocations
8f248f4b5b add confirmation to app settings 2025-08-08 00:29:11 -07:00
pdxlocations
c10905e954 don't exit dialog with left arrow 2025-08-07 23:58:47 -07:00
pdxlocations
d1b93263fa add confirmation box if settings not saved 2025-08-07 23:34:36 -07:00
pdxlocations
623708c2a1 update README.md 2025-08-07 22:30:18 -07:00
pdxlocations
9b8abdb344 fix admin key window name 2025-07-31 23:48:15 -07:00
pdxlocations
8c3e00b52b redraw other input types 2025-07-31 23:44:14 -07:00
pdxlocations
81ebd1b95f fix get_text_input 2025-07-31 00:55:19 -07:00
pdxlocations
ae75d85741 fix user settings inputs 2025-07-31 00:33:30 -07:00
pdxlocations
b6767f423e Fix settings redraw (#214)
* current window 4

* refresh settings on new message

* redraw dialog and fix traceroute

* formatting and catch

* move continue
2025-07-31 00:08:08 -07:00
pdxlocations
b1252fec6c Update README.md 2025-07-30 22:18:01 -07:00
pdxlocations
43d1152074 Update README.md 2025-07-30 22:14:46 -07:00
pdxlocations
786a7b03c5 Configure Filepath for Export Node Config (#213)
* add node config path to settings

* try reload config but failed
2025-07-29 16:51:26 -07:00
pdxlocations
8d111c5df7 Add Warning for Sending Messages Quickly (#212)
* Warn About 2-Second Message Delay

* add comment

* update lines and cols
2025-07-28 23:04:57 -07:00
pdxlocations
b314a24a0c Input Validation Framework (#211)
* init

* validation framework

* add rules

* automatic types

* changes

* fix positions

* redraw input

* check for selected_config

* tweaks

* refactor
2025-07-26 21:20:15 -07:00
pdxlocations
4378f3045c unused argument 2025-07-24 18:02:44 -07:00
pdxlocations
a451d1d7d6 note to future me 2025-07-21 23:18:11 -07:00
pdxlocations
fe1f027219 Merge pull request #209 from pdxlocations/check-db-fields-for-null
Check for NULLS in DB
2025-07-21 10:57:18 -07:00
pdxlocations
43435cbe04 replace \x00 in messages 2025-07-21 10:54:19 -07:00
pdxlocations
fe98075582 bump version 2025-07-21 00:04:00 -07:00
pdxlocations
8716ea6fe1 dont write to the log before config 2025-07-20 23:46:24 -07:00
pdxlocations
a8bdcbb7e6 Merge pull request #208 from pdxlocations/config
fallback to user if install dir not writable
2025-07-17 23:00:13 -07:00
pdxlocations
02742b27f3 fallback to user if install dir not writable 2025-07-17 22:59:35 -07:00
pdxlocations
ae028032a0 Merge pull request #207 from pdxlocations/errors
show connection errors in console
2025-07-17 00:12:12 -07:00
pdxlocations
30402f4906 show connection errors in console 2025-07-17 00:10:17 -07:00
pdxlocations
324e0b03e7 Merge pull request #206 from pdxlocations/notifications
maybe fix aplay
2025-07-16 19:43:37 -07:00
pdxlocations
056db12911 maybe fix aplay 2025-07-16 18:46:51 -07:00
pdxlocations
685a2d4bf8 bump version 2025-07-14 08:15:16 -07:00
pdxlocations
6ed0cc8c9f Merge pull request #199 from rfschmid/add-traceroute-sent-message-to-history
Add traceroute sent message to history
2025-07-03 10:46:07 -07:00
Russell Schmidt
fc208a9258 Add "Traceroute Sent" to message history 2025-07-03 12:43:18 -05:00
Russell Schmidt
eaf9381bca Refactor message saving
Add common function for saving a message to history, removing some
duplicate code and making traceroutes add timestamps like other messages
do.
2025-07-03 12:43:18 -05:00
pdxlocations
367af9044c Merge pull request #198 from rfschmid/add-node-name-to-traceroute-confirm-dialog
Add node name to traceroute confirm dialog
2025-07-03 10:32:41 -07:00
Russell Schmidt
d8183d9009 Make capitalization consistent 2025-07-03 12:19:24 -05:00
Russell Schmidt
3fb1335be3 Add node name to traceroute confirm dialog 2025-07-03 12:10:56 -05:00
pdxlocations
8b05072786 bump version 2025-06-13 15:33:57 -07:00
pdxlocations
4455781e6c fix types and returns 2025-06-12 16:38:05 -07:00
pdxlocations
0c8aaee415 Merge pull request #197 from rfschmid/redirect-sound-player-output-to-dev-null
Redirect sound player output to dev null
2025-06-12 16:10:26 -07:00
pdxlocations
b97d9f4649 Merge pull request #196 from rfschmid/only-clear-input-text-on-enter-if-sending-message
Only clear input on enter when sending message
2025-06-12 16:09:44 -07:00
Russell Schmidt
4152fb6a21 Redirect sound player output to dev null
On my linux system, the sound playing code goes to aplay. When called,
aplay outputs a message about the file it is playing to stderr, which
causes it to be printed on the input line, which can't be easily
cleared. Redirect output from the audio player executable to dev/null.
Deduplicate sound playing code a bit so we only need one call to
subprocess.run, so I don't have to make this change in three places.
2025-06-12 17:28:30 -05:00
Russell Schmidt
384e36dac2 Only clear input on enter when sending message
We should only clear the input field when the user presses enter if the
user actually sent the message. If selecting a different node to send
to, don't clear input.
2025-06-12 17:23:03 -05:00
pdxlocations
65bca84fe6 minor refactor 2025-06-10 23:24:11 -07:00
22 changed files with 1698 additions and 674 deletions

3
.gitignore vendored
View File

@@ -8,4 +8,5 @@ client.log
settings.log
config.json
default_config.log
dist/
dist/
.vscode/launch.json

11
.vscode/launch.json vendored
View File

@@ -1,13 +1,22 @@
{
"version": "0.1.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"name": "Python Debugger: main",
"type": "debugpy",
"request": "launch",
"cwd": "${workspaceFolder}",
"module": "contact.__main__",
"args": []
},
{
"name": "Python Debugger: tcp",
"type": "debugpy",
"request": "launch",
"cwd": "${workspaceFolder}",
"module": "contact.__main__",
"args": ["--host","192.168.86.69"]
}
]
}

View File

@@ -6,6 +6,13 @@
```bash
pip install contact
```
> [!NOTE]
> Windows users must also install:
>
> ```powershell
> pip install windows-curses
> ```
> because the built-in curses module is not available on Windows.
This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores.
@@ -25,14 +32,24 @@ All messages will saved in a SQLite DB and restored upon relaunch of the app. Y
By navigating to Settings -> App Settings, you may customize your UI's icons, colors, and more!
For smaller displays you may wish to enable `single_pane_mode`:
<img width="486" height="194" alt="Screenshot 2025-08-22 at 11 15 54PM" src="https://github.com/user-attachments/assets/447c5d30-0850-4a4f-b0d4-976e4c5e329d" />
## Commands
- `CTRL` + `k` = display a list of commands.
- `↑→↓←` = Navigate around the UI.
- `F1/F2/F3` = Jump to Channel/Messages/Nodes
- `ENTER` = Send a message typed in the Input Window, or with the Node List highlighted, select a node to DM
- `` ` `` = Open the Settings dialogue
- `` ` `` or `F12` = Open the Settings dialogue
- `CTRL` + `p` = Hide/show a log of raw received packets.
- `CTRL` + `t` = With the Node List highlighted, send a traceroute to the selected node
- `CTRL` + `t` or `F4` = With the Node List highlighted, send a traceroute to the selected node
- `F5` = Display a node's info
- `CTRL` + `f` = With the Node List highlighted, favorite the selected node
- `CTRL` + `g` = With the Node List highlighted, ignore the selected node
- `CTRL` + `d` = With the Channel List hightlighted, archive a chat to reduce UI clutter. Messages will be saved in the db and repopulate if you send or receive a DM from this user.
- `CTRL` + `d` = With the Note List highlghted, remove a node from your nodedb.
- `ESC` = Exit out of the Settings Dialogue, or Quit the application if settings are not displayed.
### Search
@@ -62,8 +79,17 @@ If no connection arguments are specified, the client will attempt a serial conne
contact --port /dev/ttyUSB0
contact --host 192.168.1.1
contact --ble BlAddressOfDevice
contact --port COM3
```
To quickly connect to localhost, use:
```sh
contact -t
```
## Install in development (editable) mode:
```bash
git clone https://github.com/pdxlocations/contact.git
cd contact
python3 -m venv .venv
source .venv/bin/activate
pip install -e .
```

View File

@@ -53,26 +53,26 @@ logging.basicConfig(
app_state.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
# ------------------------------------------------------------------------------
def prompt_region_if_unset(args: object) -> None:
"""Prompt user to set region if it is unset."""
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
def initialize_globals(args) -> None:
def initialize_globals() -> None:
"""Initializes interface and shared globals."""
interface_state.interface = initialize_interface(args)
# Prompt for region if unset
if interface_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
ui_state.node_list = get_node_list()
ui_state.single_pane_mode = config.single_pane_mode.lower() == "true"
pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb()
@@ -81,55 +81,68 @@ def initialize_globals(args) -> None:
def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI."""
output_capture = io.StringIO()
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
setup_colors()
draw_splash(stdscr)
setup_colors()
draw_splash(stdscr)
args = setup_parser().parse_args()
args = setup_parser().parse_args()
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
logging.info("Initializing interface...")
with app_state.lock:
initialize_globals(args)
logging.info("Starting main UI")
logging.info("Initializing interface...")
with app_state.lock:
interface_state.interface = initialize_interface(args)
main_ui(stdscr)
if interface_state.interface.localNode.localConfig.lora.region == 0:
prompt_region_if_unset(args)
except Exception as e:
console_output = output_capture.getvalue()
logging.error("Uncaught exception: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
initialize_globals()
logging.info("Starting main UI")
stdscr.clear()
stdscr.refresh()
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
main_ui(stdscr)
except Exception:
console_output = output_capture.getvalue()
logging.error("Uncaught exception inside main_ui")
logging.error("Traceback:\n%s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
return
except Exception:
raise
def start() -> None:
"""Launch curses wrapper and redirect logs to file."""
"""Entry point for the application."""
if "--help" in sys.argv or "-h" in sys.argv:
setup_parser().print_help()
sys.exit(0)
with open(config.log_file_path, "a", buffering=1) as log_f:
sys.stdout = log_f
sys.stderr = log_f
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.error("Fatal error: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1)
try:
curses.wrapper(main)
interface_state.interface.close()
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
interface_state.interface.close()
sys.exit(0)
except Exception as e:
logging.critical("Fatal error", exc_info=True)
try:
curses.endwin()
except Exception:
pass
print("Fatal error:", e)
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":

View File

@@ -14,13 +14,13 @@ id, "", ""
uplink_enabled, "Uplink enabled", "Let this channel's data be sent to the MQTT server configured on this node."
downlink_enabled, "Downlink enabled", "Let data from the MQTT server configured on this node be sent to this channel."
module_settings, "Module settings", "Position precision and Client Mute."
position_precision, "Position precision", "The precision level of location data sent on this channel."
is_client_muted, "", ""
module_settings.position_precision, "Position precision", "The precision level of location data sent on this channel."
module_settings.is_client_muted, "Is Client Muted", "Controls whether or not the phone / clients should mute the current channel. Useful for noisy public channels you don't necessarily want to disable."
[config.device]
title, "Device"
role, "Role", "For the vast majority of users, the correct choice is CLIENT. See Meshtastic docs for more information."
serial_enabled, "Enable serial console", ""
serial_enabled, "Enable serial console", "Serial Console over the Stream API."
button_gpio, "Button GPIO", "GPIO pin for user button."
buzzer_gpio, "Buzzer GPIO", "GPIO pin for user buzzer."
rebroadcast_mode, "Rebroadcast mode", "This setting defines the device's behavior for how messages are rebroadcast."
@@ -30,6 +30,7 @@ is_managed, "Enable managed mode", "Enabling Managed Mode blocks smartphone apps
disable_triple_click, "Disable triple button press", ""
tzdef, "Timezone", "Uses the TZ Database format to display the correct local time on the device display and in its logs."
led_heartbeat_disabled, "Disable LED heartbeat", "On certain hardware models, this disables the blinking heartbeat LED."
buzzer_mode, "Buzzer Mode", "Controls buzzer behavior for audio feedback."
[config.position]
title, "Position"
@@ -77,6 +78,7 @@ subnet, "IPv4 subnet", ""
dns, "IPv4 DNS server", ""
rsyslog_server, "RSyslog server", ""
enabled_protocols, "Enabled protocols", ""
ipv6_enabled, "IPv6 enabled", "Enables or Disables IPv6 networking."
[config.network.ipv4_config]
title, "IPv4 Config", ""
@@ -158,7 +160,7 @@ channel_num, "Frequency slot", "Determines the exact frequency the radio transmi
override_duty_cycle, "Override duty cycle", "Override the legal transmit time limit to allow unlimited transmit time. [warning]May have legal ramifications.[/warning]"
sx126x_rx_boosted_gain, "Enable SX126X RX boosted gain", "This is an option specific to the SX126x chip series which allows the chip to consume a small amount of additional power to increase RX (receive) sensitivity."
override_frequency, "Override frequency in MHz", "Overrides frequency slot. May have legal ramifications."
pa_fan_disabled, "", ""
pa_fan_disabled, "PA Fan Disabled", "If true, disable the build-in PA FAN using pin define in RF95_FAN_EN"
ignore_mqtt, "Ignore MQTT", "Ignores any messages it receives via LoRa that came via MQTT somewhere along the path towards the device."
config_ok_to_mqtt, "OK to MQTT", "Indicates that the user approves their packets to be uplinked to MQTT brokers."
@@ -190,8 +192,10 @@ tls_enabled, "TLS enabled", "If true, we attempt to establish a secure connectio
root, "Root topic", "The root topic to use for MQTT messages. This is useful if you want to use a single MQTT server for multiple meshtastic networks and separate them via ACLs."
proxy_to_client_enabled, "Client proxy enabled", "If true, let the device use the client's (e.g. your phone's) network connection to connect to the MQTT server. If false, it uses the device's network connection which you have to enable via the network settings."
map_reporting_enabled, "Map reporting enabled", "Available from firmware version 2.3.2 on. If true, your node will periodically send an unencrypted map report to the MQTT server to be displayed by online maps that support this packet."
map_report_settings, "Map report settings", "Settings for the map report module."
map_report_settings.publish_interval_secs, "Map report publish interval", "How often we should publish the map report to the MQTT server in seconds. Defaults to 900 seconds (15 minutes)."
map_report_settings.position_precision, "Map report position precision", "The precision to use for the position in the map report. Defaults to a maximum deviation of around 1459m."
map_report_settings.should_report_location, "Should report location", "Whether we have opted-in to report our location to the map."
[module.serial]
title, "Serial"
@@ -208,9 +212,9 @@ override_console_serial_port, "Override console serial port", "If set to true, t
title, "External Notification"
enabled, "Enabled", "Enables the module."
output_ms, "Length", "Specifies how long in milliseconds you would like your GPIOs to be active. In case of the repeat option, this is the duration of every tone and pause."
output, "", ""
output_vibra, "", ""
output_buzzer, "", ""
output, "Output GPIO", "Define the output pin GPIO setting Defaults to EXT_NOTIFY_OUT if set for the board. In standalone devices this pin should drive the LED to match the UI."
output_vibra, "Vibra GPIO", "Optional: Define a secondary output pin for a vibra motor. This is used in standalone devices to match the UI."
output_buzzer, "Buzzer GPIO", "Optional: Define a tertiary output pin for an active buzze. This is used in standalone devices to to match the UI."
active, "Active (general / LED only)", "Specifies whether the external circuit is active when the device's GPIO is low or high. If this is set true, the pin will be pulled active high, false means active low."
alert_message, "Alert when receiving a message (general)", "Specifies if an alert should be triggered when receiving an incoming message."
alert_message_vibra, "Alert vibration on message", "Specifies if a vibration alert should be triggered when receiving an incoming message."
@@ -280,7 +284,8 @@ i2s_sck, "I2S clock", "The GPIO to use for the SCK signal in the I2S interface."
[module.remote_hardware]
title, "Remote Hardware"
enabled, "Enabled", "Enables the module."
allow_undefined_pin_access, "Allow undefined pin access", ""
allow_undefined_pin_access, "Allow undefined pin access", "Whether the Module allows consumers to read / write to pins not defined in available_pins"
available_pins, "Available pins", "Exposes the available pins to the mesh for reading and writing."
[module.neighbor_info]
title, "Neighbor Info"
@@ -311,5 +316,5 @@ use_pullup, "Use pull-up", "Whether or not use INPUT_PULLUP mode for GPIO pin. O
title, "Paxcounter"
enabled, "Enabled", "Enables the module."
paxcounter_update_interval, "Update interval", "The interval in seconds of how often we can send a message to the mesh when a state change is detected."
Wi-Fi_threshold, "", ""
ble_threshold, "", ""
Wi-Fi_threshold, "Wi-Fi Threshold", "WiFi RSSI threshold. Defaults to -80"
ble_threshold, "BLE Threshold", "BLE RSSI threshold. Defaults to -80"

View File

@@ -2,12 +2,52 @@ import logging
import os
import platform
import shutil
import subprocess
import time
from datetime import datetime
import subprocess
import threading
# Debounce notification sounds so a burst of queued messages only plays once.
_SOUND_DEBOUNCE_SECONDS = 0.8
_sound_timer: threading.Timer | None = None
_sound_timer_lock = threading.Lock()
_last_sound_request = 0.0
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
"""Schedule a notification sound after a short quiet period.
If more messages arrive before the delay elapses, the timer is reset.
This prevents playing a sound for each message when a backlog flushes.
"""
global _sound_timer, _last_sound_request
now = time.monotonic()
with _sound_timer_lock:
_last_sound_request = now
# Cancel any previously scheduled sound.
if _sound_timer is not None:
try:
_sound_timer.cancel()
except Exception:
pass
_sound_timer = None
def _fire(expected_request_time: float) -> None:
# Only play if nothing newer has been scheduled.
with _sound_timer_lock:
if expected_request_time != _last_sound_request:
return
play_sound()
_sound_timer = threading.Timer(delay, _fire, args=(now,))
_sound_timer.daemon = True
_sound_timer.start()
from typing import Any, Dict
from contact.utilities.utils import refresh_node_list
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
@@ -23,43 +63,48 @@ from contact.utilities.db_handler import (
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state
from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state
def play_sound():
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
if os.path.exists(sound_path):
subprocess.run(["afplay", sound_path], check=True)
return
else:
logging.warning(f"macOS sound file not found: {sound_path}")
executable = "afplay"
elif system == "Linux":
sound_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
if os.path.exists(sound_path):
if shutil.which("paplay"):
subprocess.run(["paplay", sound_path], check=True)
return
elif shutil.which("aplay"):
subprocess.run(["aplay", sound_path], check=True)
return
else:
logging.warning("No sound player found (paplay/aplay)")
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
elif shutil.which("ffplay") and os.path.exists(ogg_path):
executable = "ffplay"
sound_path = ogg_path
elif shutil.which("aplay") and os.path.exists(wav_path):
executable = "aplay"
sound_path = wav_path
else:
logging.warning(f"Linux sound file not found: {sound_path}")
logging.warning("No suitable sound player or sound file found on Linux")
if executable and sound_path:
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
# Final fallback: terminal beep
print("\a")
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
"""
@@ -78,6 +123,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
try:
if "decoded" not in packet:
return
@@ -92,9 +140,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
hop_start = packet.get('hopStart', 0)
hop_limit = packet.get('hopLimit', 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
play_sound()
schedule_notification_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
@@ -119,7 +172,9 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
channel_number = ui_state.channel_list.index(packet["from"])
if ui_state.channel_list[channel_number] != ui_state.channel_list[ui_state.selected_channel]:
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
@@ -129,40 +184,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[ui_state.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[ui_state.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(ui_state.channel_list[channel_number], message_from_id, message_string)
save_message_to_db(channel_id, message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -1,4 +1,5 @@
from datetime import datetime
import time
from typing import Any, Dict
import google.protobuf.json_format
@@ -16,6 +17,8 @@ import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.utils import add_new_message
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -48,7 +51,7 @@ def onAckNak(packet: Dict[str, Any]) -> None:
ack_type = "Nak"
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
config.sent_message_prefix + confirm_string + ": ",
time.strftime("[%H:%M:%S] ") + config.sent_message_prefix + confirm_string + ": ",
message,
)
@@ -146,8 +149,9 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
@@ -155,18 +159,14 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
if ui_state.channel_list[channel_number] not in ui_state.all_messages:
ui_state.all_messages[ui_state.channel_list[channel_number]] = []
ui_state.all_messages[ui_state.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(ui_state.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
@@ -190,32 +190,7 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
channelIndex=send_on_channel,
)
# Add sent message to the messages dictionary
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
ui_state.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
timestamp = save_message_to_db(channel_id, myid, message)
@@ -230,10 +205,14 @@ def send_traceroute() -> None:
"""
Sends a RouteDiscovery protobuf to the selected node.
"""
channel_id = ui_state.node_list[ui_state.selected_node]
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
r = mesh_pb2.RouteDiscovery()
interface_state.interface.sendData(
r,
destinationId=ui_state.node_list[ui_state.selected_node],
destinationId=channel_id,
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,

View File

@@ -1,46 +1,108 @@
import curses
import logging
import time
import traceback
from typing import Union
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu
from contact.message_handlers.tx_handler import send_message, send_traceroute
from contact.utilities.utils import parse_protobuf
from contact.ui.colors import get_color
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
from contact.utilities.input_handlers import get_list_input
import contact.ui.default_config as config
import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.singleton import ui_state, interface_state, menu_state
MIN_COL = 1 # "effectively zero" without breaking curses
root_win = None
# Draw arrows for a specific window id (0=channel,1=messages,2=nodes).
def draw_window_arrows(window_id: int) -> None:
if window_id == 0:
draw_main_arrows(channel_win, len(ui_state.channel_list), window=0)
channel_win.refresh()
elif window_id == 1:
msg_line_count = messages_pad.getmaxyx()[0]
draw_main_arrows(
messages_win,
msg_line_count,
window=1,
log_height=packetlog_win.getmaxyx()[0],
)
messages_win.refresh()
elif window_id == 2:
draw_main_arrows(nodes_win, len(ui_state.node_list), window=2)
nodes_win.refresh()
def compute_widths(total_w: int, focus: int):
# focus: 0=channel, 1=messages, 2=nodes
if total_w < 3 * MIN_COL:
# tiny terminals: allocate something, anything
return max(1, total_w), 0, 0
if focus == 0:
return total_w - 2 * MIN_COL, MIN_COL, MIN_COL
if focus == 1:
return MIN_COL, total_w - 2 * MIN_COL, MIN_COL
return MIN_COL, MIN_COL, total_w - 2 * MIN_COL
def paint_frame(win, selected: bool) -> None:
win.attrset(get_color("window_frame_selected") if selected else get_color("window_frame"))
win.box()
win.attrset(get_color("window_frame"))
win.refresh()
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
"""Handle terminal resize events and redraw the UI accordingly."""
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, packetlog_win, entry_win
# Calculate window max dimensions
height, width = stdscr.getmaxyx()
# Define window dimensions and positions
channel_width = int(config.channel_list_16ths) * (width // 16)
nodes_width = int(config.node_list_16ths) * (width // 16)
messages_width = width - channel_width - nodes_width
if ui_state.single_pane_mode:
channel_width, messages_width, nodes_width = compute_widths(width, ui_state.current_window)
else:
channel_width = int(config.channel_list_16ths) * (width // 16)
nodes_width = int(config.node_list_16ths) * (width // 16)
messages_width = width - channel_width - nodes_width
channel_width = max(MIN_COL, channel_width)
messages_width = max(MIN_COL, messages_width)
nodes_width = max(MIN_COL, nodes_width)
# Ensure the three widths sum exactly to the terminal width by adjusting the focused pane
total = channel_width + messages_width + nodes_width
if total != width:
delta = total - width
if ui_state.current_window == 0:
channel_width = max(MIN_COL, channel_width - delta)
elif ui_state.current_window == 1:
messages_width = max(MIN_COL, messages_width - delta)
else:
nodes_width = max(MIN_COL, nodes_width - delta)
entry_height = 3
function_height = 3
y_pad = entry_height + function_height
packet_log_height = int(height / 3)
y_pad = entry_height
content_h = max(1, height - y_pad)
pkt_h = max(1, int(height / 3))
if firstrun:
entry_win = curses.newwin(entry_height, width, 0, 0)
channel_win = curses.newwin(height - y_pad, channel_width, entry_height, 0)
messages_win = curses.newwin(height - y_pad, messages_width, entry_height, channel_width)
nodes_win = curses.newwin(height - y_pad, nodes_width, entry_height, channel_width + messages_width)
function_win = curses.newwin(function_height, width, height - function_height, 0)
packetlog_win = curses.newwin(
packet_log_height, messages_width, height - packet_log_height - function_height, channel_width
)
entry_win = curses.newwin(entry_height, width, height - entry_height, 0)
channel_win = curses.newwin(content_h, channel_width, 0, 0)
messages_win = curses.newwin(content_h, messages_width, 0, channel_width)
nodes_win = curses.newwin(content_h, nodes_width, 0, channel_width + messages_width)
packetlog_win = curses.newwin(pkt_h, messages_width, height - pkt_h - entry_height, channel_width)
# Will be resized to what we need when drawn
messages_pad = curses.newpad(1, 1)
@@ -48,7 +110,7 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
channel_pad = curses.newpad(1, 1)
# Set background colors for windows
for win in [entry_win, channel_win, messages_win, nodes_win, function_win, packetlog_win]:
for win in [entry_win, channel_win, messages_win, nodes_win, packetlog_win]:
win.bkgd(get_color("background"))
# Set background colors for pads
@@ -56,31 +118,30 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
pad.bkgd(get_color("background"))
# Set colors for window frames
for win in [channel_win, entry_win, nodes_win, messages_win, function_win]:
for win in [channel_win, entry_win, nodes_win, messages_win]:
win.attrset(get_color("window_frame"))
else:
for win in [entry_win, channel_win, messages_win, nodes_win, function_win, packetlog_win]:
for win in [entry_win, channel_win, messages_win, nodes_win, packetlog_win]:
win.erase()
entry_win.resize(3, width)
entry_win.resize(entry_height, width)
entry_win.mvwin(height - entry_height, 0)
channel_win.resize(height - y_pad, channel_width)
channel_win.resize(content_h, channel_width)
channel_win.mvwin(0, 0)
messages_win.resize(height - y_pad, messages_width)
messages_win.mvwin(3, channel_width)
messages_win.resize(content_h, messages_width)
messages_win.mvwin(0, channel_width)
nodes_win.resize(height - y_pad, nodes_width)
nodes_win.mvwin(entry_height, channel_width + messages_width)
nodes_win.resize(content_h, nodes_width)
nodes_win.mvwin(0, channel_width + messages_width)
function_win.resize(3, width)
function_win.mvwin(height - function_height, 0)
packetlog_win.resize(packet_log_height, messages_width)
packetlog_win.mvwin(height - packet_log_height - function_height, channel_width)
packetlog_win.resize(pkt_h, messages_width)
packetlog_win.mvwin(height - pkt_h - entry_height, channel_width)
# Draw window borders
for win in [channel_win, entry_win, nodes_win, messages_win, function_win]:
for win in [channel_win, entry_win, nodes_win, messages_win]:
win.box()
win.refresh()
@@ -88,10 +149,10 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
curses.curs_set(1)
try:
draw_function_win()
draw_channel_list()
draw_messages_window(True)
draw_node_list()
draw_window_arrows(ui_state.current_window)
except:
# Resize events can come faster than we can re-draw, which can cause a curses error.
@@ -102,13 +163,16 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
def main_ui(stdscr: curses.window) -> None:
"""Main UI loop for the curses interface."""
global input_text
global root_win
root_win = stdscr
input_text = ""
stdscr.keypad(True)
get_channels()
handle_resize(stdscr, True)
while True:
draw_text_field(entry_win, f"Input: {input_text[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
char = entry_win.get_wch()
@@ -136,17 +200,22 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_LEFT or char == curses.KEY_RIGHT:
handle_leftright(char)
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
handle_enter(input_text)
input_text = ""
elif char in (curses.KEY_F1, curses.KEY_F2, curses.KEY_F3):
handle_function_keys(char)
elif char == chr(20): # Ctrl + t for Traceroute
elif char in (chr(curses.KEY_ENTER), chr(10), chr(13)):
input_text = handle_enter(input_text)
elif char in (curses.KEY_F4, chr(20)): # Ctrl + t and F4 for Traceroute
handle_ctrl_t(stdscr)
elif char == curses.KEY_F5:
handle_f5_key(stdscr)
elif char in (curses.KEY_BACKSPACE, chr(127)):
input_text = handle_backspace(entry_win, input_text)
elif char == "`": # ` Launch the settings interface
elif char in (curses.KEY_F12, "`"): # ` Launch the settings interface
handle_backtick(stdscr)
elif char == chr(16): # Ctrl + P for Packet Log
@@ -162,6 +231,9 @@ def main_ui(stdscr: curses.window) -> None:
elif char == chr(31): # Ctrl + / to search
handle_ctrl_fslash()
elif char == chr(11): # Ctrl + K for Help
handle_ctrl_k(stdscr)
elif char == chr(6): # Ctrl + F to toggle favorite
handle_ctrl_f(stdscr)
@@ -209,6 +281,8 @@ def handle_home() -> None:
elif ui_state.current_window == 2:
select_node(0)
draw_window_arrows(ui_state.current_window)
def handle_end() -> None:
"""Handle end key events to select the last item in the current window."""
@@ -220,29 +294,27 @@ def handle_end() -> None:
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(len(ui_state.node_list) - 1)
draw_window_arrows(ui_state.current_window)
def handle_pageup() -> None:
"""Handle page up key events to scroll the current window by a page."""
if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
select_channel(ui_state.selected_channel - (channel_win.getmaxyx()[0] - 2))
elif ui_state.current_window == 1:
ui_state.selected_message = max(
ui_state.selected_message - get_msg_window_lines(messages_win, packetlog_win), 0
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us
select_node(ui_state.selected_node - (nodes_win.getmaxyx()[0] - 2))
draw_window_arrows(ui_state.current_window)
def handle_pagedown() -> None:
"""Handle page down key events to scroll the current window down."""
if ui_state.current_window == 0:
select_channel(
ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2)
) # select_channel will bounds check for us
select_channel(ui_state.selected_channel + (channel_win.getmaxyx()[0] - 2))
elif ui_state.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
ui_state.selected_message = min(
@@ -251,7 +323,8 @@ def handle_pagedown() -> None:
)
refresh_pad(1)
elif ui_state.current_window == 2:
select_node(ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2)) # select_node will bounds check for us
select_node(ui_state.selected_node + (nodes_win.getmaxyx()[0] - 2))
draw_window_arrows(ui_state.current_window)
def handle_leftright(char: int) -> None:
@@ -259,46 +332,80 @@ def handle_leftright(char: int) -> None:
delta = -1 if char == curses.KEY_LEFT else 1
old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3
handle_resize(root_win, False)
if old_window == 0:
channel_win.attrset(get_color("window_frame"))
channel_win.box()
channel_win.refresh()
paint_frame(channel_win, selected=False)
refresh_pad(0)
if old_window == 1:
messages_win.attrset(get_color("window_frame"))
messages_win.box()
messages_win.refresh()
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame"))
nodes_win.box()
nodes_win.refresh()
paint_frame(nodes_win, selected=False)
refresh_pad(2)
if not ui_state.single_pane_mode:
draw_window_arrows(old_window)
if ui_state.current_window == 0:
channel_win.attrset(get_color("window_frame_selected"))
channel_win.box()
channel_win.attrset(get_color("window_frame"))
channel_win.refresh()
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
messages_win.attrset(get_color("window_frame_selected"))
messages_win.box()
messages_win.attrset(get_color("window_frame"))
messages_win.refresh()
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
draw_function_win()
nodes_win.attrset(get_color("window_frame_selected"))
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
nodes_win.refresh()
paint_frame(nodes_win, selected=True)
refresh_pad(2)
draw_window_arrows(ui_state.current_window)
def handle_enter(input_text: str) -> None:
def handle_function_keys(char: int) -> None:
"""Switch windows using F1/F2/F3."""
if char == curses.KEY_F1:
target = 0
elif char == curses.KEY_F2:
target = 1
elif char == curses.KEY_F3:
target = 2
else:
return
old_window = ui_state.current_window
if target == old_window:
return
ui_state.current_window = target
handle_resize(root_win, False)
if old_window == 0:
paint_frame(channel_win, selected=False)
refresh_pad(0)
elif old_window == 1:
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
paint_frame(nodes_win, selected=False)
refresh_pad(2)
if not ui_state.single_pane_mode:
draw_window_arrows(old_window)
if ui_state.current_window == 0:
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
paint_frame(nodes_win, selected=True)
refresh_pad(2)
draw_window_arrows(ui_state.current_window)
def handle_enter(input_text: str) -> str:
"""Handle Enter key events to send messages or select channels."""
if ui_state.current_window == 2:
node_list = ui_state.node_list
@@ -315,34 +422,144 @@ def handle_enter(input_text: str) -> None:
ui_state.selected_node = 0
ui_state.current_window = 0
handle_resize(root_win, False)
draw_node_list()
draw_channel_list()
draw_messages_window(True)
draw_window_arrows(ui_state.current_window)
return input_text
elif len(input_text) > 0:
# TODO: This is a hack to prevent sending messages too quickly. Let's get errors from the node.
now = time.monotonic()
if now - ui_state.last_sent_time < 2.5:
contact.ui.dialog.dialog("Slow down", "Please wait 2 seconds between messages.")
return input_text
# Enter key pressed, send user input as message
send_message(input_text, channel=ui_state.selected_channel)
draw_messages_window(True)
# Clear entry window and reset input text
input_text = ""
ui_state.last_sent_time = now
entry_win.erase()
if ui_state.current_window == 0:
ui_state.current_window = 1
handle_resize(root_win, False)
return ""
return input_text
def handle_f5_key(stdscr: curses.window) -> None:
node = None
try:
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
message_parts = []
message_parts.append("**📋 Basic Information:**")
message_parts.append(f"• Device: {node.get('user', {}).get('longName', 'Unknown')}")
message_parts.append(f"• Short name: {node.get('user', {}).get('shortName', 'Unknown')}")
message_parts.append(f"• Hardware: {node.get('user', {}).get('hwModel', 'Unknown')}")
role = f"{node.get('user', {}).get('role', 'Unknown')}"
message_parts.append(f"• Role: {role}")
pk = f"{node.get('user', {}).get('publicKey')}"
message_parts.append(f"Public key: {pk}")
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
if "position" in node:
pos = node["position"]
if pos.get("latitude") and pos.get("longitude"):
message_parts.append(f"• Position: {pos['latitude']:.4f}, {pos['longitude']:.4f}")
if pos.get("altitude"):
message_parts.append(f"• Altitude: {pos['altitude']}m")
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
if any(key in node for key in ["snr", "hopsAway", "lastHeard"]):
message_parts.append("\n**🌐 Network Metrics:**")
if "snr" in node:
snr = node["snr"]
snr_status = (
"🟢 Excellent"
if snr > 10
else (
"🟡 Good"
if snr > 3
else "🟠 Fair" if snr > -10 else "🔴 Poor" if snr > -20 else "💀 Very Poor"
)
)
message_parts.append(f"• SNR: {snr}dB {snr_status}")
if "hopsAway" in node:
hops = node["hopsAway"]
hop_emoji = "📡" if hops == 0 else "🔄" if hops == 1 else ""
message_parts.append(f"• Hops away: {hop_emoji} {hops}")
if "lastHeard" in node and node["lastHeard"]:
message_parts.append(f"• Last heard: 🕐 {get_time_ago(node['lastHeard'])}")
if node.get("deviceMetrics"):
metrics = node["deviceMetrics"]
message_parts.append("\n**📊 Device Metrics:**")
if "batteryLevel" in metrics:
battery = metrics["batteryLevel"]
battery_emoji = "🟢" if battery > 50 else "🟡" if battery > 20 else "🔴"
voltage_info = f" ({metrics['voltage']}v)" if "voltage" in metrics else ""
message_parts.append(f"• Battery: {battery_emoji} {battery}%{voltage_info}")
if "uptimeSeconds" in metrics:
message_parts.append(f"• Uptime: ⏱️ {get_readable_duration(metrics['uptimeSeconds'])}")
if "channelUtilization" in metrics:
util = metrics["channelUtilization"]
util_emoji = "🔴" if util > 80 else "🟡" if util > 50 else "🟢"
message_parts.append(f"• Channel utilization: {util_emoji} {util:.2f}%")
if "airUtilTx" in metrics:
air_util = metrics["airUtilTx"]
air_emoji = "🔴" if air_util > 80 else "🟡" if air_util > 50 else "🟢"
message_parts.append(f"• Air utilization TX: {air_emoji} {air_util:.2f}%")
message = "\n".join(message_parts)
contact.ui.dialog.dialog(f"📡 Node Details: {node.get('user', {}).get('shortName', 'Unknown')}", message)
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
except KeyError:
return
def handle_ctrl_t(stdscr: curses.window) -> None:
"""Handle Ctrl + T key events to send a traceroute."""
now = time.monotonic()
cooldown = 30.0
remaining = cooldown - (now - ui_state.last_traceroute_time)
if remaining > 0:
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(
"Traceroute Not Sent", f"Please wait {int(remaining)} seconds before sending another traceroute."
)
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
return
send_traceroute()
ui_state.last_traceroute_time = now
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(
stdscr,
"Traceroute Sent",
"Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.",
f"Traceroute Sent To: {get_name_from_database(ui_state.node_list[ui_state.selected_node])}",
"Results will appear in messages window.",
)
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
def handle_backspace(entry_win: curses.window, input_text: str) -> None:
def handle_backspace(entry_win: curses.window, input_text: str) -> str:
"""Handle backspace key events to remove the last character from input text."""
if input_text:
input_text = input_text[:-1]
@@ -357,7 +574,10 @@ def handle_backspace(entry_win: curses.window, input_text: str) -> None:
def handle_backtick(stdscr: curses.window) -> None:
"""Handle backtick key events to open the settings menu."""
curses.curs_set(0)
previous_window = ui_state.current_window
ui_state.current_window = 4
settings_menu(stdscr, interface_state.interface)
ui_state.current_window = previous_window
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
@@ -375,6 +595,34 @@ def handle_ctrl_p() -> None:
draw_messages_window(True)
# --- Ctrl+K handler for Help ---
def handle_ctrl_k(stdscr: curses.window) -> None:
"""Handle Ctrl + K to show a help window with shortcut keys."""
curses.curs_set(0)
cmds = [
"↑/↓ = Scroll",
"←/→ = Switch window",
"F1/F2/F3 = Jump to Channel/Messages/Nodes",
"ENTER = Send / Select",
"` or F12 = Settings",
"ESC = Quit",
"Ctrl+P = Toggle Packet Log",
"Ctrl+T or F4 = Traceroute",
"F5 = Full node info",
"Ctrl+D = Archive chat / remove node",
"Ctrl+F = Favorite",
"Ctrl+G = Ignore",
"Ctrl+/ = Search",
"Ctrl+K = Help",
]
contact.ui.dialog.dialog("Help — Shortcut Keys", "\n".join(cmds))
curses.curs_set(1)
handle_resize(stdscr, False)
def handle_ctrl_d() -> None:
if ui_state.current_window == 0:
if isinstance(ui_state.channel_list[ui_state.selected_channel], int):
@@ -490,6 +738,10 @@ def handle_ctlr_g(stdscr: curses.window) -> None:
def draw_channel_list() -> None:
"""Update the channel list window and pad based on the current state."""
if ui_state.current_window != 0 and ui_state.single_pane_mode:
return
channel_pad.erase()
win_width = channel_win.getmaxyx()[1]
@@ -524,20 +776,18 @@ def draw_channel_list() -> None:
channel_pad.addstr(idx, 1, truncated_channel, color)
idx += 1
channel_win.attrset(
get_color("window_frame_selected") if ui_state.current_window == 0 else get_color("window_frame")
)
channel_win.box()
channel_win.attrset((get_color("window_frame")))
draw_main_arrows(channel_win, len(ui_state.channel_list), window=0)
channel_win.refresh()
paint_frame(channel_win, selected=(ui_state.current_window == 0))
refresh_pad(0)
draw_window_arrows(0)
channel_win.refresh()
def draw_messages_window(scroll_to_bottom: bool = False) -> None:
"""Update the messages window based on the selected channel and scroll position."""
if ui_state.current_window != 1 and ui_state.single_pane_mode:
return
messages_pad.erase()
channel = ui_state.channel_list[ui_state.selected_channel]
@@ -557,7 +807,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
for line in wrapped_lines:
if prefix.startswith("--"):
color = get_color("timestamps")
elif prefix.startswith(config.sent_message_prefix):
elif prefix.find(config.sent_message_prefix) != -1:
color = get_color("tx_messages")
else:
color = get_color("rx_messages")
@@ -565,39 +815,32 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
messages_pad.addstr(row, 1, line, color)
row += 1
messages_win.attrset(
get_color("window_frame_selected") if ui_state.current_window == 1 else get_color("window_frame")
)
messages_win.box()
messages_win.attrset(get_color("window_frame"))
messages_win.refresh()
paint_frame(messages_win, selected=(ui_state.current_window == 1))
visible_lines = get_msg_window_lines(messages_win, packetlog_win)
if scroll_to_bottom:
ui_state.selected_message = max(msg_line_count - visible_lines, 0)
ui_state.start_index[1] = max(msg_line_count - visible_lines, 0)
pass
else:
ui_state.selected_message = max(min(ui_state.selected_message, msg_line_count - visible_lines), 0)
draw_main_arrows(
messages_win,
msg_line_count,
window=1,
log_height=packetlog_win.getmaxyx()[0],
)
messages_win.refresh()
refresh_pad(1)
draw_packetlog_win()
draw_window_arrows(1)
messages_win.refresh()
if ui_state.current_window == 4:
menu_state.need_redraw = True
def draw_node_list() -> None:
"""Update the nodes list window and pad based on the current state."""
global nodes_pad
if ui_state.current_window != 2 and ui_state.single_pane_mode:
return
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
# if nodes_pad is None:
# nodes_pad = curses.newpad(1, 1)
@@ -614,9 +857,21 @@ def draw_node_list() -> None:
for i, node_num in enumerate(ui_state.node_list):
node = interface_state.interface.nodesByNum[node_num]
secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"]
node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, 'long')}".ljust(box_width - 2)[
: box_width - 2
]
status_icon = "🔐" if secure else "🔓"
node_name = get_name_from_database(node_num, "long")
user_name = node["user"]["shortName"]
uptime_str = ""
if "deviceMetrics" in node and "uptimeSeconds" in node["deviceMetrics"]:
uptime_str = f" / Up: {get_readable_duration(node['deviceMetrics']['uptimeSeconds'])}"
last_heard_str = f"{get_time_ago(node['lastHeard'])}" if node.get("lastHeard") else ""
hops_str = f" ■ Hops: {node['hopsAway']}" if "hopsAway" in node else ""
snr_str = f" ■ SNR: {node['snr']}dB" if node.get("hopsAway") == 0 and "snr" in node else ""
# Future node name custom formatting possible
node_str = f"{status_icon} {node_name}"
node_str = node_str.ljust(box_width - 4)[: box_width - 2]
color = "node_list"
if "isFavorite" in node and node["isFavorite"]:
color = "node_favorite"
@@ -626,22 +881,20 @@ def draw_node_list() -> None:
i, 1, node_str, get_color(color, reverse=ui_state.selected_node == i and ui_state.current_window == 2)
)
nodes_win.attrset(
get_color("window_frame_selected") if ui_state.current_window == 2 else get_color("window_frame")
)
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
draw_main_arrows(nodes_win, len(ui_state.node_list), window=2)
paint_frame(nodes_win, selected=(ui_state.current_window == 2))
nodes_win.refresh()
refresh_pad(2)
draw_window_arrows(2)
nodes_win.refresh()
# Restore cursor to input field
entry_win.keypad(True)
curses.curs_set(1)
entry_win.refresh()
if ui_state.current_window == 4:
menu_state.need_redraw = True
def select_channel(idx: int) -> None:
"""Select a channel by index and update the UI state accordingly."""
@@ -699,15 +952,9 @@ def scroll_messages(direction: int) -> None:
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
)
draw_main_arrows(
messages_win,
msg_line_count,
ui_state.current_window,
log_height=packetlog_win.getmaxyx()[0],
)
messages_win.refresh()
refresh_pad(1)
draw_window_arrows(ui_state.current_window)
def select_node(idx: int) -> None:
@@ -724,8 +971,6 @@ def select_node(idx: int) -> None:
ui_state=ui_state,
)
draw_function_win()
def scroll_nodes(direction: int) -> None:
"""Scroll through the node list by a given direction."""
@@ -744,6 +989,9 @@ def draw_packetlog_win() -> None:
columns = [10, 10, 15, 30]
span = 0
if ui_state.current_window != 1 and ui_state.single_pane_mode:
return
if ui_state.display_log:
packetlog_win.erase()
height, width = packetlog_win.getmaxyx()
@@ -769,22 +1017,20 @@ def draw_packetlog_win() -> None:
else get_name_from_database(packet["to"], "short").ljust(columns[1])
)
if "decoded" in packet:
port = packet["decoded"]["portnum"].ljust(columns[2])
payload = (packet["decoded"]["payload"]).ljust(columns[3])
port = str(packet["decoded"].get("portnum", "")).ljust(columns[2])
parsed_payload = parse_protobuf(packet)
else:
port = "NO KEY".ljust(columns[2])
payload = "NO KEY".ljust(columns[3])
parsed_payload = "NO KEY"
# Combine and truncate if necessary
logString = f"{from_id} {to_id} {port} {payload}"
logString = f"{from_id} {to_id} {port} {parsed_payload}"
logString = logString[: width - 3]
# Add to the window
packetlog_win.addstr(i + 2, 1, logString, get_color("log"))
packetlog_win.attrset(get_color("window_frame"))
packetlog_win.box()
packetlog_win.refresh()
paint_frame(packetlog_win, selected=False)
# Restore cursor to input field
entry_win.keypad(True)
@@ -841,101 +1087,13 @@ def search(win: int) -> None:
entry_win.erase()
def draw_node_details() -> None:
"""Draw the details of the selected node in the function window."""
node = None
try:
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
except KeyError:
return
function_win.erase()
function_win.box()
nodestr = ""
width = function_win.getmaxyx()[1]
node_details_list = [
f"{node['user']['longName']} " if "user" in node and "longName" in node["user"] else "",
f"({node['user']['shortName']})" if "user" in node and "shortName" in node["user"] else "",
f" | {node['user']['hwModel']}" if "user" in node and "hwModel" in node["user"] else "",
f" | {node['user']['role']}" if "user" in node and "role" in node["user"] else "",
]
if ui_state.node_list[ui_state.selected_node] == interface_state.myNodeNum:
node_details_list.extend(
[
(
f" | Bat: {node['deviceMetrics']['batteryLevel']}% ({node['deviceMetrics']['voltage']}v)"
if "deviceMetrics" in node
and "batteryLevel" in node["deviceMetrics"]
and "voltage" in node["deviceMetrics"]
else ""
),
(
f" | Up: {get_readable_duration(node['deviceMetrics']['uptimeSeconds'])}"
if "deviceMetrics" in node and "uptimeSeconds" in node["deviceMetrics"]
else ""
),
(
f" | ChUtil: {node['deviceMetrics']['channelUtilization']:.2f}%"
if "deviceMetrics" in node and "channelUtilization" in node["deviceMetrics"]
else ""
),
(
f" | AirUtilTX: {node['deviceMetrics']['airUtilTx']:.2f}%"
if "deviceMetrics" in node and "airUtilTx" in node["deviceMetrics"]
else ""
),
]
)
else:
node_details_list.extend(
[
f" | {get_time_ago(node['lastHeard'])}" if ("lastHeard" in node and node["lastHeard"]) else "",
f" | Hops: {node['hopsAway']}" if "hopsAway" in node else "",
f" | SNR: {node['snr']}dB" if ("snr" in node and "hopsAway" in node and node["hopsAway"] == 0) else "",
]
)
for s in node_details_list:
if len(nodestr) + len(s) < width - 2:
nodestr = nodestr + s
draw_centered_text_field(function_win, nodestr, 0, get_color("commands"))
def draw_help() -> None:
"""Draw the help text in the function window."""
cmds = [
"↑→↓← = Select",
" ENTER = Send",
" ` = Settings",
" ESC = Quit",
" ^P = Packet Log",
" ^t = Traceroute",
" ^d = Archive Chat",
" ^f = Favorite",
" ^g = Ignore",
" ^/ = Search",
]
function_str = ""
for s in cmds:
if len(function_str) + len(s) < function_win.getmaxyx()[1] - 2:
function_str += s
draw_centered_text_field(function_win, function_str, 0, get_color("commands"))
def draw_function_win() -> None:
if ui_state.current_window == 2:
draw_node_details()
else:
draw_help()
def refresh_pad(window: int) -> None:
# If in single-pane mode and this isn't the focused window, skip refreshing its (collapsed) pad
if ui_state.single_pane_mode and window != ui_state.current_window:
return
# Derive the target box and pad for the requested window
win_height = channel_win.getmaxyx()[0]
if window == 1:
@@ -953,6 +1111,7 @@ def refresh_pad(window: int) -> None:
pad = nodes_pad
box = nodes_win
lines = box.getmaxyx()[0] - 2
box.addstr(0, 2, (f"Nodes: {len(ui_state.node_list)}"), curses.A_BOLD)
selected_item = ui_state.selected_node
start_index = max(0, selected_item - (win_height - 3)) # Leave room for borders
@@ -963,13 +1122,39 @@ def refresh_pad(window: int) -> None:
selected_item = ui_state.selected_channel
start_index = max(0, selected_item - (win_height - 3)) # Leave room for borders
# Compute inner drawable area of the box
box_y, box_x = box.getbegyx()
box_h, box_w = box.getmaxyx()
inner_h = max(0, box_h - 2) # minus borders
inner_w = max(0, box_w - 2)
if inner_h <= 0 or inner_w <= 0:
return
# Clamp lines to available inner height
lines = max(0, min(lines, inner_h))
# Clamp start_index within the pad's height
pad_h, pad_w = pad.getmaxyx()
if pad_h <= 0:
return
start_index = max(0, min(start_index, max(0, pad_h - 1)))
top = box_y + 1
left = box_x + 1
bottom = box_y + min(inner_h, lines) # inclusive
right = box_x + min(inner_w, box_w - 2)
if bottom < top or right < left:
return
pad.refresh(
start_index,
0,
box.getbegyx()[0] + 1,
box.getbegyx()[1] + 1,
box.getbegyx()[0] + lines,
box.getbegyx()[1] + box.getmaxyx()[1] - 3,
top,
left,
bottom,
right,
)
@@ -985,7 +1170,23 @@ def remove_notification(channel_number: int) -> None:
def draw_text_field(win: curses.window, text: str, color: int) -> None:
win.border()
win.addstr(1, 1, text, color)
# Put a small hint in the border of the message entry field.
# We key off the "Message:" prompt to avoid affecting other bordered fields.
if isinstance(text, str) and text.startswith("Message:"):
hint = " Ctrl+K Help "
h, w = win.getmaxyx()
x = max(2, w - len(hint) - 2)
try:
win.addstr(0, x, hint, get_color("commands"))
except curses.error:
pass
# Draw the actual field text
try:
win.addstr(1, 1, text, color)
except curses.error:
pass
def draw_centered_text_field(win: curses.window, text: str, y_offset: int, color: int) -> None:
@@ -994,8 +1195,3 @@ def draw_centered_text_field(win: curses.window, text: str, y_offset: int, color
y = (height // 2) + y_offset
win.addstr(y, x, text, color)
win.refresh()
def draw_debug(value: Union[str, int]) -> None:
function_win.addstr(1, 1, f"debug: {value} ")
function_win.refresh()

View File

@@ -6,6 +6,7 @@ import sys
from typing import List
from contact.utilities.save_to_radio import save_changes
import contact.ui.default_config as config
from contact.utilities.config_io import config_export, config_import
from contact.utilities.control_utils import parse_ini_file, transform_menu_path
from contact.utilities.input_handlers import (
@@ -20,60 +21,74 @@ from contact.ui.dialog import dialog
from contact.ui.menus import generate_menu_from_protobuf
from contact.ui.nav_utils import move_highlight, draw_arrows, update_help_window
from contact.ui.user_config import json_editor
from contact.ui.ui_state import MenuState
from contact.utilities.singleton import menu_state
menu_state = MenuState()
# Constants
width = 80
# Setup Variables
MAX_MENU_WIDTH = 80 # desired max; will shrink on small terminals
save_option = "Save Changes"
max_help_lines = 0
help_win = None
sensitive_settings = ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"]
# Compute the effective menu width for the current terminal
def get_menu_width() -> int:
# Leave at least 2 columns for borders; clamp to >= 20 for usability
return max(20, min(MAX_MENU_WIDTH, curses.COLS - 2))
sensitive_settings = ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"]
# Get the parent directory of the script
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# Paths
locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
# locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
translation_file = os.path.join(parent_dir, "localisations", "en.ini")
config_folder = os.path.join(locals_dir, "node-configs")
# config_folder = os.path.join(locals_dir, "node-configs")
config_folder = os.path.abspath(config.node_configs_file_path)
# Load translations
field_mapping, help_text = parse_ini_file(translation_file)
def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.window or pad types
def display_menu() -> tuple[object, object]:
# if help_win:
# min_help_window_height = 6
# else:
# min_help_window_height = 0
min_help_window_height = 6
num_items = len(menu_state.current_menu) + (1 if menu_state.show_save_option else 0)
# Determine the available height for the menu
max_menu_height = curses.LINES
menu_height = min(max_menu_height - min_help_window_height, num_items + 5)
w = get_menu_width()
start_y = (curses.LINES - menu_height) // 2 - (min_help_window_height // 2)
start_x = (curses.COLS - width) // 2
start_x = (curses.COLS - w) // 2
# Calculate remaining space for help window
global max_help_lines
remaining_space = curses.LINES - (start_y + menu_height + 2) # +2 for padding
max_help_lines = max(remaining_space, 1) # Ensure at least 1 lines for help
menu_win = curses.newwin(menu_height, width, start_y, start_x)
menu_win = curses.newwin(menu_height, w, start_y, start_x)
menu_win.erase()
menu_win.bkgd(get_color("background"))
menu_win.attrset(get_color("window_frame"))
menu_win.border()
menu_win.keypad(True)
menu_pad = curses.newpad(len(menu_state.current_menu) + 1, width - 8)
menu_pad = curses.newpad(len(menu_state.current_menu) + 1, w - 8)
menu_pad.bkgd(get_color("background"))
header = " > ".join(word.title() for word in menu_state.menu_path)
if len(header) > width - 4:
header = header[: width - 7] + "..."
if len(header) > w - 4:
header = header[: w - 7] + "..."
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
transformed_path = transform_menu_path(menu_state.menu_path)
@@ -84,15 +99,15 @@ def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.wind
full_key = ".".join(transformed_path + [option])
display_name = field_mapping.get(full_key, option)
display_option = f"{display_name}"[: width // 2 - 2]
display_value = f"{current_value}"[: width // 2 - 4]
display_option = f"{display_name}"[: w // 2 - 2]
display_value = f"{current_value}"[: w // 2 - 4]
try:
color = get_color(
"settings_sensitive" if option in sensitive_settings else "settings_default",
reverse=(idx == menu_state.selected_index),
)
menu_pad.addstr(idx, 0, f"{display_option:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
menu_pad.addstr(idx, 0, f"{display_option:<{w // 2 - 2}} {display_value}".ljust(w - 8), color)
except curses.error:
pass
@@ -100,13 +115,13 @@ def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.wind
save_position = menu_height - 2
menu_win.addstr(
save_position,
(width - len(save_option)) // 2,
(w - len(save_option)) // 2,
save_option,
get_color("settings_save", reverse=(menu_state.selected_index == len(menu_state.current_menu))),
)
# Draw help window with dynamically updated max_help_lines
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path, menu_state)
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path)
menu_win.refresh()
menu_pad.refresh(
@@ -117,6 +132,7 @@ def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.wind
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0),
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4,
)
curses.curs_set(0)
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
@@ -132,9 +148,7 @@ def draw_help_window(
menu_height: int,
max_help_lines: int,
transformed_path: List[str],
menu_state: MenuState,
) -> None:
global help_win
if "help_win" not in globals():
@@ -145,11 +159,21 @@ def draw_help_window(
)
help_y = menu_start_y + menu_height
# Use current terminal width for the help window width calculation
help_win = update_help_window(
help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_start_x
help_win, help_text, transformed_path, selected_option, max_help_lines, get_menu_width(), help_y, menu_start_x
)
def get_input_type_for_field(field) -> type:
if field.type in (field.TYPE_INT32, field.TYPE_UINT32, field.TYPE_INT64):
return int
elif field.type in (field.TYPE_FLOAT, field.TYPE_DOUBLE):
return float
else:
return str
def settings_menu(stdscr: object, interface: object) -> None:
curses.update_lines_cols()
@@ -159,29 +183,33 @@ def settings_menu(stdscr: object, interface: object) -> None:
modified_settings = {}
need_redraw = True
menu_state.need_redraw = True
menu_state.show_save_option = False
new_value_name = None
while True:
if need_redraw:
if menu_state.need_redraw:
menu_state.need_redraw = False
options = list(menu_state.current_menu.keys())
# Determine if save option should be shown
path = menu_state.menu_path
menu_state.show_save_option = (
(
len(menu_state.menu_path) > 2
and ("Radio Settings" in menu_state.menu_path or "Module Settings" in menu_state.menu_path)
)
or (len(menu_state.menu_path) == 2 and "User Settings" in menu_state.menu_path)
or (len(menu_state.menu_path) == 3 and "Channels" in menu_state.menu_path)
(len(path) > 2 and ("Radio Settings" in path or "Module Settings" in path))
or (len(path) == 2 and "User Settings" in path)
or (len(path) == 3 and "Channels" in path)
)
# Display the menu
menu_win, menu_pad = display_menu(menu_state)
menu_win, menu_pad = display_menu()
need_redraw = False
if menu_win is None:
continue # Skip if menu_win is not initialized
# Capture user input
menu_win.timeout(200) # wait up to 200 ms for a keypress (or less if key is pressed)
key = menu_win.getch()
if key == -1:
continue
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
# max_help_lines = 4
@@ -215,14 +243,16 @@ def settings_menu(stdscr: object, interface: object) -> None:
)
elif key == curses.KEY_RESIZE:
need_redraw = True
menu_state.need_redraw = True
curses.update_lines_cols()
menu_win.erase()
help_win.erase()
if help_win:
help_win.erase()
menu_win.refresh()
help_win.refresh()
if help_win:
help_win.refresh()
elif key == ord("\t") and menu_state.show_save_option:
old_selected_index = menu_state.selected_index
@@ -239,15 +269,17 @@ def settings_menu(stdscr: object, interface: object) -> None:
)
elif key == curses.KEY_RIGHT or key == ord("\n"):
need_redraw = True
menu_state.need_redraw = True
menu_state.start_index.append(0)
menu_win.erase()
help_win.erase()
if help_win:
help_win.erase()
# draw_help_window(menu_win.getbegyx()[0], menu_win.getbegyx()[1], menu_win.getmaxyx()[0], max_help_lines, menu_state.current_menu, selected_index, transform_menu_path(menu_state.menu_path))
menu_win.refresh()
help_win.refresh()
if help_win:
help_win.refresh()
if menu_state.show_save_option and menu_state.selected_index == len(options):
save_changes(interface, modified_settings, menu_state)
@@ -268,7 +300,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
break
elif selected_option == "Export Config File":
filename = get_text_input("Enter a filename for the config file")
filename = get_text_input("Enter a filename for the config file", None, None)
if not filename:
logging.info("Export aborted: No filename provided.")
menu_state.start_index.pop()
@@ -290,7 +323,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
with open(yaml_file_path, "w", encoding="utf-8") as file:
file.write(config_text)
logging.info(f"Config file saved to {yaml_file_path}")
dialog(stdscr, "Config File Saved:", yaml_file_path)
dialog("Config File Saved:", yaml_file_path)
menu_state.need_redraw = True
menu_state.start_index.pop()
continue
except PermissionError:
@@ -306,14 +340,16 @@ def settings_menu(stdscr: object, interface: object) -> None:
# Check if folder exists and is not empty
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
dialog(stdscr, "", " No config files found. Export a config first.")
dialog("", " No config files found. Export a config first.")
menu_state.need_redraw = True
continue # Return to menu
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
# Ensure file_list is not empty before proceeding
if not file_list:
dialog(stdscr, "", " No config files found. Export a config first.")
dialog("", " No config files found. Export a config first.")
menu_state.need_redraw = True
continue
filename = get_list_input("Choose a config file", None, file_list)
@@ -327,7 +363,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
elif selected_option == "Config URL":
current_value = interface.localNode.getURL()
new_value = get_text_input(f"Config URL is currently: {current_value}")
new_value = get_text_input(f"Config URL is currently: {current_value}", None, str)
if new_value is not None:
current_value = new_value
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
@@ -380,7 +416,6 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
menu_state.selected_index = 4
continue
# need_redraw = True
field_info = menu_state.current_menu.get(selected_option)
if isinstance(field_info, tuple):
@@ -395,7 +430,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
if selected_option in ["longName", "shortName", "isLicensed"]:
if selected_option in ["longName", "shortName"]:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, None
)
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -414,7 +451,9 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif selected_option in ["latitude", "longitude", "altitude"]:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, float
)
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -453,25 +492,42 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif field.type == 13: # Field type 13 corresponds to UINT32
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else int(new_value)
menu_state.start_index.pop()
elif field.type == 2: # Field type 13 corresponds to INT64
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else float(new_value)
menu_state.start_index.pop()
else: # Handle other field types
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = current_value if new_value is None else new_value
menu_state.start_index.pop()
for key in menu_state.menu_path[3:]: # Skip "Main Menu"
modified_settings = modified_settings.setdefault(key, {})
# Add the new value to the appropriate level
modified_settings[selected_option] = new_value
# For comparison, normalize enum numbers to names
compare_value = new_value
if field and field.enum_type and isinstance(new_value, int):
enum_value_descriptor = field.enum_type.values_by_number.get(new_value)
if enum_value_descriptor:
compare_value = enum_value_descriptor.name
if compare_value != current_value:
# Save the raw protobuf number, not the name
modified_settings[selected_option] = new_value
# Convert enum string to int
if field and field.enum_type:
@@ -486,19 +542,42 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.selected_index = 0
elif key == curses.KEY_LEFT:
need_redraw = True
# If we are at the main menu and there are unsaved changes, prompt to save
if len(menu_state.menu_path) == 3 and modified_settings:
current_section = menu_state.menu_path[-1]
save_prompt = get_list_input(
f"You have unsaved changes in {current_section}. Save before exiting?",
None,
["Yes", "No", "Cancel"],
mandatory=True,
)
if save_prompt == "Cancel":
continue # Stay in the menu without doing anything
elif save_prompt == "Yes":
save_changes(interface, modified_settings, menu_state)
logging.info("Changes Saved")
modified_settings.clear()
menu = rebuild_menu_at_current_path(interface, menu_state)
pass
menu_state.need_redraw = True
menu_win.erase()
help_win.erase()
if help_win:
help_win.erase()
# max_help_lines = 4
# draw_help_window(menu_win.getbegyx()[0], menu_win.getbegyx()[1], menu_win.getmaxyx()[0], max_help_lines, menu_state.current_menu, selected_index, transform_menu_path(menu_state.menu_path))
menu_win.refresh()
help_win.refresh()
if help_win:
help_win.refresh()
if len(menu_state.menu_path) < 2:
modified_settings.clear()
# if len(menu_state.menu_path) < 2:
# modified_settings.clear()
# Navigate back to the previous menu
if len(menu_state.menu_path) > 1:
@@ -515,6 +594,16 @@ def settings_menu(stdscr: object, interface: object) -> None:
break
def rebuild_menu_at_current_path(interface, menu_state):
"""Rebuild menus from the device and re-point current_menu to the same path."""
new_menu = generate_menu_from_protobuf(interface)
cur = new_menu["Main Menu"]
for step in menu_state.menu_path[1:]:
cur = cur.get(step, {})
menu_state.current_menu = cur
return new_menu
def set_region(interface: object) -> None:
node = interface.getNode("^local")
device_config = node.localConfig

View File

@@ -2,15 +2,69 @@ import json
import logging
import os
from typing import Dict
from contact.ui.colors import setup_colors
# Get the parent directory of the script
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# Paths
json_file_path = os.path.join(parent_dir, "config.json")
log_file_path = os.path.join(parent_dir, "client.log")
db_file_path = os.path.join(parent_dir, "client.db")
# To test writting to a non-writable directory, you can uncomment the following lines:
# mkdir /tmp/test_nonwritable
# chmod -w /tmp/test_nonwritable
# parent_dir = "/tmp/test_nonwritable"
def reload_config() -> None:
loaded_config = initialize_config()
assign_config_variables(loaded_config)
setup_colors(reinit=True)
def _is_writable_dir(path: str) -> bool:
"""
Return True if we can create & delete a temp file in `path`.
"""
if not os.path.isdir(path):
return False
test_path = os.path.join(path, ".perm_test_tmp")
try:
with open(test_path, "w", encoding="utf-8") as _tmp:
_tmp.write("ok")
os.remove(test_path)
return True
except OSError:
return False
def _get_config_root(preferred_dir: str, fallback_name: str = ".contact_client") -> str:
"""
Choose a writable directory for config artifacts.
"""
if _is_writable_dir(preferred_dir):
return preferred_dir
home = os.path.expanduser("~")
fallback_dir = os.path.join(home, fallback_name)
# Ensure the fallback exists.
os.makedirs(fallback_dir, exist_ok=True)
# If *that* still isn't writable, last-ditch: use a system temp dir.
if not _is_writable_dir(fallback_dir):
import tempfile
fallback_dir = tempfile.mkdtemp(prefix="contact_client_")
return fallback_dir
# Pick the root now.
config_root = _get_config_root(parent_dir)
# Paths (derived from the chosen root)
json_file_path = os.path.join(config_root, "config.json")
log_file_path = os.path.join(config_root, "client.log")
db_file_path = os.path.join(config_root, "client.db")
node_configs_file_path = os.path.join(config_root, "node-configs/")
def format_json_single_line_arrays(data: Dict[str, object], indent: int = 4) -> str:
@@ -129,8 +183,10 @@ def initialize_config() -> Dict[str, object]:
default_config_variables = {
"channel_list_16ths": "3",
"node_list_16ths": "5",
"single_pane_mode": "False",
"db_file_path": db_file_path,
"log_file_path": log_file_path,
"node_configs_file_path": node_configs_file_path,
"message_prefix": ">>",
"sent_message_prefix": ">> Sent",
"notification_symbol": "*",
@@ -171,16 +227,18 @@ def initialize_config() -> Dict[str, object]:
def assign_config_variables(loaded_config: Dict[str, object]) -> None:
# Assign values to local variables
global db_file_path, log_file_path, message_prefix, sent_message_prefix
global db_file_path, log_file_path, node_configs_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global node_list_16ths, channel_list_16ths
global node_list_16ths, channel_list_16ths, single_pane_mode
global theme, COLOR_CONFIG
global node_sort, notification_sound
channel_list_16ths = loaded_config["channel_list_16ths"]
node_list_16ths = loaded_config["node_list_16ths"]
single_pane_mode = loaded_config["single_pane_mode"]
db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"]
node_configs_file_path = loaded_config.get("node_configs_file_path")
message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"]
@@ -212,6 +270,7 @@ if __name__ == "__main__":
print("\nLoaded Configuration:")
print(f"Database File Path: {db_file_path}")
print(f"Log File Path: {log_file_path}")
print(f"Configs File Path: {node_configs_file_path}")
print(f"Message Prefix: {message_prefix}")
print(f"Sent Message Prefix: {sent_message_prefix}")
print(f"Notification Symbol: {notification_symbol}")

View File

@@ -1,44 +1,164 @@
import curses
from contact.ui.colors import get_color
from contact.ui.nav_utils import draw_main_arrows
from contact.utilities.singleton import menu_state, ui_state
def dialog(stdscr: curses.window, title: str, message: str) -> None:
height, width = stdscr.getmaxyx()
def dialog(title: str, message: str) -> None:
"""Display a dialog with a title and message."""
# Calculate dialog dimensions
max_line_lengh = 0
message_lines = message.splitlines()
for l in message_lines:
max_line_length = max(len(l), max_line_lengh)
previous_window = ui_state.current_window
ui_state.current_window = 4
curses.update_lines_cols()
height, width = curses.LINES, curses.COLS
# Parse message into lines and calculate dimensions
message_lines = message.splitlines() or [""]
max_line_length = max(len(l) for l in message_lines)
# Desired size
dialog_width = max(len(title) + 4, max_line_length + 4)
dialog_height = len(message_lines) + 4
x = (width - dialog_width) // 2
y = (height - dialog_height) // 2
desired_height = len(message_lines) + 4
# Clamp dialog size to the screen (leave a 1-cell margin if possible)
max_w = max(10, width - 2)
max_h = max(6, height - 2)
dialog_width = min(dialog_width, max_w)
dialog_height = min(desired_height, max_h)
x = max(0, (width - dialog_width) // 2)
y = max(0, (height - dialog_height) // 2)
# Ensure we have a start index slot for this dialog window id (4)
# ui_state.start_index is used by draw_main_arrows()
try:
while len(ui_state.start_index) <= 4:
ui_state.start_index.append(0)
except Exception:
# If start_index isn't list-like, fall back to an attribute
if not hasattr(ui_state, "start_index"):
ui_state.start_index = [0, 0, 0, 0, 0]
def visible_message_rows() -> int:
# Rows available for message text inside the border, excluding title row and OK row.
# Layout:
# row 0: title
# rows 1..(dialog_height-3): message viewport (with arrows drawn on a subwindow)
# row dialog_height-2: OK button
# So message viewport height is dialog_height - 3 - 1 + 1 = dialog_height - 3
return max(1, dialog_height - 4)
def draw_window():
win.erase()
win.bkgd(get_color("background"))
win.attrset(get_color("window_frame"))
win.border(0)
# Title
try:
win.addstr(0, 2, title[: max(0, dialog_width - 4)], get_color("settings_default"))
except curses.error:
pass
# Message viewport
viewport_h = visible_message_rows()
start = ui_state.start_index[4]
start = max(0, min(start, max(0, len(message_lines) - viewport_h)))
ui_state.start_index[4] = start
# Create a subwindow covering the message region so draw_main_arrows() doesn't collide with the OK row
msg_win = win.derwin(viewport_h + 2, dialog_width - 2, 1, 1)
msg_win.erase()
for i in range(viewport_h):
idx = start + i
if idx >= len(message_lines):
break
line = message_lines[idx]
# Hard-trim lines that don't fit
trimmed = line[: max(0, dialog_width - 6)]
msg_x = max(0, ((dialog_width - 2) - len(trimmed)) // 2)
try:
msg_win.addstr(1 + i, msg_x, trimmed, get_color("settings_default"))
except curses.error:
pass
# Draw arrows only when scrolling is needed
if len(message_lines) > viewport_h:
draw_main_arrows(msg_win, len(message_lines) - 1, window=4)
else:
# Clear arrow positions if not needed
try:
h, w = msg_win.getmaxyx()
msg_win.addstr(1, w - 2, " ", get_color("settings_default"))
msg_win.addstr(h - 2, w - 2, " ", get_color("settings_default"))
except curses.error:
pass
msg_win.noutrefresh()
# OK button
ok_text = " Ok "
try:
win.addstr(
dialog_height - 2,
(dialog_width - len(ok_text)) // 2,
ok_text,
get_color("settings_default", reverse=True),
)
except curses.error:
pass
win.noutrefresh()
curses.doupdate()
# Create dialog window
win = curses.newwin(dialog_height, dialog_width, y, x)
win.bkgd(get_color("background"))
win.attrset(get_color("window_frame"))
win.border(0)
win.keypad(True)
draw_window()
# Add title
win.addstr(0, 2, title, get_color("settings_default"))
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l, get_color("settings_default"))
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
# Refresh dialog window
win.refresh()
# Get user input
while True:
win.timeout(200)
char = win.getch()
# Close dialog with enter, space, or esc
if char in (curses.KEY_ENTER, 10, 13, 32, 27):
if menu_state.need_redraw:
menu_state.need_redraw = False
curses.update_lines_cols()
height, width = curses.LINES, curses.COLS
draw_window()
# Close dialog
ok_selected = True
if char in (27, curses.KEY_LEFT): # Esc or Left arrow
win.erase()
win.refresh()
ui_state.current_window = previous_window
return
if ok_selected and char in (curses.KEY_ENTER, 10, 13, 32):
win.erase()
win.refresh()
ui_state.current_window = previous_window
return
if char == -1:
continue
# Scroll if the dialog is clipped vertically
viewport_h = visible_message_rows()
if len(message_lines) > viewport_h:
start = ui_state.start_index[4]
max_start = max(0, len(message_lines) - viewport_h)
if char in (curses.KEY_UP, ord("k")):
ui_state.start_index[4] = max(0, start - 1)
draw_window()
elif char in (curses.KEY_DOWN, ord("j")):
ui_state.start_index[4] = min(max_start, start + 1)
draw_window()
elif char == curses.KEY_PPAGE: # Page up
ui_state.start_index[4] = max(0, start - viewport_h)
draw_window()
elif char == curses.KEY_NPAGE: # Page down
ui_state.start_index[4] = min(max_start, start + viewport_h)
draw_window()

View File

@@ -22,9 +22,9 @@ def get_node_color(node_index: int, reverse: bool = False):
Segment = tuple[str, str, bool, bool]
WrappedLine = List[Segment]
width = 80
sensitive_settings = ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"]
save_option = "Save Changes"
MIN_HEIGHT_FOR_HELP = 20
def move_highlight(
@@ -73,9 +73,8 @@ def move_highlight(
# Clear old selection
if show_save_option and old_idx == max_index:
menu_win.chgat(
menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save")
)
win_h, win_w = menu_win.getmaxyx()
menu_win.chgat(win_h - 2, (win_w - len(save_option)) // 2, len(save_option), get_color("settings_save"))
else:
menu_pad.chgat(
old_idx,
@@ -90,9 +89,10 @@ def move_highlight(
# Highlight new selection
if show_save_option and new_idx == max_index:
win_h, win_w = menu_win.getmaxyx()
menu_win.chgat(
menu_win.getmaxyx()[0] - 2,
(width - len(save_option)) // 2,
win_h - 2,
(win_w - len(save_option)) // 2,
len(save_option),
get_color("settings_save", reverse=True),
)
@@ -124,13 +124,14 @@ def move_highlight(
selected_option = options[new_idx] if new_idx < len(options) else None
help_y = menu_win.getbegyx()[0] + menu_win.getmaxyx()[0]
if help_win:
win_h, win_w = menu_win.getmaxyx()
help_win = update_help_window(
help_win,
help_text,
transformed_path,
selected_option,
max_help_lines,
width,
win_w,
help_y,
menu_win.getbegyx()[1],
)
@@ -167,23 +168,46 @@ def update_help_window(
help_x: int,
) -> object: # returns a curses window
"""Handles rendering the help window consistently."""
wrapped_help = get_wrapped_help_text(help_text, transformed_path, selected_option, width, max_help_lines)
if curses.LINES < MIN_HEIGHT_FOR_HELP:
return None
# Clamp target position and width to the current terminal size
help_x = max(0, help_x)
help_y = max(0, help_y)
# Ensure requested width fits on screen from help_x
max_w_from_x = max(1, curses.COLS - help_x)
safe_width = min(width, max_w_from_x)
# Always leave a minimal border area; enforce a minimum usable width of 3
safe_width = max(3, safe_width)
wrapped_help = get_wrapped_help_text(help_text, transformed_path, selected_option, safe_width, max_help_lines)
help_height = min(len(wrapped_help) + 2, max_help_lines + 2) # +2 for border
help_height = max(help_height, 3) # Ensure at least 3 rows (1 text + border)
# Ensure help window does not exceed screen size
# Re-clamp Y to keep the window visible
if help_y + help_height > curses.LINES:
help_y = curses.LINES - help_height
help_y = max(0, curses.LINES - help_height)
# If width would overflow the screen, shrink it
if help_x + safe_width > curses.COLS:
safe_width = max(3, curses.COLS - help_x)
# Create or update the help window
if help_win is None:
help_win = curses.newwin(help_height, width, help_y, help_x)
help_win = curses.newwin(help_height, safe_width, help_y, help_x)
else:
help_win.erase()
help_win.refresh()
help_win.resize(help_height, width)
help_win.mvwin(help_y, help_x)
help_win.resize(help_height, safe_width)
try:
help_win.mvwin(help_y, help_x)
except curses.error:
# If moving fails due to edge conditions, pin to (0,0) as a fallback
help_y = 0
help_x = 0
help_win.mvwin(help_y, help_x)
help_win.bkgd(get_color("background"))
help_win.attrset(get_color("window_frame"))
@@ -295,14 +319,16 @@ def get_wrapped_help_text(
return wrapped_help
def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words."""
whitespace = '\t\n\x0b\x0c\r '
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
whitespace = "\t\n\x0b\x0c\r "
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(" "))
text = text.translate(whitespace_trans)
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately

View File

@@ -22,6 +22,7 @@ def draw_splash(stdscr: object) -> None:
stdscr.addstr(start_y + 1, start_x - 1, message_2, get_color("splash_logo", bold=True))
stdscr.addstr(start_y + 2, start_x - 2, message_3, get_color("splash_logo", bold=True))
stdscr.addstr(start_y + 4, start_x2, message_4, get_color("splash_text"))
stdscr.move(start_y + 5, start_x2)
stdscr.attrset(get_color("window_frame"))
stdscr.box()

View File

@@ -10,6 +10,7 @@ class MenuState:
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict)
menu_path: List[str] = field(default_factory=list)
show_save_option: bool = False
need_redraw: bool = False
@dataclass
@@ -24,11 +25,14 @@ class ChatUIState:
selected_message: int = 0
selected_node: int = 0
current_window: int = 0
last_sent_time: float = 0.0
last_traceroute_time: float = 0.0
selected_index: int = 0
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
show_save_option: bool = False
menu_path: List[str] = field(default_factory=list)
single_pane_mode: bool = False
@dataclass

View File

@@ -4,16 +4,23 @@ import curses
from typing import Any, List, Dict
from contact.ui.colors import get_color, setup_colors, COLOR_MAP
from contact.ui.default_config import format_json_single_line_arrays, loaded_config
import contact.ui.default_config as config
from contact.ui.nav_utils import move_highlight, draw_arrows
from contact.utilities.input_handlers import get_list_input
from contact.utilities.singleton import menu_state
width = 80
MAX_MENU_WIDTH = 80 # desired max; will shrink on small terminals
max_help_lines = 6
save_option = "Save Changes"
# Compute an effective width that fits the current terminal
def get_effective_width() -> int:
# Leave space for borders; ensure a sane minimum
return max(20, min(MAX_MENU_WIDTH, curses.COLS - 2))
def edit_color_pair(key: str, current_value: List[str]) -> List[str]:
"""
Allows the user to select a foreground and background color for a key.
@@ -27,13 +34,14 @@ def edit_color_pair(key: str, current_value: List[str]) -> List[str]:
def edit_value(key: str, current_value: str) -> str:
w = get_effective_width()
height = 10
input_width = width - 16 # Allow space for "New Value: "
input_width = w - 16 # Allow space for "New Value: "
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
start_x = max(0, (curses.COLS - w) // 2)
# Create a centered window
edit_win = curses.newwin(height, width, start_y, start_x)
edit_win = curses.newwin(height, w, start_y, start_x)
edit_win.bkgd(get_color("background"))
edit_win.attrset(get_color("window_frame"))
edit_win.border()
@@ -42,7 +50,7 @@ def edit_value(key: str, current_value: str) -> str:
edit_win.addstr(1, 2, f"Editing {key}", get_color("settings_default", bold=True))
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
wrap_width = width - 4 # Account for border and padding
wrap_width = w - 4 # Account for border and padding
wrapped_lines = [current_value[i : i + wrap_width] for i in range(0, len(current_value), wrap_width)]
for i, line in enumerate(wrapped_lines[:4]): # Limit display to fit the window height
@@ -53,7 +61,9 @@ def edit_value(key: str, current_value: str) -> str:
# Handle theme selection dynamically
if key == "theme":
# Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
theme_options = [
k.split("_", 2)[2].lower() for k in config.loaded_config.keys() if k.startswith("COLOR_CONFIG")
]
return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort":
@@ -64,6 +74,10 @@ def edit_value(key: str, current_value: str) -> str:
sound_options = ["True", "False"]
return get_list_input("Notification Sound", current_value, sound_options)
elif key == "single_pane_mode":
sound_options = ["True", "False"]
return get_list_input("Single-Pane Mode", current_value, sound_options)
# Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1)
@@ -72,41 +86,64 @@ def edit_value(key: str, current_value: str) -> str:
user_input = ""
input_position = (7, 13) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
visible_text = user_input[scroll_offset : scroll_offset + input_width] # Only show what fits
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
if menu_state.need_redraw:
curses.update_lines_cols()
menu_state.need_redraw = False
# Re-create the window to fully reset state
edit_win = curses.newwin(height, w, start_y, start_x)
edit_win.timeout(200)
edit_win.bkgd(get_color("background"))
edit_win.attrset(get_color("window_frame"))
edit_win.border()
# Redraw static content
edit_win.addstr(1, 2, f"Editing {key}", get_color("settings_default", bold=True))
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
for i, line in enumerate(wrapped_lines[:4]):
edit_win.addstr(4 + i, 2, line, get_color("settings_default"))
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
visible_text = user_input[scroll_offset : scroll_offset + input_width]
edit_win.addstr(row, col, " " * input_width, get_color("settings_default"))
edit_win.addstr(row, col, visible_text, get_color("settings_default"))
edit_win.refresh()
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
key = edit_win.get_wch()
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width))
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow
try:
key = edit_win.get_wch()
except curses.error:
continue # window not ready — skip this loop
if key in (chr(27), curses.KEY_LEFT):
curses.curs_set(0)
return current_value # Exit without returning a value
return current_value
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
break
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
if user_input: # Only process if there's something to delete
elif key in (curses.KEY_BACKSPACE, chr(127)):
if user_input:
user_input = user_input[:-1]
if scroll_offset > 0 and len(user_input) < scroll_offset + input_width:
scroll_offset -= 1 # Move back if text is shorter than scrolled area
scroll_offset -= 1
else:
if isinstance(key, str):
user_input += key
else:
user_input += chr(key)
if len(user_input) > input_width: # Scroll if input exceeds visible area
if len(user_input) > input_width:
scroll_offset += 1
curses.curs_set(0)
return user_input if user_input else current_value
def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
def display_menu() -> tuple[Any, Any, List[str]]:
"""
Render the configuration menu with a Save button directly added to the window.
"""
@@ -124,11 +161,12 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
max_menu_height = curses.LINES
menu_height = min(max_menu_height, num_items + 5)
num_items = len(options)
w = get_effective_width()
start_y = (curses.LINES - menu_height) // 2
start_x = (curses.COLS - width) // 2
start_x = max(0, (curses.COLS - w) // 2)
# Create the window
menu_win = curses.newwin(menu_height, width, start_y, start_x)
menu_win = curses.newwin(menu_height, w, start_y, start_x)
menu_win.erase()
menu_win.bkgd(get_color("background"))
menu_win.attrset(get_color("window_frame"))
@@ -136,13 +174,13 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
menu_win.keypad(True)
# Create the pad for scrolling
menu_pad = curses.newpad(num_items + 1, width - 8)
menu_pad = curses.newpad(num_items + 1, w - 8)
menu_pad.bkgd(get_color("background"))
# Display the menu path
header = " > ".join(menu_state.menu_path)
if len(header) > width - 4:
header = header[: width - 7] + "..."
if len(header) > w - 4:
header = header[: w - 7] + "..."
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
# Populate the pad with menu options
@@ -152,18 +190,18 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
if isinstance(menu_state.current_menu, dict)
else menu_state.current_menu[int(key.strip("[]"))]
)
display_key = f"{key}"[: width // 2 - 2]
display_value = f"{value}"[: width // 2 - 8]
display_key = f"{key}"[: w // 2 - 2]
display_value = f"{value}"[: w // 2 - 8]
color = get_color("settings_default", reverse=(idx == menu_state.selected_index))
menu_pad.addstr(idx, 0, f"{display_key:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
menu_pad.addstr(idx, 0, f"{display_key:<{w // 2 - 2}} {display_value}".ljust(w - 8), color)
# Add Save button to the main window
if menu_state.show_save_option:
save_position = menu_height - 2
menu_win.addstr(
save_position,
(width - len(save_option)) // 2,
(w - len(save_option)) // 2,
save_option,
get_color("settings_save", reverse=(menu_state.selected_index == len(menu_state.current_menu))),
)
@@ -189,6 +227,7 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
def json_editor(stdscr: curses.window, menu_state: Any) -> None:
menu_state.selected_index = 0 # Track the selected option
made_changes = False # Track if any changes were made
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
@@ -211,16 +250,18 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
menu_state.current_menu = data # Track the current level of the menu
# Render the menu
menu_win, menu_pad, options = display_menu(menu_state)
need_redraw = True
menu_win, menu_pad, options = display_menu()
menu_state.need_redraw = True
while True:
if need_redraw:
menu_win, menu_pad, options = display_menu(menu_state)
if menu_state.need_redraw:
menu_state.need_redraw = False
menu_win, menu_pad, options = display_menu()
menu_win.refresh()
need_redraw = False
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
menu_win.timeout(200)
key = menu_win.getch()
if key == curses.KEY_UP:
@@ -248,7 +289,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
elif key in (curses.KEY_RIGHT, 10, 13): # 10 = \n, 13 = carriage return
need_redraw = True
menu_state.need_redraw = True
menu_win.erase()
menu_win.refresh()
@@ -269,11 +310,14 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
if isinstance(selected_data, list) and len(selected_data) == 2:
# Edit color pair
old = selected_data
new_value = edit_color_pair(selected_key, selected_data)
menu_state.menu_path.pop()
menu_state.start_index.pop()
menu_state.menu_index.pop()
menu_state.current_menu[selected_key] = new_value
if new_value != old:
made_changes = True
elif isinstance(selected_data, (dict, list)):
# Navigate into nested data
@@ -282,22 +326,27 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
else:
# General value editing
old = selected_data
new_value = edit_value(selected_key, selected_data)
menu_state.menu_path.pop()
menu_state.menu_index.pop()
menu_state.start_index.pop()
menu_state.current_menu[selected_key] = new_value
need_redraw = True
menu_state.need_redraw = True
if new_value != old:
made_changes = True
else:
# Save button selected
save_json(file_path, data)
made_changes = False
stdscr.refresh()
continue
# config.reload() # This isn't refreshing the file paths as expected
break
elif key in (27, curses.KEY_LEFT): # Escape or Left Arrow
need_redraw = True
menu_state.need_redraw = True
menu_win.erase()
menu_win.refresh()
@@ -318,6 +367,19 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
else:
# Exit the editor
if made_changes:
save_prompt = get_list_input(
"You have unsaved changes. Save before exiting?",
None,
["Yes", "No", "Cancel"],
mandatory=True,
)
if save_prompt == "Cancel":
continue # Stay in the menu without doing anything
elif save_prompt == "Yes":
save_json(file_path, data)
made_changes = False
menu_win.clear()
menu_win.refresh()
@@ -325,7 +387,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
def save_json(file_path: str, data: Dict[str, Any]) -> None:
formatted_json = format_json_single_line_arrays(data)
formatted_json = config.format_json_single_line_arrays(data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(formatted_json)
setup_colors(reinit=True)
@@ -334,7 +396,6 @@ def save_json(file_path: str, data: Dict[str, Any]) -> None:
def main(stdscr: curses.window) -> None:
from contact.ui.ui_state import MenuState
menu_state = MenuState()
if len(menu_state.menu_path) == 0:
menu_state.menu_path = ["App Settings"] # Initialize if not set

View File

@@ -116,7 +116,14 @@ def load_messages_from_db() -> None:
# Add messages to ui_state.all_messages grouped by hourly timestamp
hourly_messages = {}
for user_id, message, timestamp, ack_type in db_messages:
for row in db_messages:
user_id, message, timestamp, ack_type = row
# Only ack_type is allowed to be None
if user_id is None or message is None or timestamp is None:
logging.warning(f"Skipping row with NULL required field(s): {row}")
continue
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
if hour not in hourly_messages:
hourly_messages[hour] = []
@@ -129,12 +136,19 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak":
ack_str = config.nak_str
ts_str = datetime.fromtimestamp(timestamp).strftime("[%H:%M:%S]")
if user_id == str(interface_state.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
sanitized_message = message.replace("\x00", "")
formatted_message = (
f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ",
message,
f"{ts_str} {config.sent_message_prefix}{ack_str}: ",
sanitized_message,
)
else:
sanitized_message = message.replace("\x00", "")
formatted_message = (
f"{ts_str} {config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ",
sanitized_message,
)
hourly_messages[hour].append(formatted_message)

View File

@@ -6,20 +6,68 @@ from typing import Any, Optional, List
from contact.ui.colors import get_color
from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text
from contact.ui.dialog import dialog
from contact.utilities.validation_rules import get_validation_for
from contact.utilities.singleton import menu_state
# Dialogs should be at most 80 cols, but shrink on small terminals
MAX_DIALOG_WIDTH = 80
MIN_DIALOG_WIDTH = 20
def get_text_input(prompt: str) -> Optional[str]:
def get_dialog_width() -> int:
# Leave 2 columns for borders and clamp to a sane minimum
try:
return max(MIN_DIALOG_WIDTH, min(MAX_DIALOG_WIDTH, curses.COLS - 2))
except Exception:
# Fallback if curses not ready yet
return MAX_DIALOG_WIDTH
def invalid_input(window: curses.window, message: str, redraw_func: Optional[callable] = None) -> None:
"""Displays an invalid input message in the given window and redraws if needed."""
cursor_y, cursor_x = window.getyx()
curses.curs_set(0)
dialog("Invalid Input", message)
if redraw_func:
redraw_func() # Redraw the original window content that got obscured
else:
window.refresh()
window.move(cursor_y, cursor_x)
curses.curs_set(1)
def get_text_input(prompt: str, selected_config: str, input_type: str) -> Optional[str]:
"""Handles user input with wrapped text for long prompts."""
def redraw_input_win():
"""Redraw the input window with the current prompt and user input."""
input_win.erase()
input_win.border()
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
if row >= height - 3:
break
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
input_win.addstr(row + 1, col_start, user_input[:first_line_width], get_color("settings_default"))
for i, line in enumerate(wrap_text(user_input[first_line_width:], wrap_width=input_width)):
if row + 2 + i < height - 1:
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
input_win.refresh()
height = 8
width = 80
width = get_dialog_width()
margin = 2 # Left and right margin
input_width = width - (2 * margin) # Space available for text
max_input_rows = height - 4 # Space for input
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
start_y = max(0, (curses.LINES - height) // 2)
start_x = max(0, (curses.COLS - width) // 2)
input_win = curses.newwin(height, width, start_y, start_x)
input_win.timeout(200)
input_win.bkgd(get_color("background"))
input_win.attrset(get_color("window_frame"))
input_win.border()
@@ -27,6 +75,7 @@ def get_text_input(prompt: str) -> Optional[str]:
# Wrap the prompt text
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
@@ -39,34 +88,125 @@ def get_text_input(prompt: str) -> Optional[str]:
input_win.refresh()
curses.curs_set(1)
max_length = 4 if "shortName" in prompt else None
user_input = ""
min_value = 0
max_value = 4294967295
min_length = 0
max_length = None
# Start user input after the prompt text
if selected_config is not None:
validation = get_validation_for(selected_config) or {}
min_value = validation.get("min_value", 0)
max_value = validation.get("max_value", 4294967295)
min_length = validation.get("min_length", 0)
max_length = validation.get("max_length")
user_input = ""
col_start = margin + len(prompt_text)
first_line_width = input_width - len(prompt_text) # Available space for first line
first_line_width = input_width - len(prompt_text)
while True:
key = input_win.get_wch() # Waits for user input
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw_input_win()
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
try:
key = input_win.get_wch()
except curses.error:
continue
if key == chr(27) or key == curses.KEY_LEFT:
input_win.erase()
input_win.refresh()
curses.curs_set(0)
return None # Exit without saving
menu_state.need_redraw = True
return None
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): # Enter key
break
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
menu_state.need_redraw = True
if not user_input.strip():
invalid_input(input_win, "Value cannot be empty.", redraw_func=redraw_input_win)
continue
length = len(user_input)
if min_length == max_length and max_length is not None:
if length != min_length:
invalid_input(
input_win, f"Value must be exactly {min_length} characters long.", redraw_func=redraw_input_win
)
continue
else:
if length < min_length:
invalid_input(
input_win,
f"Value must be at least {min_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if max_length is not None and length > max_length:
invalid_input(
input_win,
f"Value must be no more than {max_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if input_type is int:
if not user_input.isdigit():
invalid_input(input_win, "Only numeric digits (09) allowed.", redraw_func=redraw_input_win)
continue
int_val = int(user_input)
if not (min_value <= int_val <= max_value):
invalid_input(
input_win, f"Enter a number between {min_value} and {max_value}.", redraw_func=redraw_input_win
)
continue
curses.curs_set(0)
return int_val
elif input_type is float:
try:
float_val = float(user_input)
if not (min_value <= float_val <= max_value):
invalid_input(
input_win,
f"Enter a number between {min_value} and {max_value}.",
redraw_func=redraw_input_win,
)
continue
except ValueError:
invalid_input(input_win, "Must be a valid floating point number.", redraw_func=redraw_input_win)
continue
else:
curses.curs_set(0)
return float_val
else:
break
elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace
if user_input:
user_input = user_input[:-1] # Remove last character
elif max_length is None or len(user_input) < max_length: # Enforce max length
if isinstance(key, str):
user_input += key
else:
user_input += chr(key)
elif max_length is None or len(user_input) < max_length:
try:
char = chr(key) if not isinstance(key, str) else key
if input_type is int:
if char.isdigit() or (char == "-" and len(user_input) == 0):
user_input += char
elif input_type is float:
if (
char.isdigit()
or (char == "." and "." not in user_input)
or (char == "-" and len(user_input) == 0)
):
user_input += char
else:
user_input += char
except ValueError:
pass # Ignore invalid input
# First line must be manually handled before using wrap_text()
first_line = user_input[:first_line_width] # Cut to max first line width
@@ -77,9 +217,7 @@ def get_text_input(prompt: str) -> Optional[str]:
# Clear only the input area (without touching prompt text)
for i in range(max_input_rows):
if row + 1 + i < height - 1:
input_win.addstr(
row + 1 + i, margin, " " * min(input_width, width - margin - 1), get_color("settings_default")
)
input_win.addstr(row + 1 + i, margin, " " * input_width, get_color("settings_default"))
# Redraw the prompt text so it never disappears
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
@@ -95,10 +233,12 @@ def get_text_input(prompt: str) -> Optional[str]:
curses.curs_set(0)
input_win.erase()
input_win.refresh()
return user_input
return user_input.strip()
def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
"""Handles user input for editing up to 3 Admin Keys in Base64 format."""
def to_base64(byte_strings):
"""Convert byte values to Base64-encoded strings."""
return [base64.b64encode(b).decode() for b in byte_strings]
@@ -115,14 +255,15 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
cvalue = to_base64(current_value) # Convert current values to Base64
height = 9
width = 80
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
width = get_dialog_width()
start_y = max(0, (curses.LINES - height) // 2)
start_x = max(0, (curses.COLS - width) // 2)
repeated_win = curses.newwin(height, width, start_y, start_x)
repeated_win.bkgd(get_color("background"))
repeated_win.attrset(get_color("window_frame"))
repeated_win.keypad(True) # Enable keypad for special keys
admin_key_win = curses.newwin(height, width, start_y, start_x)
admin_key_win.timeout(200)
admin_key_win.bkgd(get_color("background"))
admin_key_win.attrset(get_color("window_frame"))
admin_key_win.keypad(True) # Enable keypad for special keys
curses.echo()
curses.curs_set(1)
@@ -130,46 +271,48 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
# Editable list of values (max 3 values)
user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields
cursor_pos = 0 # Track which value is being edited
error_message = ""
invalid_input = ""
while True:
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
admin_key_win.erase()
admin_key_win.border()
admin_key_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
# Display current values, allowing editing
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " " # Highlight the current line
repeated_win.addstr(
admin_key_win.addstr(
3 + i, 2, f"{prefix}Admin Key {i + 1}: ", get_color("settings_default", bold=(i == cursor_pos))
)
repeated_win.addstr(3 + i, 18, line) # Align text for easier editing
admin_key_win.addstr(3 + i, 18, line) # Align text for easier editing
# Move cursor to the correct position inside the field
curses.curs_set(1)
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
admin_key_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
if invalid_input:
admin_key_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
repeated_win.refresh()
key = repeated_win.getch()
admin_key_win.refresh()
key = admin_key_win.getch()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
repeated_win.erase()
repeated_win.refresh()
admin_key_win.erase()
admin_key_win.refresh()
curses.noecho()
curses.curs_set(0)
menu_state.need_redraw = True
return None
elif key == ord("\n"): # Enter key to save and return
menu_state.need_redraw = True
if all(is_valid_base64(val) for val in user_values): # Ensure all values are valid Base64 and 32 bytes
curses.noecho()
curses.curs_set(0)
return user_values # Return the edited Base64 values
else:
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
invalid_input = "Error: Each key must be valid Base64 and 32 bytes long!"
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
@@ -180,200 +323,252 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
invalid_input = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
from contact.utilities.singleton import menu_state # Required if not already imported
def get_repeated_input(current_value: List[str]) -> Optional[str]:
height = 9
width = 80
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
width = get_dialog_width()
start_y = max(0, (curses.LINES - height) // 2)
start_x = max(0, (curses.COLS - width) // 2)
repeated_win = curses.newwin(height, width, start_y, start_x)
repeated_win.timeout(200)
repeated_win.bkgd(get_color("background"))
repeated_win.attrset(get_color("window_frame"))
repeated_win.keypad(True) # Enable keypad for special keys
repeated_win.keypad(True)
curses.echo()
curses.curs_set(1) # Show the cursor
curses.curs_set(1)
# Editable list of values (max 3 values)
user_values = current_value[:3]
cursor_pos = 0 # Track which value is being edited
error_message = ""
user_values = current_value[:3] + [""] * (3 - len(current_value)) # Always 3 fields
cursor_pos = 0
invalid_input = ""
while True:
def redraw():
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(1, 2, "Edit up to 3 Values:", get_color("settings_default", bold=True))
# Display current values, allowing editing
win_h, win_w = repeated_win.getmaxyx()
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " " # Highlight the current line
prefix = "" if i == cursor_pos else " "
repeated_win.addstr(
3 + i, 2, f"{prefix}Value{i + 1}: ", get_color("settings_default", bold=(i == cursor_pos))
)
repeated_win.addstr(3 + i, 18, line)
repeated_win.addstr(3 + i, 18, line[: max(0, win_w - 20)]) # Prevent overflow
# Move cursor to the correct position inside the field
curses.curs_set(1)
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
if invalid_input:
win_h, win_w = repeated_win.getmaxyx()
repeated_win.addstr(7, 2, invalid_input[: max(0, win_w - 4)], get_color("settings_default", bold=True))
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos]))
repeated_win.refresh()
key = repeated_win.getch()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw()
redraw()
try:
key = repeated_win.get_wch()
except curses.error:
continue # ignore timeout or input issues
if key in (27, curses.KEY_LEFT): # ESC or Left Arrow
repeated_win.erase()
repeated_win.refresh()
curses.noecho()
curses.curs_set(0)
menu_state.need_redraw = True
return None
elif key == ord("\n"): # Enter key to save and return
elif key in ("\n", curses.KEY_ENTER):
curses.noecho()
curses.curs_set(0)
return ", ".join(user_values)
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
cursor_pos = (cursor_pos + 1) % len(user_values)
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
if len(user_values[cursor_pos]) > 0:
user_values[cursor_pos] = user_values[cursor_pos][:-1] # Remove last character
menu_state.need_redraw = True
return ", ".join(user_values).strip()
elif key == curses.KEY_UP:
cursor_pos = (cursor_pos - 1) % 3
elif key == curses.KEY_DOWN:
cursor_pos = (cursor_pos + 1) % 3
elif key in (curses.KEY_BACKSPACE, 127):
user_values[cursor_pos] = user_values[cursor_pos][:-1]
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
ch = chr(key) if isinstance(key, int) else key
if ch.isprintable():
user_values[cursor_pos] += ch
invalid_input = ""
except Exception:
pass
from contact.utilities.singleton import menu_state # Ensure this is imported
def get_fixed32_input(current_value: int) -> int:
cvalue = current_value
current_value = str(ipaddress.IPv4Address(current_value))
original_value = current_value
ip_string = str(ipaddress.IPv4Address(current_value))
height = 10
width = 80
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
width = get_dialog_width()
start_y = max(0, (curses.LINES - height) // 2)
start_x = max(0, (curses.COLS - width) // 2)
fixed32_win = curses.newwin(height, width, start_y, start_x)
fixed32_win.bkgd(get_color("background"))
fixed32_win.attrset(get_color("window_frame"))
fixed32_win.keypad(True)
fixed32_win.timeout(200)
curses.echo()
curses.curs_set(1)
user_input = ""
while True:
def redraw():
fixed32_win.erase()
fixed32_win.border()
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD)
fixed32_win.addstr(3, 2, f"Current: {current_value}")
fixed32_win.addstr(5, 2, f"New value: {user_input}")
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", get_color("settings_default", bold=True))
fixed32_win.addstr(3, 2, f"Current: {ip_string}", get_color("settings_default"))
fixed32_win.addstr(5, 2, f"New value: {user_input}", get_color("settings_default"))
fixed32_win.refresh()
key = fixed32_win.getch()
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow to cancel
redraw()
try:
key = fixed32_win.get_wch()
except curses.error:
continue # ignore timeout
if key in (27, curses.KEY_LEFT): # ESC or Left Arrow to cancel
fixed32_win.erase()
fixed32_win.refresh()
curses.noecho()
curses.curs_set(0)
return cvalue # Return the current value unchanged
elif key == ord("\n"): # Enter key to validate and save
# Validate IP address
menu_state.need_redraw = True
return original_value
elif key in ("\n", curses.KEY_ENTER):
octets = user_input.split(".")
menu_state.need_redraw = True
if len(octets) == 4 and all(octet.isdigit() and 0 <= int(octet) <= 255 for octet in octets):
curses.noecho()
curses.curs_set(0)
fixed32_address = ipaddress.ip_address(user_input)
return int(fixed32_address) # Return the valid IP address
return int(ipaddress.ip_address(user_input))
else:
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", curses.A_BOLD | curses.color_pair(5))
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", get_color("settings_default", bold=True))
fixed32_win.refresh()
curses.napms(1500) # Wait for 1.5 seconds before refreshing
user_input = "" # Clear invalid input
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
curses.napms(1500)
user_input = ""
elif key in (curses.KEY_BACKSPACE, 127):
user_input = user_input[:-1]
else:
try:
char = chr(key)
if char.isdigit() or char == ".":
user_input += char # Append only valid characters (digits or dots)
except ValueError:
pass # Ignore invalid inputs
ch = chr(key) if isinstance(key, int) else key
if ch.isdigit() or ch == ".":
user_input += ch
except Exception:
pass # Ignore unprintable inputs
def get_list_input(prompt: str, current_option: Optional[str], list_options: List[str]) -> Optional[str]:
from typing import List, Optional # ensure Optional is imported
def get_list_input(
prompt: str, current_option: Optional[str], list_options: List[str], mandatory: bool = False
) -> Optional[str]:
"""
Displays a scrollable list of list_options for the user to choose from.
List selector.
"""
selected_index = list_options.index(current_option) if current_option in list_options else 0
height = min(len(list_options) + 5, curses.LINES)
width = 80
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
width = get_dialog_width()
start_y = max(0, (curses.LINES - height) // 2)
start_x = max(0, (curses.COLS - width) // 2)
list_win = curses.newwin(height, width, start_y, start_x)
list_win.timeout(200)
list_win.bkgd(get_color("background"))
list_win.attrset(get_color("window_frame"))
list_win.keypad(True)
list_pad = curses.newpad(len(list_options) + 1, width - 8)
list_pad = curses.newpad(len(list_options) + 1, max(1, width - 8))
list_pad.bkgd(get_color("background"))
# Render header
list_win.erase()
list_win.border()
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
# Render options on the pad
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
# Initial refresh
list_win.refresh()
list_pad.refresh(
0,
0,
list_win.getbegyx()[0] + 3,
list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
)
max_index = len(list_options) - 1
visible_height = list_win.getmaxyx()[0] - 5
draw_arrows(list_win, visible_height, max_index, [0], show_save_option=False) # Initial call to draw arrows
def redraw_list_ui():
list_win.erase()
list_win.border()
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
win_h, win_w = list_win.getmaxyx()
pad_w = max(1, win_w - 8)
for idx, item in enumerate(list_options):
color = get_color("settings_default", reverse=(idx == selected_index))
list_pad.addstr(idx, 0, item[:pad_w].ljust(pad_w), color)
list_win.refresh()
list_pad.refresh(
0,
0,
list_win.getbegyx()[0] + 3,
list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
)
# Recompute visible height each draw in case of resize
vis_h = list_win.getmaxyx()[0] - 5
draw_arrows(list_win, vis_h, max_index, [0], show_save_option=False)
# Initial draw
redraw_list_ui()
while True:
key = list_win.getch()
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw_list_ui()
try:
key = list_win.getch()
except curses.error:
continue
if key == curses.KEY_UP:
old_selected_index = selected_index
selected_index = max(0, selected_index - 1)
move_highlight(old_selected_index, list_options, list_win, list_pad, selected_index=selected_index)
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
selected_index = min(len(list_options) - 1, selected_index + 1)
move_highlight(old_selected_index, list_options, list_win, list_pad, selected_index=selected_index)
elif key == ord("\n"): # Enter key
elif key == ord("\n"): # Enter
list_win.clear()
list_win.refresh()
menu_state.need_redraw = True
return list_options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left
if mandatory:
continue
list_win.clear()
list_win.refresh()
menu_state.need_redraw = True
return current_option

View File

@@ -1,5 +1,6 @@
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState, MenuState
ui_state = ChatUIState()
interface_state = InterfaceState()
app_state = AppState()
menu_state = MenuState()

View File

@@ -0,0 +1,90 @@
import datetime
sensors = {
'temperature': {'icon':'🌡️ ','unit':'°'},
'relative_humidity': {'icon':'💧','unit':'%'},
'barometric_pressure': {'icon':'','unit': 'hPa'},
'lux': {'icon':'🔦 ','unit': 'lx'},
'uv_lux': {'icon':'uv🔦 ','unit': 'lx'},
'wind_speed': {'icon':'💨 ','unit': 'm/s'},
'wind_direction': {'icon':'','unit': ''},
'battery_level': {'icon':'🔋 ', 'unit':'%'},
'voltage': {'icon':'', 'unit':'V'},
'channel_utilization': {'icon':'ChUtil:', 'unit':'%'},
'air_util_tx': {'icon':'AirUtil:', 'unit':'%'},
'uptime_seconds': {'icon':'🆙 ', 'unit':'h'},
'latitude_i': {'icon':'🌍 ', 'unit':''},
'longitude_i': {'icon':'', 'unit':''},
'altitude': {'icon':'⬆️ ', 'unit':'m'},
'time': {'icon':'🕔 ', 'unit':''}
}
def humanize_wind_direction(degrees):
""" Convert degrees to Eest-West-Nnoth-Ssouth directions """
if not 0 <= degrees <= 360:
return None
directions = [
("N", 337.5, 22.5),
("NE", 22.5, 67.5),
("E", 67.5, 112.5),
("SE", 112.5, 157.5),
("S", 157.5, 202.5),
("SW", 202.5, 247.5),
("W", 247.5, 292.5),
("NW", 292.5, 337.5),
]
if degrees >= directions[0][1] or degrees < directions[0][2]:
return directions[0][0]
# Check for all other directions
for direction, lower_bound, upper_bound in directions[1:]:
if lower_bound <= degrees < upper_bound:
return direction
# This part should ideally not be reached with valid input
return None
def get_chunks(data):
""" Breakdown telemetry data and assign emojis for more visual appeal of the payloads """
reading = data.split('\n')
# remove empty list lefover from the split
reading = list(filter(None, reading))
parsed=""
for item in reading:
key, value = item.split(":")
# If value is float, round it to the 1 digit after point
# else make it int
if "." in value:
value = round(float(value.strip()),1)
else:
try:
value = int(value.strip())
except Exception:
# Leave it string as last resort
value = value
match key:
# convert seconds to hours, for our sanity
case "uptime_seconds":
value = round(value / 60 / 60, 1)
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
# truncate to 6th digit after floating point, which would be still accurate
case "longitude_i" | "latitude_i":
value = round(value * 1e-7, 6)
# Convert wind direction from degrees to abbreviation
case "wind_direction":
value = humanize_wind_direction(value)
case "time":
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
if key in sensors:
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "
else:
# just pass through if we haven't added the particular telemetry key:value to the sensor dict
parsed+=f"{key}:{value} "
return parsed

View File

@@ -1,8 +1,13 @@
import datetime
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
import time
from typing import Optional, Union
from google.protobuf.message import DecodeError
from meshtastic import protocols
from meshtastic.protobuf import config_pb2, mesh_pb2, portnums_pb2
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
import contact.utilities.telemetry_beautifier as tb
def get_channels():
@@ -134,3 +139,81 @@ def get_time_ago(timestamp):
if unit != "s":
return f"{value} {unit} ago"
return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ts_str = time.strftime("[%H:%M:%S] ")
ui_state.all_messages[channel_id].append((f"{ts_str}{prefix}", message))
def parse_protobuf(packet: dict) -> Union[str, dict]:
"""Attempt to parse a decoded payload using the registered protobuf handler."""
try:
decoded = packet.get("decoded") or {}
portnum = decoded.get("portnum")
payload = decoded.get("payload")
if isinstance(payload, str):
return payload
# These portnumbers carry information visible elswhere in the app, so we just note them in the logs
if portnum == "TEXT_MESSAGE_APP":
return "✉️"
elif portnum == "NODEINFO_APP":
return "Name identification payload"
elif portnum == "TRACEROUTE_APP":
return "Traceroute payload"
handler = protocols.get(portnums_pb2.PortNum.Value(portnum)) if portnum is not None else None
if handler is not None and handler.protobufFactory is not None:
try:
pb = handler.protobufFactory()
pb.ParseFromString(bytes(payload))
# If we have position payload
if portnum == "POSITION_APP":
return tb.get_chunks(str(pb))
# Part of TELEMETRY_APP portnum
if hasattr(pb, "device_metrics") and pb.HasField("device_metrics"):
return tb.get_chunks(str(pb.device_metrics))
# Part of TELEMETRY_APP portnum
if hasattr(pb, "environment_metrics") and pb.HasField("environment_metrics"):
return tb.get_chunks(str(pb.environment_metrics))
# For other data, without implemented beautification, fallback to just printing the object
return str(pb).replace("\n", " ").replace("\r", " ").strip()
except DecodeError:
return payload
# return payload
except Exception:
return payload

View File

@@ -0,0 +1,23 @@
validation_rules = {
"shortName": {"max_length": 4},
"longName": {"max_length": 32},
"fixed_pin": {"min_length": 6, "max_length": 6},
"position_flags": {"max_length": 3},
"enabled_protocols": {"max_value": 2},
"hop_limit": {"max_value": 7},
"latitude": {"min_value": -90, "max_value": 90},
"longitude": {"min_value": -180, "max_value": 180},
"altitude": {"min_value": -4294967295, "max_value": 4294967295},
"red": {"max_value": 255},
"green": {"max_value": 255},
"blue": {"max_value": 255},
"current": {"max_value": 255},
"position_precision": {"max_value": 32},
}
def get_validation_for(key: str) -> dict:
for rule_key, config in validation_rules.items():
if rule_key in key:
return config
return {}

View File

@@ -1,15 +1,15 @@
[project]
name = "contact"
version = "1.3.13"
version = "1.4.10"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
]
license = "GPL-3.0-only"
readme = "README.md"
requires-python = ">=3.9,<3.14"
requires-python = ">=3.9,<3.15"
dependencies = [
"meshtastic (>=2.6.0,<3.0.0)"
"meshtastic (>=2.7.5,<3.0.0)"
]
[project.urls]