Compare commits

..

27 Commits

Author SHA1 Message Date
pdxlocations
8e31e39266 current state 2025-04-16 12:17:53 -07:00
pdxlocations
e3f85ffaf2 changes 2025-04-16 12:00:59 -07:00
pdxlocations
b59ae94e00 bump version 2025-04-16 11:30:30 -07:00
pdxlocations
d0ada7eb5b add utf-8 read (#166) 2025-04-13 15:17:36 -07:00
pdxlocations
8d9bbac0be Merge pull request #165 from pdxlocations/type-annotations 2025-04-13 15:02:54 -07:00
Ben Lipsey
613eeb4fab working changes 2025-04-13 14:58:31 -07:00
Ben Lipsey
f7b2645dcb working changes 2025-04-13 14:49:00 -07:00
Ben Lipsey
bc5a5951d4 current state 2025-04-13 14:18:52 -07:00
Ben Lipsey
d7eec6de6e current state 2025-04-12 21:53:27 -07:00
Ben Lipsey
8779297424 current state 2025-04-12 21:19:44 -07:00
Ben Lipsey
ccc1399644 current state 2025-04-12 09:46:00 -07:00
Ben Lipsey
f52034e61f current state 2025-04-11 22:22:34 -07:00
pdxlocations
cdd1d89062 fix typo 2025-04-08 21:17:01 -07:00
pdxlocations
c3ff85a646 bump version 2025-04-08 21:14:58 -07:00
pdxlocations
9b8cf19a0c Fix localhost fallback and allow tcp port (#164)
* update interfaces.py

* fix logging

* update

* update

* fix devpath

* fix returns

* changes
2025-04-08 21:05:32 -07:00
pdxlocations
f2e671da7f Update README.md 2025-04-08 18:10:41 -07:00
pdxlocations
3fc1293db1 Update README.md 2025-04-08 17:58:15 -07:00
pdxlocations
4abe9611e3 bump version 2025-04-06 22:03:05 -07:00
pdxlocations
4d20df17fe Update README.md 2025-04-06 21:59:01 -07:00
pdxlocations
3bb57b9420 Merge pull request #163 from pdxlocations:localhost-fallback
Fallback to localhost not meshtastic.local
2025-04-06 21:44:32 -07:00
pdxlocations
e305bb4464 use localhost not meshtastic.local 2025-04-06 21:44:04 -07:00
pdxlocations
636b27cf9b fix typo 2025-04-06 21:28:07 -07:00
pdxlocations
8e500cb305 bump version 2025-04-06 20:19:44 -07:00
pdxlocations
0878937194 correct instructions for launching control 2025-04-06 20:17:03 -07:00
pdxlocations
ac2016322b update en.ini 2025-04-05 22:35:35 -07:00
pdxlocations
031d74a290 Fix options "not set" displaying values 2025-04-05 22:20:19 -07:00
pdxlocations
14913ce5ae fix new_idx 2025-04-05 21:22:19 -07:00
27 changed files with 1339 additions and 645 deletions

6
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,6 @@
{
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
}
}

View File

@@ -1,17 +1,21 @@
## Contact - A Console UI for Meshtastic
### (Formerly Curses Client for Meshtastic)
#### Powered by Meshtastic.org
### Install with:
```bash
pip install contact
```
This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores.
<img width="846" alt="Contact - Main UI Screenshot" src="https://github.com/user-attachments/assets/d2996bfb-2c6d-46a8-b820-92a9143375f4">
<br><br>
The settings dialogue can be accessed within the client or may be run standalone to configure your node by launching `settings.py`
The settings dialogue can be accessed within the client or may be run standalone to configure your node by launching `contact --settings` or `contact -c`
<img width="441" alt="Contact - Settings Dialogue" src="https://github.com/user-attachments/assets/dd47f52a-d4d8-4e40-8001-9ea53d87f816" />
<img width="696" alt="Screenshot 2025-04-08 at 6 10 06PM" src="https://github.com/user-attachments/assets/3d5e3964-f009-4772-bd6e-91b907c65a3b" />
## Message Persistence
@@ -62,4 +66,4 @@ contact --ble BlAddressOfDevice
To quickly connect to localhost, use:
```sh
contact -t
```
```

View File

@@ -1,118 +1,133 @@
#!/usr/bin/env python3
'''
"""
Contact - A Console UI for Meshtastic by http://github.com/pdxlocations
Powered by Meshtastic.org
Meshtastic® is a registered trademark of Meshtastic LLC. Meshtastic software components are released under various licenses, see GitHub for details. No warranty is provided - use at your own risk.
'''
Meshtastic® is a registered trademark of Meshtastic LLC.
Meshtastic software components are released under various licenses—see GitHub for details.
No warranty is provided. Use at your own risk.
"""
# Standard library
import contextlib
import curses
import os
from pubsub import pub
import sys
import io
import logging
import os
import subprocess
import traceback
import sys
import threading
import traceback
from contact.utilities.db_handler import init_nodedb, load_messages_from_db
# Third-party
from pubsub import pub
# Local application
import contact.globals as globals
import contact.ui.default_config as config
from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region
from contact.ui.contact_ui import main_ui
from contact.ui.colors import setup_colors
from contact.ui.contact_ui import main_ui
from contact.ui.splash import draw_splash
import contact.ui.default_config as config
from contact.utilities.arg_parser import setup_parser
from contact.utilities.interfaces import initialize_interface
from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input
from contact.utilities.utils import get_channels, get_node_list, get_nodeNum
from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
import contact.globals as globals
from contact.ui.ui_state import NodeState
node_state = NodeState()
# ------------------------------------------------------------------------------
# Environment & Logging Setup
# ------------------------------------------------------------------------------
# Set ncurses compatibility settings
os.environ["NCURSES_NO_UTF8_ACS"] = "1"
os.environ["LANG"] = "C.UTF-8"
os.environ.setdefault("TERM", "xterm-256color")
if os.environ.get("COLORTERM") == "gnome-terminal":
os.environ["TERM"] = "xterm-256color"
# Configure logging
# Run `tail -f client.log` in another terminal to view live
logging.basicConfig(
filename=config.log_file_path,
level=logging.INFO, # DEBUG, INFO, WARNING, ERROR, CRITICAL)
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
globals.lock = threading.Lock()
def main(stdscr):
# ------------------------------------------------------------------------------
# Main Program Logic
# ------------------------------------------------------------------------------
def initialize_globals(args) -> None:
"""Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
# Prompt for region if unset
if globals.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, 'meshtastic.receive')
init_nodedb()
load_messages_from_db()
def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI."""
output_capture = io.StringIO()
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
setup_colors()
draw_splash(stdscr)
parser = setup_parser()
args = parser.parse_args()
# Check if --settings was passed and run settings.py as a subprocess
args = setup_parser().parse_args()
if getattr(args, 'settings', False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
logging.info("Initializing interface %s", args)
logging.info("Initializing interface...")
with globals.lock:
node_state.interface = initialize_interface(args)
if node_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(node_state)
node_state.interface.close()
node_state.interface = initialize_interface(args)
logging.info("Interface initialized")
get_nodeNum(node_state)
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, 'meshtastic.receive')
init_nodedb(node_state)
load_messages_from_db(node_state)
initialize_globals(args)
logging.info("Starting main UI")
main_ui(stdscr, node_state)
main_ui(stdscr)
except Exception as e:
console_output = output_capture.getvalue()
logging.error("An error occurred: %s", e)
logging.error("Uncaught exception: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
logging.error("Console output before crash:\n%s", console_output)
raise # Re-raise only unexpected errors
logging.error("Console output:\n%s", console_output)
raise
def start():
log_file = config.log_file_path
log_f = open(log_file, "a", buffering=1) # Enable line-buffering for immediate log writes
sys.stdout = log_f
sys.stderr = log_f
def start() -> None:
"""Launch curses wrapper and redirect logs to file."""
with open(config.log_file_path, "a", buffering=1) as log_f:
sys.stdout = log_f
sys.stderr = log_f
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.error("Fatal error: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1)
with contextlib.redirect_stderr(log_f), contextlib.redirect_stdout(log_f):
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C or Ctrl+X") # Clean exit logging
sys.exit(0) # Ensure a clean exit
except Exception as e:
logging.error("Fatal error in curses wrapper: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1) # Exit with an error code
if __name__ == "__main__":
start()

View File

@@ -1,4 +1,4 @@
# interface = None
interface = None
lock = None
display_log = False
all_messages = {}
@@ -6,7 +6,7 @@ channel_list = []
notifications = []
packet_buffer = []
node_list = []
# myNodeNum = 0
myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0

View File

@@ -78,6 +78,13 @@ dns, "IPv4 DNS server", ""
rsyslog_server, "RSyslog server", ""
enabled_protocols, "Enabled protocols", ""
[config.network.ipv4_config]
title, "IPv4 Config", ""
ip, "IP", ""
gateway, "Gateway", ""
subnet, "Subnet", ""
dns, "DNS", ""
[config.display]
title, "Display"
screen_on_secs, "Screen on duration", "How long the screen remains on in seconds after the user button is pressed or messages are received."
@@ -105,6 +112,35 @@ theme, "Theme", ""
alert_enabled, "Alert enabled", ""
banner_enabled, "Banner enabled", ""
ring_tone_id, "Ring tone ID", ""
language, "Language", ""
node_filter, "Node Filter", ""
node_highlight, "Node Highlight", ""
calibration_data, "Calibration Data", ""
map_data, "Map Data", ""
[config.device_ui.node_filter]
title, "Node Filter"
unknown_switch, "Unknown Switch", ""
offline_switch, "Offline Switch", ""
public_key_switch, "Public Key Switch", ""
hops_away, "Hops Away", ""
position_switch, "Position Switch", ""
node_name, "Node Name", ""
channel, "Channel", ""
[config.device_ui.node_highlight]
title, "Node Highlight"
chat_switch, "Chat Switch", ""
position_switch, "Position Switch", ""
telemetry_switch, "Telemetry Switch", ""
iaq_switch, "IAQ Switch", ""
node_name, "Node Name", ""
[config.device_ui.map_data]
title, "Map Data"
home, "Home", ""
style, "Style", ""
follow_gps, "Follow GPS", ""
[config.lora]
title, "LoRa"

View File

@@ -1,17 +1,34 @@
import logging
import time
from contact.utilities.utils import refresh_node_list
from datetime import datetime
from contact.ui.contact_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
from contact.utilities.db_handler import save_message_to_db, maybe_store_nodeinfo_in_db, get_name_from_database, update_node_info_in_db
from typing import Any
from contact.utilities.utils import refresh_node_list
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
draw_messages_window,
draw_channel_list,
add_notification,
)
from contact.utilities.db_handler import (
save_message_to_db,
maybe_store_nodeinfo_in_db,
get_name_from_database,
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
from datetime import datetime
def on_receive(packet, node_state):
def on_receive(packet: dict[str, Any], interface: Any) -> None:
"""
Handles an incoming packet from a Meshtastic interface.
Args:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with globals.lock:
# Update packet log
globals.packet_buffer.append(packet)
@@ -20,7 +37,7 @@ def on_receive(packet, node_state):
globals.packet_buffer = globals.packet_buffer[-20:]
if globals.display_log:
draw_packetlog_win(node_state)
draw_packetlog_win()
try:
if 'decoded' not in packet:
return
@@ -32,7 +49,7 @@ def on_receive(packet, node_state):
if packet['decoded']['portnum'] == 'NODEINFO_APP':
if "user" in packet['decoded'] and "longName" in packet['decoded']["user"]:
maybe_store_nodeinfo_in_db(packet, node_state)
maybe_store_nodeinfo_in_db(packet)
elif packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP':
message_bytes = packet['decoded']['payload']
@@ -66,7 +83,7 @@ def on_receive(packet, node_state):
# Add received message to the messages list
message_from_id = packet['from']
message_from_string = get_name_from_database(message_from_id, node_state, type='short') + ":"
message_from_string = get_name_from_database(message_from_id, type='short') + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
@@ -95,9 +112,9 @@ def on_receive(packet, node_state):
globals.all_messages[globals.channel_list[channel_number]].append((f"{config.message_prefix} {message_from_string} ", message_string))
if refresh_channels:
draw_channel_list(node_state)
draw_channel_list()
if refresh_messages:
draw_messages_window(node_state, True)
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)

View File

@@ -1,17 +1,29 @@
from datetime import datetime
from typing import Any
import google.protobuf.json_format
from meshtastic import BROADCAST_NUM
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from contact.utilities.db_handler import save_message_to_db, update_ack_nak, get_name_from_database, is_chat_archived, update_node_info_in_db
from contact.utilities.db_handler import (
save_message_to_db,
update_ack_nak,
get_name_from_database,
is_chat_archived,
update_node_info_in_db,
)
import contact.ui.default_config as config
import contact.globals as globals
ack_naks = {}
ack_naks: dict[str, dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
# Note "onAckNak" has special meaning to the API, thus the nonstandard naming convention
# See https://github.com/meshtastic/python/blob/master/meshtastic/mesh_interface.py#L462
def onAckNak(packet, node_state):
def onAckNak(packet: dict[str, Any]) -> None:
"""
Handles incoming ACK/NAK response packets.
"""
from contact.ui.contact_ui import draw_messages_window
request = packet['decoded']['requestId']
if(request not in ack_naks):
@@ -35,14 +47,16 @@ def onAckNak(packet, node_state):
globals.all_messages[acknak['channel']][acknak['messageIndex']] = (config.sent_message_prefix + confirm_string + ": ", message)
update_ack_nak(acknak['channel'], acknak['timestamp'], message, ack_type, node_state)
update_ack_nak(acknak['channel'], acknak['timestamp'], message, ack_type)
channel_number = globals.channel_list.index(acknak['channel'])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
draw_messages_window(node_state)
draw_messages_window()
def on_response_traceroute(packet, node_state):
"""on response for trace route"""
def on_response_traceroute(packet: dict[str, Any]) -> None:
"""
Handle traceroute response packets and render the route visually in the UI.
"""
from contact.ui.contact_ui import draw_channel_list, draw_messages_window, add_notification
refresh_channels = False
@@ -56,18 +70,18 @@ def on_response_traceroute(packet, node_state):
msg_str = "Traceroute to:\n"
route_str = get_name_from_database(packet["to"], node_state, 'short') or f"{packet['to']:08x}" # Start with destination of response
route_str = get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
# SNR list should have one more entry than the route, as the final destination adds its SNR also
lenTowards = 0 if "route" not in msg_dict else len(msg_dict["route"])
snrTowardsValid = "snrTowards" in msg_dict and len(msg_dict["snrTowards"]) == lenTowards + 1
if lenTowards > 0: # Loop through hops in route and add SNR if available
for idx, node_num in enumerate(msg_dict["route"]):
route_str += " --> " + (get_name_from_database(node_num, node_state, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrTowards"][idx] / 4) if snrTowardsValid and msg_dict["snrTowards"][idx] != UNK_SNR else "?") + "dB)"
# End with origin of response
route_str += " --> " + (get_name_from_database(packet["from"], node_state, 'short') or f"{packet['from']:08x}") \
route_str += " --> " + (get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}") \
+ " (" + (str(msg_dict["snrTowards"][-1] / 4) if snrTowardsValid and msg_dict["snrTowards"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route towards destination
@@ -77,15 +91,15 @@ def on_response_traceroute(packet, node_state):
backValid = "hopStart" in packet and "snrBack" in msg_dict and len(msg_dict["snrBack"]) == lenBack + 1
if backValid:
msg_str += "Back:\n"
route_str = get_name_from_database(packet["from"], node_state, 'short') or f"{packet['from']:08x}" # Start with origin of response
route_str = get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
if lenBack > 0: # Loop through hops in routeBack and add SNR if available
for idx, node_num in enumerate(msg_dict["routeBack"]):
route_str += " --> " + (get_name_from_database(node_num, node_state, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrBack"][idx] / 4) if msg_dict["snrBack"][idx] != UNK_SNR else "?") + "dB)"
# End with destination of response (us)
route_str += " --> " + (get_name_from_database(packet["to"], node_state, 'short') or f"{packet['to']:08x}") \
route_str += " --> " + (get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}") \
+ " (" + (str(msg_dict["snrBack"][-1] / 4) if msg_dict["snrBack"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route back to us
@@ -94,7 +108,7 @@ def on_response_traceroute(packet, node_state):
globals.channel_list.append(packet['from'])
refresh_channels = True
if(is_chat_archived(packet['from']), node_state):
if(is_chat_archived(packet['from'])):
update_node_info_in_db(packet['from'], chat_archived=False)
channel_number = globals.channel_list.index(packet['from'])
@@ -105,21 +119,24 @@ def on_response_traceroute(packet, node_state):
add_notification(channel_number)
refresh_channels = True
message_from_string = get_name_from_database(packet['from'], node_state, type='short') + ":\n"
message_from_string = get_name_from_database(packet['from'], type='short') + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append((f"{config.message_prefix} {message_from_string}", msg_str))
if refresh_channels:
draw_channel_list(node_state)
draw_channel_list()
if refresh_messages:
draw_messages_window(node_state, True)
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], packet['from'], msg_str)
def send_message(message, node_state, destination=BROADCAST_NUM, channel=0):
myid = node_state.myNodeNum
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
Sends a chat message using the selected channel.
"""
myid = globals.myNodeNum
send_on_channel = 0
channel_id = globals.channel_list[channel]
if isinstance(channel_id, int):
@@ -128,7 +145,7 @@ def send_message(message, node_state, destination=BROADCAST_NUM, channel=0):
elif isinstance(channel_id, str):
send_on_channel = channel
sent_message_data = node_state.interface.sendText(
sent_message_data = globals.interface.sendText(
text=message,
destinationId=destination,
wantAck=True,
@@ -168,9 +185,12 @@ def send_message(message, node_state, destination=BROADCAST_NUM, channel=0):
ack_naks[sent_message_data.id] = {'channel': channel_id, 'messageIndex': len(globals.all_messages[channel_id]) - 1, 'timestamp': timestamp}
def send_traceroute(node_state):
def send_traceroute() -> None:
"""
Sends a RouteDiscovery protobuf to the selected node.
"""
r = mesh_pb2.RouteDiscovery()
node_state.interface.sendData(
globals.interface.sendData(
r,
destinationId=globals.node_list[globals.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,

View File

@@ -14,7 +14,7 @@ from contact.utilities.arg_parser import setup_parser
from contact.utilities.interfaces import initialize_interface
def main(stdscr):
def main(stdscr: curses.window) -> None:
output_capture = io.StringIO()
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
@@ -25,17 +25,17 @@ def main(stdscr):
parser = setup_parser()
args = parser.parse_args()
node_state.interface = initialize_interface(args)
interface = initialize_interface(args)
if node_state.interface.localNode.localConfig.lora.region == 0:
if interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(node_state.interface)
node_state.interface.close()
node_state.interface = initialize_interface(args)
set_region(interface)
interface.close()
interface = initialize_interface(args)
stdscr.clear()
stdscr.refresh()
settings_menu(stdscr, node_state)
settings_menu(stdscr, interface)
except Exception as e:
console_output = output_capture.getvalue()
@@ -52,9 +52,6 @@ logging.basicConfig( # Run `tail -f client.log` in another terminal to view live
)
if __name__ == "__main__":
from contact.ui.ui_state import NodeState
node_state = NodeState()
log_file = config.log_file_path
log_f = open(log_file, "a", buffering=1) # Enable line-buffering for immediate log writes

View File

@@ -12,7 +12,7 @@ COLOR_MAP = {
"white": curses.COLOR_WHITE
}
def setup_colors(reinit=False):
def setup_colors(reinit: bool = False) -> None:
"""
Initialize curses color pairs based on the COLOR_CONFIG.
"""
@@ -29,7 +29,7 @@ def setup_colors(reinit=False):
print()
def get_color(category, bold=False, reverse=False, underline=False):
def get_color(category: str, bold: bool = False, reverse: bool = False, underline: bool = False) -> int:
"""
Retrieve a curses color pair with optional attributes.
"""

View File

@@ -13,7 +13,7 @@ import contact.ui.default_config as config
import contact.ui.dialog
import contact.globals as globals
def handle_resize(stdscr, firstrun, node_state):
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win
# Calculate window max dimensions
@@ -80,17 +80,17 @@ def handle_resize(stdscr, firstrun, node_state):
curses.curs_set(1)
try:
draw_function_win(node_state)
draw_channel_list(node_state)
draw_messages_window(node_state, True)
draw_node_list(node_state)
draw_function_win()
draw_channel_list()
draw_messages_window(True)
draw_node_list()
except:
# Resize events can come faster than we can re-draw, which can cause a curses error.
# In this case we'll see another curses.KEY_RESIZE in our key handler and draw again later.
pass
def main_ui(stdscr, node_state):
def main_ui(stdscr: curses.window) -> None:
global input_text
input_text = ""
stdscr.keypad(True)
@@ -111,7 +111,7 @@ def main_ui(stdscr, node_state):
elif globals.current_window == 1:
scroll_messages(-1)
elif globals.current_window == 2:
scroll_nodes(-1, node_state)
scroll_nodes(-1)
elif char == curses.KEY_DOWN:
if globals.current_window == 0:
@@ -119,11 +119,11 @@ def main_ui(stdscr, node_state):
elif globals.current_window == 1:
scroll_messages(1)
elif globals.current_window == 2:
scroll_nodes(1, node_state)
scroll_nodes(1)
elif char == curses.KEY_HOME:
if globals.current_window == 0:
select_channel(0, node_state)
select_channel(0)
elif globals.current_window == 1:
globals.selected_message = 0
refresh_pad(1)
@@ -132,7 +132,7 @@ def main_ui(stdscr, node_state):
elif char == curses.KEY_END:
if globals.current_window == 0:
select_channel(len(globals.channel_list) - 1, node_state)
select_channel(len(globals.channel_list) - 1)
elif globals.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
globals.selected_message = max(msg_line_count - get_msg_window_lines(), 0)
@@ -142,7 +142,7 @@ def main_ui(stdscr, node_state):
elif char == curses.KEY_PPAGE:
if globals.current_window == 0:
select_channel(globals.selected_channel - (channel_win.getmaxyx()[0] - 2), node_state) # select_channel will bounds check for us
select_channel(globals.selected_channel - (channel_win.getmaxyx()[0] - 2)) # select_channel will bounds check for us
elif globals.current_window == 1:
globals.selected_message = max(globals.selected_message - get_msg_window_lines(), 0)
refresh_pad(1)
@@ -151,7 +151,7 @@ def main_ui(stdscr, node_state):
elif char == curses.KEY_NPAGE:
if globals.current_window == 0:
select_channel(globals.selected_channel + (channel_win.getmaxyx()[0] - 2), node_state) # select_channel will bounds check for us
select_channel(globals.selected_channel + (channel_win.getmaxyx()[0] - 2)) # select_channel will bounds check for us
elif globals.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
globals.selected_message = min(globals.selected_message + get_msg_window_lines(), msg_line_count - get_msg_window_lines())
@@ -169,7 +169,7 @@ def main_ui(stdscr, node_state):
channel_win.attrset(get_color("window_frame"))
channel_win.box()
channel_win.refresh()
highlight_line(False, 0, globals.selected_channel, node_state)
highlight_line(False, 0, globals.selected_channel)
refresh_pad(0)
if old_window == 1:
messages_win.attrset(get_color("window_frame"))
@@ -177,11 +177,11 @@ def main_ui(stdscr, node_state):
messages_win.refresh()
refresh_pad(1)
elif old_window == 2:
draw_function_win(node_state)
draw_function_win()
nodes_win.attrset(get_color("window_frame"))
nodes_win.box()
nodes_win.refresh()
highlight_line(False, 2, globals.selected_node, node_state)
highlight_line(False, 2, globals.selected_node)
refresh_pad(2)
if globals.current_window == 0:
@@ -189,7 +189,7 @@ def main_ui(stdscr, node_state):
channel_win.box()
channel_win.attrset(get_color("window_frame"))
channel_win.refresh()
highlight_line(True, 0, globals.selected_channel, node_state)
highlight_line(True, 0, globals.selected_channel)
refresh_pad(0)
elif globals.current_window == 1:
messages_win.attrset(get_color("window_frame_selected"))
@@ -198,12 +198,12 @@ def main_ui(stdscr, node_state):
messages_win.refresh()
refresh_pad(1)
elif globals.current_window == 2:
draw_function_win(node_state)
draw_function_win()
nodes_win.attrset(get_color("window_frame_selected"))
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
nodes_win.refresh()
highlight_line(True, 2, globals.selected_node, node_state)
highlight_line(True, 2, globals.selected_node)
refresh_pad(2)
# Check for Esc
@@ -212,7 +212,7 @@ def main_ui(stdscr, node_state):
# Check for Ctrl + t
elif char == chr(20):
send_traceroute(node_state)
send_traceroute()
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
curses.curs_set(1) # Show cursor again
@@ -229,20 +229,20 @@ def main_ui(stdscr, node_state):
globals.selected_channel = globals.channel_list.index(node_list[globals.selected_node])
if(is_chat_archived(globals.channel_list[globals.selected_channel], node_state)):
if(is_chat_archived(globals.channel_list[globals.selected_channel])):
update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=False)
globals.selected_node = 0
globals.current_window = 0
draw_node_list(node_state)
draw_channel_list(node_state)
draw_messages_window(node_state, True)
draw_node_list()
draw_channel_list()
draw_messages_window(True)
elif len(input_text) > 0:
# Enter key pressed, send user input as message
send_message(input_text, node_state, channel=globals.selected_channel)
draw_messages_window(node_state, True)
send_message(input_text, channel=globals.selected_channel)
draw_messages_window(True)
# Clear entry window and reset input text
input_text = ""
@@ -259,7 +259,7 @@ def main_ui(stdscr, node_state):
elif char == "`": # ` Launch the settings interface
curses.curs_set(0)
settings_menu(stdscr, node_state.interface)
settings_menu(stdscr, globals.interface)
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
@@ -268,11 +268,11 @@ def main_ui(stdscr, node_state):
# Display packet log
if globals.display_log is False:
globals.display_log = True
draw_messages_window(node_state, True)
draw_messages_window(True)
else:
globals.display_log = False
packetlog_win.erase()
draw_messages_window(node_state, True)
draw_messages_window(True)
elif char == curses.KEY_RESIZE:
input_text = ""
@@ -291,29 +291,29 @@ def main_ui(stdscr, node_state):
del globals.channel_list[globals.selected_channel]
globals.selected_channel = min(globals.selected_channel, len(globals.channel_list) - 1)
select_channel(globals.selected_channel, node_state)
draw_channel_list(node_state)
draw_messages_window(node_state)
select_channel(globals.selected_channel)
draw_channel_list()
draw_messages_window()
if(globals.current_window == 2):
curses.curs_set(0)
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node], node_state)} from nodedb?", "No", ["Yes", "No"])
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from nodedb?", "No", ["Yes", "No"])
if confirmation == "Yes":
node_state.interface.localNode.removeNode(globals.node_list[globals.selected_node])
globals.interface.localNode.removeNode(globals.node_list[globals.selected_node])
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
del(node_state.interface.nodesByNum[globals.node_list[globals.selected_node]])
del(globals.interface.nodesByNum[globals.node_list[globals.selected_node]])
# Convert to "!hex" representation that interface.nodes uses
hexid = f"!{hex(globals.node_list[globals.selected_node])[2:]}"
del(node_state.interface.nodes[hexid])
del(globals.interface.nodes[hexid])
globals.node_list.pop(globals.selected_node)
draw_messages_window(node_state)
draw_node_list(node_state)
draw_messages_window()
draw_node_list()
else:
draw_messages_window(node_state)
draw_messages_window()
curses.curs_set(1)
continue
@@ -325,25 +325,25 @@ def main_ui(stdscr, node_state):
# ^F
elif char == chr(6):
if globals.current_window == 2:
selectedNode = node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
curses.curs_set(0)
if 'isFavorite' not in selectedNode or selectedNode['isFavorite'] == False:
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Favorite?", node_state, None, ["Yes", "No"])
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Favorite?", None, ["Yes", "No"])
if confirmation == "Yes":
node_state.interface.localNode.setFavorite(globals.node_list[globals.selected_node])
globals.interface.localNode.setFavorite(globals.node_list[globals.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = True
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = True
refresh_node_list()
else:
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Favorites?", node_state, None, ["Yes", "No"])
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Favorites?", None, ["Yes", "No"])
if confirmation == "Yes":
node_state.interface.localNode.removeFavorite(globals.node_list[globals.selected_node])
globals.interface.localNode.removeFavorite(globals.node_list[globals.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = False
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = False
refresh_node_list()
@@ -351,20 +351,20 @@ def main_ui(stdscr, node_state):
elif char == chr(7):
if globals.current_window == 2:
selectedNode = node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
curses.curs_set(0)
if 'isIgnored' not in selectedNode or selectedNode['isIgnored'] == False:
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Ignored?", node_state, "No", ["Yes", "No"])
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Ignored?", "No", ["Yes", "No"])
if confirmation == "Yes":
node_state.interface.localNode.setIgnored(globals.node_list[globals.selected_node])
node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = True
globals.interface.localNode.setIgnored(globals.node_list[globals.selected_node])
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = True
else:
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Ignored?", node_state, "No", ["Yes", "No"])
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Ignored?", "No", ["Yes", "No"])
if confirmation == "Yes":
node_state.interface.localNode.removeIgnored(globals.node_list[globals.selected_node])
node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = False
globals.interface.localNode.removeIgnored(globals.node_list[globals.selected_node])
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = False
handle_resize(stdscr, False)
@@ -377,7 +377,7 @@ def main_ui(stdscr, node_state):
def draw_channel_list(node_state):
def draw_channel_list() -> None:
channel_pad.erase()
win_height, win_width = channel_win.getmaxyx()
start_index = max(0, globals.selected_channel - (win_height - 3)) # Leave room for borders
@@ -388,9 +388,9 @@ def draw_channel_list(node_state):
for channel in globals.channel_list:
# Convert node number to long name if it's an integer
if isinstance(channel, int):
if is_chat_archived(channel, node_state):
if is_chat_archived(channel):
continue
channel_name = get_name_from_database(channel, node_state, type='long')
channel_name = get_name_from_database(channel, type='long')
if channel_name is None:
continue
channel = channel_name
@@ -418,7 +418,7 @@ def draw_channel_list(node_state):
refresh_pad(0)
def draw_messages_window(node_state, scroll_to_bottom = False):
def draw_messages_window(scroll_to_bottom: bool = False) -> None:
"""Update the messages window based on the selected channel and scroll position."""
messages_pad.erase()
@@ -459,9 +459,9 @@ def draw_messages_window(node_state, scroll_to_bottom = False):
refresh_pad(1)
draw_packetlog_win(node_state)
draw_packetlog_win()
def draw_node_list(node_state):
def draw_node_list() -> None:
global nodes_pad
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
@@ -479,9 +479,9 @@ def draw_node_list(node_state):
logging.error("Traceback: %s", traceback.format_exc())
for i, node_num in enumerate(globals.node_list):
node = node_state.interface.nodesByNum[node_num]
node = globals.interface.nodesByNum[node_num]
secure = 'user' in node and 'publicKey' in node['user'] and node['user']['publicKey']
node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, node_state, 'long')}".ljust(box_width - 2)[:box_width - 2]
node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, 'long')}".ljust(box_width - 2)[:box_width - 2]
color = "node_list"
if 'isFavorite' in node and node['isFavorite']:
color = "node_favorite"
@@ -501,21 +501,21 @@ def draw_node_list(node_state):
curses.curs_set(1)
entry_win.refresh()
def select_channel(idx, node_state):
def select_channel(idx: int) -> None:
old_selected_channel = globals.selected_channel
globals.selected_channel = max(0, min(idx, len(globals.channel_list) - 1))
draw_messages_window(node_state, True)
draw_messages_window(True)
# For now just re-draw channel list when clearing notifications, we can probably make this more efficient
if globals.selected_channel in globals.notifications:
remove_notification(globals.selected_channel)
draw_channel_list(node_state)
draw_channel_list()
return
highlight_line(False, 0, old_selected_channel, node_state)
highlight_line(True, 0, globals.selected_channel, node_state)
highlight_line(False, 0, old_selected_channel)
highlight_line(True, 0, globals.selected_channel)
refresh_pad(0)
def scroll_channels(direction, node_state):
def scroll_channels(direction: int) -> None:
new_selected_channel = globals.selected_channel + direction
if new_selected_channel < 0:
@@ -523,9 +523,9 @@ def scroll_channels(direction, node_state):
elif new_selected_channel >= len(globals.channel_list):
new_selected_channel = 0
select_channel(new_selected_channel, node_state)
select_channel(new_selected_channel)
def scroll_messages(direction):
def scroll_messages(direction: int) -> None:
globals.selected_message += direction
msg_line_count = messages_pad.getmaxyx()[0]
@@ -533,17 +533,17 @@ def scroll_messages(direction):
refresh_pad(1)
def select_node(idx, node_state):
def select_node(idx: int) -> None:
old_selected_node = globals.selected_node
globals.selected_node = max(0, min(idx, len(globals.node_list) - 1))
highlight_line(False, 2, old_selected_node, node_state)
highlight_line(True, 2, globals.selected_node, node_state)
highlight_line(False, 2, old_selected_node)
highlight_line(True, 2, globals.selected_node)
refresh_pad(2)
draw_function_win(node_state)
draw_function_win()
def scroll_nodes(direction, node_state):
def scroll_nodes(direction: int) -> None:
new_selected_node = globals.selected_node + direction
if new_selected_node < 0:
@@ -551,9 +551,9 @@ def scroll_nodes(direction, node_state):
elif new_selected_node >= len(globals.node_list):
new_selected_node = 0
select_node(new_selected_node, node_state)
select_node(new_selected_node)
def draw_packetlog_win(node_state):
def draw_packetlog_win() -> None:
columns = [10,10,15,30]
span = 0
@@ -574,10 +574,10 @@ def draw_packetlog_win(node_state):
break
# Format each field
from_id = get_name_from_database(packet['from'], node_state, 'short').ljust(columns[0])
from_id = get_name_from_database(packet['from'], 'short').ljust(columns[0])
to_id = (
"BROADCAST".ljust(columns[1]) if str(packet['to']) == "4294967295"
else get_name_from_database(packet['to'], node_state, 'short').ljust(columns[1])
else get_name_from_database(packet['to'], 'short').ljust(columns[1])
)
if 'decoded' in packet:
port = packet['decoded']['portnum'].ljust(columns[2])
@@ -602,7 +602,7 @@ def draw_packetlog_win(node_state):
curses.curs_set(1)
entry_win.refresh()
def search(win):
def search(win: int) -> None:
start_idx = globals.selected_node
select_func = select_node
@@ -645,10 +645,10 @@ def search(win):
entry_win.erase()
def draw_node_details(node_state):
def draw_node_details() -> None:
node = None
try:
node = node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]
node = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
except KeyError:
return
@@ -693,7 +693,7 @@ def draw_node_details(node_state):
draw_centered_text_field(function_win, nodestr, 0, get_color("commands"))
def draw_help():
def draw_help() -> None:
cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit", " ^t = Traceroute", " ^d = Archive Chat", " ^f = Favorite", " ^g = Ignore"]
function_str = ""
for s in cmds:
@@ -702,19 +702,18 @@ def draw_help():
draw_centered_text_field(function_win, function_str, 0, get_color("commands"))
def draw_function_win(node_state):
def draw_function_win() -> None:
if(globals.current_window == 2):
draw_node_details(node_state)
draw_node_details()
else:
draw_help()
def get_msg_window_lines():
def get_msg_window_lines() -> None:
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if globals.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height
def refresh_pad(window):
# global messages_pad, nodes_pad, channel_pad
def refresh_pad(window: int) -> None:
win_height = channel_win.getmaxyx()[0]
if(window == 1):
@@ -746,7 +745,7 @@ def refresh_pad(window):
box.getbegyx()[0] + 1, box.getbegyx()[1] + 1,
box.getbegyx()[0] + lines, box.getbegyx()[1] + box.getmaxyx()[1] - 2)
def highlight_line(highlight, window, line, node_state):
def highlight_line(highlight: bool, window: int, line: int) -> None:
pad = nodes_pad
color = get_color("node_list")
@@ -754,7 +753,7 @@ def highlight_line(highlight, window, line, node_state):
if window == 2:
node_num = globals.node_list[line]
node = node_state.interface.nodesByNum[node_num]
node = globals.interface.nodesByNum[node_num]
if 'isFavorite' in node and node['isFavorite']:
color = get_color("node_favorite")
if 'isIgnored' in node and node['isIgnored']:
@@ -767,25 +766,25 @@ def highlight_line(highlight, window, line, node_state):
pad.chgat(line, 1, select_len, color | curses.A_REVERSE if highlight else color)
def add_notification(channel_number):
def add_notification(channel_number: int) -> None:
if channel_number not in globals.notifications:
globals.notifications.append(channel_number)
def remove_notification(channel_number):
def remove_notification(channel_number: int) -> None:
if channel_number in globals.notifications:
globals.notifications.remove(channel_number)
def draw_text_field(win, text, color):
def draw_text_field(win: curses.window, text: str, color: int) -> None:
win.border()
win.addstr(1, 1, text, color)
def draw_centered_text_field(win, text, y_offset, color):
def draw_centered_text_field(win: curses.window, text: str, y_offset: int, color: int) -> None:
height, width = win.getmaxyx()
x = (width - len(text)) // 2
y = (height // 2) + y_offset
win.addstr(y, x, text, color)
win.refresh()
def draw_debug(value):
def draw_debug(value: str | int) -> None:
function_win.addstr(1, 1, f"debug: {value} ")
function_win.refresh()

View File

@@ -7,17 +7,23 @@ import sys
from contact.utilities.save_to_radio import save_changes
from contact.utilities.config_io import config_export, config_import
from contact.utilities.input_handlers import get_repeated_input, get_text_input, get_fixed32_input, get_list_input, get_admin_key_input
from contact.utilities.input_handlers import (
get_repeated_input,
get_text_input,
get_fixed32_input,
get_list_input,
get_admin_key_input,
)
from contact.ui.menus import generate_menu_from_protobuf
from contact.ui.colors import get_color
from contact.ui.dialog import dialog
from contact.utilities.control_utils import parse_ini_file, transform_menu_path
from contact.ui.user_config import json_editor
from contact.ui.ui_state import MenuState
from contact.ui.navigation_utils import move_highlight
menu_state = MenuState()
# Constants
width = 80
save_option = "Save Changes"
@@ -38,15 +44,21 @@ config_folder = os.path.join(locals_dir, "node-configs")
# Load translations
field_mapping, help_text = parse_ini_file(translation_file)
# Aliases
Segment = tuple[str, str, bool, bool]
WrappedLine = list[Segment]
def display_menu(menu_state):
def display_menu(
menu_state: MenuState,
) -> tuple[object, object]: # curses.window or pad types
min_help_window_height = 6
num_items = len(menu_state.current_menu) + (1 if menu_state.show_save_option else 0)
# Determine the available height for the menu
max_menu_height = curses.LINES
menu_height = min(max_menu_height - min_help_window_height, num_items + 5)
max_menu_height = curses.LINES
menu_height = min(max_menu_height - min_help_window_height, num_items + 5)
start_y = (curses.LINES - menu_height) // 2 - (min_help_window_height // 2)
start_x = (curses.COLS - width) // 2
@@ -67,7 +79,7 @@ def display_menu(menu_state):
header = " > ".join(word.title() for word in menu_state.menu_path)
if len(header) > width - 4:
header = header[:width - 7] + "..."
header = header[: width - 7] + "..."
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
transformed_path = transform_menu_path(menu_state.menu_path)
@@ -75,55 +87,118 @@ def display_menu(menu_state):
for idx, option in enumerate(menu_state.current_menu):
field_info = menu_state.current_menu[option]
current_value = field_info[1] if isinstance(field_info, tuple) else ""
full_key = '.'.join(transformed_path + [option])
full_key = ".".join(transformed_path + [option])
display_name = field_mapping.get(full_key, option)
display_option = f"{display_name}"[:width // 2 - 2]
display_value = f"{current_value}"[:width // 2 - 4]
display_option = f"{display_name}"[: width // 2 - 2]
display_value = f"{current_value}"[: width // 2 - 4]
try:
color = get_color("settings_sensitive" if option in sensitive_settings else "settings_default", reverse=(idx == menu_state.selected_index))
menu_pad.addstr(idx, 0, f"{display_option:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
color = get_color(
(
"settings_sensitive"
if option in sensitive_settings
else "settings_default"
),
reverse=(idx == menu_state.selected_index),
)
menu_pad.addstr(
idx,
0,
f"{display_option:<{width // 2 - 2}} {display_value}".ljust(width - 8),
color,
)
except curses.error:
pass
if menu_state.show_save_option:
save_position = menu_height - 2
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, get_color("settings_save", reverse=(menu_state.selected_index == len(menu_state.current_menu))))
menu_win.addstr(
save_position,
(width - len(save_option)) // 2,
save_option,
get_color(
"settings_save",
reverse=(menu_state.selected_index == len(menu_state.current_menu)),
),
)
# Draw help window with dynamically updated max_help_lines
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path, menu_state)
draw_help_window(
start_y, start_x, menu_height, max_help_lines, transformed_path, menu_state
)
menu_win.refresh()
menu_pad.refresh(
menu_state.start_index[-1], 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0),
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4
menu_state.start_index[-1],
0,
menu_win.getbegyx()[0] + 3,
menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0]
+ 3
+ menu_win.getmaxyx()[0]
- 5
- (2 if menu_state.show_save_option else 0),
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4,
)
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
visible_height = (
menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
)
draw_arrows(menu_win, visible_height, max_index, menu_state)
return menu_win, menu_pad
def draw_help_window(menu_start_y, menu_start_x, menu_height, max_help_lines, transformed_path, menu_state):
def draw_help_window(
menu_start_y: int,
menu_start_x: int,
menu_height: int,
max_help_lines: int,
transformed_path: list[str],
menu_state: MenuState,
) -> None:
global help_win
if 'help_win' not in globals():
if "help_win" not in globals():
help_win = None # Initialize if it does not exist
selected_option = list(menu_state.current_menu.keys())[menu_state.selected_index] if menu_state.current_menu else None
selected_option = (
list(menu_state.current_menu.keys())[menu_state.selected_index]
if menu_state.current_menu
else None
)
help_y = menu_start_y + menu_height
help_win = update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_start_x)
help_win = update_help_window(
help_win,
help_text,
transformed_path,
selected_option,
max_help_lines,
width,
help_y,
menu_start_x,
)
def update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, help_x):
def update_help_window(
help_win: object, # curses window or None
help_text: dict[str, str],
transformed_path: list[str],
selected_option: str | None,
max_help_lines: int,
width: int,
help_y: int,
help_x: int,
) -> object: # returns a curses window
"""Handles rendering the help window consistently."""
wrapped_help = get_wrapped_help_text(help_text, transformed_path, selected_option, width, max_help_lines)
wrapped_help = get_wrapped_help_text(
help_text, transformed_path, selected_option, width, max_help_lines
)
help_height = min(len(wrapped_help) + 2, max_help_lines + 2) # +2 for border
help_height = max(help_height, 3) # Ensure at least 3 rows (1 text + border)
@@ -159,26 +234,41 @@ def update_help_window(help_win, help_text, transformed_path, selected_option, m
return help_win
def get_wrapped_help_text(help_text, transformed_path, selected_option, width, max_lines):
def get_wrapped_help_text(
help_text: dict[str, str],
transformed_path: list[str],
selected_option: str | None,
width: int,
max_lines: int,
) -> list[WrappedLine]:
"""Fetches and formats help text for display, ensuring it fits within the allowed lines."""
full_help_key = '.'.join(transformed_path + [selected_option]) if selected_option else None
full_help_key = (
".".join(transformed_path + [selected_option]) if selected_option else None
)
help_content = help_text.get(full_help_key, "No help available.")
wrap_width = max(width - 6, 10) # Ensure a valid wrapping width
# Color replacements
color_mappings = {
r'\[warning\](.*?)\[/warning\]': ('settings_warning', True, False), # Red for warnings
r'\[note\](.*?)\[/note\]': ('settings_note', True, False), # Green for notes
r'\[underline\](.*?)\[/underline\]': ('settings_default', False, True), # Underline
r'\\033\[31m(.*?)\\033\[0m': ('settings_warning', True, False), # Red text
r'\\033\[32m(.*?)\\033\[0m': ('settings_note', True, False), # Green text
r'\\033\[4m(.*?)\\033\[0m': ('settings_default', False, True) # Underline
r"\[warning\](.*?)\[/warning\]": (
"settings_warning",
True,
False,
), # Red for warnings
r"\[note\](.*?)\[/note\]": ("settings_note", True, False), # Green for notes
r"\[underline\](.*?)\[/underline\]": (
"settings_default",
False,
True,
), # Underline
r"\\033\[31m(.*?)\\033\[0m": ("settings_warning", True, False), # Red text
r"\\033\[32m(.*?)\\033\[0m": ("settings_note", True, False), # Green text
r"\\033\[4m(.*?)\\033\[0m": ("settings_default", False, True), # Underline
}
def extract_ansi_segments(text):
def extract_ansi_segments(text: str) -> list[Segment]:
"""Extracts and replaces ANSI color codes, ensuring spaces are preserved."""
matches = []
last_pos = 0
@@ -187,7 +277,9 @@ def get_wrapped_help_text(help_text, transformed_path, selected_option, width, m
# Find all matches and store their positions
for pattern, (color, bold, underline) in color_mappings.items():
for match in re.finditer(pattern, text):
pattern_matches.append((match.start(), match.end(), match.group(1), color, bold, underline))
pattern_matches.append(
(match.start(), match.end(), match.group(1), color, bold, underline)
)
# Sort matches by start position to process sequentially
pattern_matches.sort(key=lambda x: x[0])
@@ -197,7 +289,7 @@ def get_wrapped_help_text(help_text, transformed_path, selected_option, width, m
if last_pos < start:
segment = text[last_pos:start]
matches.append((segment, "settings_default", False, False))
# Append the colored segment
matches.append((content, color, bold, underline))
last_pos = end
@@ -208,14 +300,14 @@ def get_wrapped_help_text(help_text, transformed_path, selected_option, width, m
return matches
def wrap_ansi_text(segments, wrap_width):
def wrap_ansi_text(segments: list[Segment], wrap_width: int) -> list[WrappedLine]:
"""Wraps text while preserving ANSI formatting and spaces."""
wrapped_lines = []
line_buffer = []
line_length = 0
for text, color, bold, underline in segments:
words = re.findall(r'\S+|\s+', text) # Capture words and spaces separately
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
for word in words:
word_length = len(word)
@@ -245,64 +337,76 @@ def get_wrapped_help_text(help_text, transformed_path, selected_option, width, m
# Trim and add ellipsis if needed
if len(wrapped_help) > max_lines:
wrapped_help = wrapped_help[:max_lines]
wrapped_help[-1].append(("...", "settings_default", False, False))
wrapped_help = wrapped_help[:max_lines]
wrapped_help[-1].append(("...", "settings_default", False, False))
return wrapped_help
def move_highlight(old_idx, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state):
if old_idx == menu_state.selected_index: # No-op
return
# def move_highlight(
# old_idx: int,
# options: list[str],
# menu_win: object,
# menu_pad: object,
# help_win: object,
# help_text: dict[str, str],
# max_help_lines: int,
# menu_state: MenuState
# ) -> None:
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
# if old_idx == menu_state.selected_index: # No-op
# return
# Adjust menu_state.start_index only when moving out of visible range
if menu_state.selected_index == max_index and menu_state.show_save_option:
pass
elif menu_state.selected_index < menu_state.start_index[-1]: # Moving above the visible area
menu_state.start_index[-1] = menu_state.selected_index
elif menu_state.selected_index >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
menu_state.start_index[-1] = menu_state.selected_index - visible_height
pass
# max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
# visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
# Ensure menu_state.start_index is within bounds
menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
# # Adjust menu_state.start_index only when moving out of visible range
# if menu_state.selected_index == max_index and menu_state.show_save_option:
# pass
# elif menu_state.selected_index < menu_state.start_index[-1]: # Moving above the visible area
# menu_state.start_index[-1] = menu_state.selected_index
# elif menu_state.selected_index >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
# menu_state.start_index[-1] = menu_state.selected_index - visible_height
# pass
# Clear old selection
if menu_state.show_save_option and old_idx == max_index:
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
else:
menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
# # Ensure menu_state.start_index is within bounds
# menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
# Highlight new selection
if menu_state.show_save_option and menu_state.selected_index == max_index:
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
else:
menu_pad.chgat(menu_state.selected_index, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[menu_state.selected_index] in sensitive_settings else get_color("settings_default", reverse=True))
# # Clear old selection
# if menu_state.show_save_option and old_idx == max_index:
# menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
# else:
# menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(menu_state.start_index[-1], 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
# # Highlight new selection
# if menu_state.show_save_option and menu_state.selected_index == max_index:
# menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
# else:
# menu_pad.chgat(menu_state.selected_index, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[menu_state.selected_index] in sensitive_settings else get_color("settings_default", reverse=True))
# Update help window
transformed_path = transform_menu_path(menu_state.menu_path)
selected_option = options[menu_state.selected_index] if menu_state.selected_index < len(options) else None
help_y = menu_win.getbegyx()[0] + menu_win.getmaxyx()[0]
help_win = update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_win.getbegyx()[1])
# menu_win.refresh()
draw_arrows(menu_win, visible_height, max_index, menu_state)
# # Refresh pad only if scrolling is needed
# menu_pad.refresh(menu_state.start_index[-1], 0,
# menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
# menu_win.getbegyx()[0] + 3 + visible_height,
# menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
# # Update help window
# transformed_path = transform_menu_path(menu_state.menu_path)
# selected_option = options[menu_state.selected_index] if menu_state.selected_index < len(options) else None
# help_y = menu_win.getbegyx()[0] + menu_win.getmaxyx()[0]
# help_win = update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_win.getbegyx()[1])
# draw_arrows(menu_win, visible_height, max_index, menu_state)
def draw_arrows(win, visible_height, max_index, menu_state):
def draw_arrows(
win: object, visible_height: int, max_index: int, menu_state: MenuState
) -> None:
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if menu_state.show_save_option else 0)
mi = max_index - (2 if menu_state.show_save_option else 0)
if visible_height < mi:
if menu_state.start_index[-1] > 0:
@@ -310,35 +414,46 @@ def draw_arrows(win, visible_height, max_index, menu_state):
else:
win.addstr(3, 2, " ", get_color("settings_default"))
if mi - menu_state.start_index[-1] >= visible_height + (0 if menu_state.show_save_option else 1) :
if mi - menu_state.start_index[-1] >= visible_height + (
0 if menu_state.show_save_option else 1
):
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))
def settings_menu(stdscr, node_state):
def settings_menu(stdscr: object, interface: object) -> None:
curses.update_lines_cols()
menu = generate_menu_from_protobuf(node_state)
menu = generate_menu_from_protobuf(interface)
menu_state.current_menu = menu["Main Menu"]
menu_state.menu_path = ["Main Menu"]
modified_settings = {}
need_redraw = True
menu_state.show_save_option = False
while True:
if(need_redraw):
if need_redraw:
options = list(menu_state.current_menu.keys())
menu_state.show_save_option = (
len(menu_state.menu_path) > 2 and ("Radio Settings" in menu_state.menu_path or "Module Settings" in menu_state.menu_path)
) or (
len(menu_state.menu_path) == 2 and "User Settings" in menu_state.menu_path
) or (
len(menu_state.menu_path) == 3 and "Channels" in menu_state.menu_path
(
len(menu_state.menu_path) > 2
and (
"Radio Settings" in menu_state.menu_path
or "Module Settings" in menu_state.menu_path
)
)
or (
len(menu_state.menu_path) == 2
and "User Settings" in menu_state.menu_path
)
or (
len(menu_state.menu_path) == 3
and "Channels" in menu_state.menu_path
)
)
# Display the menu
@@ -353,14 +468,52 @@ def settings_menu(stdscr, node_state):
# max_help_lines = 4
if key == curses.KEY_UP:
old_selected_index = menu_state.selected_index
menu_state.selected_index = max_index if menu_state.selected_index == 0 else menu_state.selected_index - 1
move_highlight(old_selected_index, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state)
old_idx = menu_state.selected_index
menu_state.selected_index = (
max_index
if menu_state.selected_index == 0
else menu_state.selected_index - 1
)
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
help_win=help_win,
help_updater=update_help_window,
field_mapping=help_text,
menu_path=transform_menu_path(menu_state.menu_path),
max_help_lines=max_help_lines,
sensitive_mode=True,
)
elif key == curses.KEY_DOWN:
old_selected_index = menu_state.selected_index
menu_state.selected_index = 0 if menu_state.selected_index == max_index else menu_state.selected_index + 1
move_highlight(old_selected_index, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state)
old_idx = menu_state.selected_index
menu_state.selected_index = (
0
if menu_state.selected_index == max_index
else menu_state.selected_index + 1
)
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
help_win=help_win,
help_updater=update_help_window,
field_mapping=help_text,
menu_path=transform_menu_path(menu_state.menu_path),
max_help_lines=max_help_lines,
sensitive_mode=True,
)
elif key == curses.KEY_RESIZE:
need_redraw = True
@@ -373,11 +526,25 @@ def settings_menu(stdscr, node_state):
help_win.refresh()
elif key == ord("\t") and menu_state.show_save_option:
old_selected_index = menu_state.selected_index
old_idx = menu_state.selected_index
menu_state.selected_index = max_index
move_highlight(old_selected_index, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state)
elif key == curses.KEY_RIGHT or key == ord('\n'):
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
help_win=help_win,
help_updater=update_help_window,
field_mapping=help_text,
menu_path=transform_menu_path(menu_state.menu_path),
max_help_lines=max_help_lines,
sensitive_mode=True,
)
elif key == curses.KEY_RIGHT or key == ord("\n"):
need_redraw = True
menu_state.start_index.append(0)
menu_win.erase()
@@ -388,8 +555,10 @@ def settings_menu(stdscr, node_state):
menu_win.refresh()
help_win.refresh()
if menu_state.show_save_option and menu_state.selected_index == len(options):
save_changes(node_state, modified_settings, menu_state)
if menu_state.show_save_option and menu_state.selected_index == len(
options
):
save_changes(interface, modified_settings, menu_state)
modified_settings.clear()
logging.info("Changes Saved")
@@ -416,13 +585,19 @@ def settings_menu(stdscr, node_state):
filename += ".yaml"
try:
config_text = config_export(node_state)
config_text = config_export(interface)
yaml_file_path = os.path.join(config_folder, filename)
if os.path.exists(yaml_file_path):
overwrite = get_list_input(f"{filename} already exists. Overwrite?", None, ["Yes", "No"])
overwrite = get_list_input(
f"{filename} already exists. Overwrite?",
None,
["Yes", "No"],
)
if overwrite == "No":
logging.info("Export cancelled: User chose not to overwrite.")
logging.info(
"Export cancelled: User chose not to overwrite."
)
menu_state.start_index.pop()
continue # Return to menu
os.makedirs(os.path.dirname(yaml_file_path), exist_ok=True)
@@ -433,22 +608,30 @@ def settings_menu(stdscr, node_state):
menu_state.start_index.pop()
continue
except PermissionError:
logging.error(f"Permission denied: Unable to write to {yaml_file_path}")
logging.error(
f"Permission denied: Unable to write to {yaml_file_path}"
)
except OSError as e:
logging.error(f"OS error while saving config: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
menu_state.start_index.pop()
continue
elif selected_option == "Load Config File":
# Check if folder exists and is not empty
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
if not os.path.exists(config_folder) or not any(
os.listdir(config_folder)
):
dialog(stdscr, "", " No config files found. Export a config first.")
continue # Return to menu
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
file_list = [
f
for f in os.listdir(config_folder)
if os.path.isfile(os.path.join(config_folder, f))
]
# Ensure file_list is not empty before proceeding
if not file_list:
@@ -458,52 +641,68 @@ def settings_menu(stdscr, node_state):
filename = get_list_input("Choose a config file", None, file_list)
if filename:
file_path = os.path.join(config_folder, filename)
overwrite = get_list_input(f"Are you sure you want to load {filename}?", None, ["Yes", "No"])
overwrite = get_list_input(
f"Are you sure you want to load {filename}?",
None,
["Yes", "No"],
)
if overwrite == "Yes":
config_import(node_state, file_path)
config_import(interface, file_path)
menu_state.start_index.pop()
continue
elif selected_option == "Config URL":
current_value = node_state.interface.localNode.getURL()
current_value = interface.localNode.getURL()
new_value = get_text_input(f"Config URL is currently: {current_value}")
if new_value is not None:
current_value = new_value
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
overwrite = get_list_input(
f"Are you sure you want to load this config?",
None,
["Yes", "No"],
)
if overwrite == "Yes":
node_state.interface.localNode.setURL(new_value)
interface.localNode.setURL(new_value)
logging.info(f"New Config URL sent to node")
menu_state.start_index.pop()
continue
elif selected_option == "Reboot":
confirmation = get_list_input("Are you sure you want to Reboot?", None, ["Yes", "No"])
confirmation = get_list_input(
"Are you sure you want to Reboot?", None, ["Yes", "No"]
)
if confirmation == "Yes":
node_state.interface.localNode.reboot()
interface.localNode.reboot()
logging.info(f"Node Reboot Requested by menu")
menu_state.start_index.pop()
continue
elif selected_option == "Reset Node DB":
confirmation = get_list_input("Are you sure you want to Reset Node DB?", None, ["Yes", "No"])
confirmation = get_list_input(
"Are you sure you want to Reset Node DB?", None, ["Yes", "No"]
)
if confirmation == "Yes":
node_state.interface.localNode.resetNodeDb()
interface.localNode.resetNodeDb()
logging.info(f"Node DB Reset Requested by menu")
menu_state.start_index.pop()
continue
elif selected_option == "Shutdown":
confirmation = get_list_input("Are you sure you want to Shutdown?", None, ["Yes", "No"])
confirmation = get_list_input(
"Are you sure you want to Shutdown?", None, ["Yes", "No"]
)
if confirmation == "Yes":
node_state.interface.localNode.shutdown()
interface.localNode.shutdown()
logging.info(f"Node Shutdown Requested by menu")
menu_state.start_index.pop()
continue
elif selected_option == "Factory Reset":
confirmation = get_list_input("Are you sure you want to Factory Reset?", None, ["Yes", "No"])
confirmation = get_list_input(
"Are you sure you want to Factory Reset?", None, ["Yes", "No"]
)
if confirmation == "Yes":
node_state.interface.localNode.factoryReset()
interface.localNode.factoryReset()
logging.info(f"Factory Reset Requested by menu")
menu_state.start_index.pop()
continue
@@ -520,26 +719,32 @@ def settings_menu(stdscr, node_state):
menu_state.selected_index = 4
continue
# need_redraw = True
field_info = menu_state.current_menu.get(selected_option)
if isinstance(field_info, tuple):
field, current_value = field_info
# Transform the menu path to get the full key
transformed_path = transform_menu_path(menu_state.menu_path)
full_key = '.'.join(transformed_path + [selected_option])
full_key = ".".join(transformed_path + [selected_option])
# Fetch human-readable name from field_mapping
human_readable_name = field_mapping.get(full_key, selected_option)
if selected_option in ['longName', 'shortName', 'isLicensed']:
if selected_option in ['longName', 'shortName']:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
if selected_option in ["longName", "shortName", "isLicensed"]:
if selected_option in ["longName", "shortName"]:
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
elif selected_option == 'isLicensed':
new_value = get_list_input(f"{human_readable_name} is currently: {current_value}", str(current_value), ["True", "False"])
elif selected_option == "isLicensed":
new_value = get_list_input(
f"{human_readable_name} is currently: {current_value}",
str(current_value),
["True", "False"],
)
new_value = new_value == "True"
menu_state.current_menu[selected_option] = (field, new_value)
@@ -548,57 +753,82 @@ def settings_menu(stdscr, node_state):
menu_state.start_index.pop()
elif selected_option in ['latitude', 'longitude', 'altitude']:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
elif selected_option in ["latitude", "longitude", "altitude"]:
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
for option in ['latitude', 'longitude', 'altitude']:
for option in ["latitude", "longitude", "altitude"]:
if option in menu_state.current_menu:
modified_settings[option] = menu_state.current_menu[option][1]
modified_settings[option] = menu_state.current_menu[option][
1
]
menu_state.start_index.pop()
elif selected_option == "admin_key":
new_values = get_admin_key_input(current_value)
new_value = current_value if new_values is None else [base64.b64decode(key) for key in new_values]
new_value = (
current_value
if new_values is None
else [base64.b64decode(key) for key in new_values]
)
menu_state.start_index.pop()
elif field.type == 8: # Handle boolean type
new_value = get_list_input(human_readable_name, str(current_value), ["True", "False"])
new_value = new_value == "True" or new_value is True
new_value = get_list_input(
human_readable_name, str(current_value), ["True", "False"]
)
if new_value == "Not Set":
pass # Leave it as-is
else:
new_value = new_value == "True" or new_value is True
menu_state.start_index.pop()
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
elif (
field.label == field.LABEL_REPEATED
): # Handle repeated field - Not currently used
new_value = get_repeated_input(current_value)
new_value = current_value if new_value is None else new_value.split(", ")
new_value = (
current_value if new_value is None else new_value.split(", ")
)
menu_state.start_index.pop()
elif field.enum_type: # Enum field
enum_options = {v.name: v.number for v in field.enum_type.values}
new_value_name = get_list_input(human_readable_name, current_value, list(enum_options.keys()))
new_value_name = get_list_input(
human_readable_name, current_value, list(enum_options.keys())
)
new_value = enum_options.get(new_value_name, current_value)
menu_state.start_index.pop()
elif field.type == 7: # Field type 7 corresponds to FIXED32
elif field.type == 7: # Field type 7 corresponds to FIXED32
new_value = get_fixed32_input(current_value)
menu_state.start_index.pop()
elif field.type == 13: # Field type 13 corresponds to UINT32
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
elif field.type == 13: # Field type 13 corresponds to UINT32
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
new_value = current_value if new_value is None else int(new_value)
menu_state.start_index.pop()
elif field.type == 2: # Field type 13 corresponds to INT64
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
elif field.type == 2: # Field type 13 corresponds to INT64
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
new_value = current_value if new_value is None else float(new_value)
menu_state.start_index.pop()
else: # Handle other field types
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
new_value = current_value if new_value is None else new_value
menu_state.start_index.pop()
for key in menu_state.menu_path[3:]: # Skip "Main Menu"
modified_settings = modified_settings.setdefault(key, {})
@@ -607,8 +837,14 @@ def settings_menu(stdscr, node_state):
# Convert enum string to int
if field and field.enum_type:
enum_value_descriptor = field.enum_type.values_by_number.get(new_value)
new_value = enum_value_descriptor.name if enum_value_descriptor else new_value
enum_value_descriptor = field.enum_type.values_by_number.get(
new_value
)
new_value = (
enum_value_descriptor.name
if enum_value_descriptor
else new_value
)
menu_state.current_menu[selected_option] = (field, new_value)
else:
@@ -617,7 +853,6 @@ def settings_menu(stdscr, node_state):
menu_state.menu_index.append(menu_state.selected_index)
menu_state.selected_index = 0
elif key == curses.KEY_LEFT:
need_redraw = True
@@ -641,14 +876,15 @@ def settings_menu(stdscr, node_state):
menu_state.current_menu = menu_state.current_menu.get(step, {})
menu_state.selected_index = menu_state.menu_index.pop()
menu_state.start_index.pop()
elif key == 27: # Escape key
menu_win.erase()
menu_win.refresh()
break
def set_region(node_state):
node = node_state.interface.getNode('^local')
def set_region(interface: object) -> None:
node = interface.getNode("^local")
device_config = node.localConfig
lora_descriptor = device_config.lora.DESCRIPTOR
@@ -658,10 +894,12 @@ def set_region(node_state):
regions = list(region_name_to_number.keys())
new_region_name = get_list_input('Select your region:', 'UNSET', regions)
new_region_name = get_list_input("Select your region:", "UNSET", regions)
# Convert region name to corresponding enum number
new_region_number = region_name_to_number.get(new_region_name, 0) # Default to 0 if not found
new_region_number = region_name_to_number.get(
new_region_name, 0
) # Default to 0 if not found
node.localConfig.lora.region = new_region_number
node.writeConfig("lora")
node.writeConfig("lora")

View File

@@ -1,5 +1,5 @@
import logging
import json
import logging
import os
# Get the parent directory of the script
@@ -11,11 +11,11 @@ json_file_path = os.path.join(parent_dir, "config.json")
log_file_path = os.path.join(parent_dir, "client.log")
db_file_path = os.path.join(parent_dir, "client.db")
def format_json_single_line_arrays(data, indent=4):
def format_json_single_line_arrays(data: dict[str, object], indent: int = 4) -> str:
"""
Formats JSON with arrays on a single line while keeping other elements properly indented.
"""
def format_value(value, current_indent):
def format_value(value: object, current_indent: int) -> str:
if isinstance(value, dict):
items = []
for key, val in value.items():
@@ -31,7 +31,7 @@ def format_json_single_line_arrays(data, indent=4):
return format_value(data, indent)
# Recursive function to check and update nested dictionaries
def update_dict(default, actual):
def update_dict(default: dict[str, object], actual: dict[str, object]) -> bool:
updated = False
for key, value in default.items():
if key not in actual:
@@ -42,7 +42,7 @@ def update_dict(default, actual):
updated = update_dict(value, actual[key]) or updated
return updated
def initialize_config():
def initialize_config() -> dict[str, object]:
COLOR_CONFIG_DARK = {
"default": ["white", "black"],
"background": [" ", "black"],
@@ -161,7 +161,7 @@ def initialize_config():
return loaded_config
def assign_config_variables(loaded_config):
def assign_config_variables(loaded_config: dict[str, object]) -> None:
# Assign values to local variables
global db_file_path, log_file_path, message_prefix, sent_message_prefix

View File

@@ -1,7 +1,7 @@
import curses
from contact.ui.colors import get_color
def dialog(stdscr, title, message):
def dialog(stdscr: curses.window, title: str, message: str) -> None:
height, width = stdscr.getmaxyx()
# Calculate dialog dimensions

View File

@@ -1,19 +1,27 @@
from collections import OrderedDict
from meshtastic.protobuf import config_pb2, module_config_pb2, channel_pb2
import logging
import base64
import logging
import os
from collections import OrderedDict
from typing import Any
from google.protobuf.message import Message
from meshtastic.protobuf import channel_pb2, config_pb2, module_config_pb2
locals_dir = os.path.dirname(os.path.abspath(__file__))
translation_file = os.path.join(locals_dir, "localisations", "en.ini")
def encode_if_bytes(value):
def encode_if_bytes(value: Any) -> str:
"""Encode byte values to base64 string."""
if isinstance(value, bytes):
return base64.b64encode(value).decode('utf-8')
return value
def extract_fields(message_instance, current_config=None):
def extract_fields(
message_instance: Message,
current_config: Message | dict[str, Any] | None = None
) -> dict[str, Any]:
if isinstance(current_config, dict): # Handle dictionaries
return {key: (None, encode_if_bytes(current_config.get(key, "Not Set"))) for key in current_config}
@@ -47,7 +55,10 @@ def extract_fields(message_instance, current_config=None):
menu[field.name] = (field, encode_if_bytes(current_value))
return menu
def generate_menu_from_protobuf(interface):
def generate_menu_from_protobuf(interface: object) -> dict[str, Any]:
"""
Builds the full settings menu structure from the protobuf definitions.
"""
menu_structure = {"Main Menu": {}}
# Add User Settings

View File

@@ -0,0 +1,120 @@
# contact/ui/navigation_utils.py
import curses
from contact.ui.colors import get_color
save_option = "Save Changes"
sensitive_settings = ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"]
def move_highlight(
old_idx: int,
new_idx: int,
options: list[str],
win: curses.window,
pad: curses.window,
*,
start_index_ref: list[int] = None,
selected_index: int = None,
max_help_lines: int = 0,
show_save: bool = False,
help_win: curses.window = None,
help_updater=None,
field_mapping=None,
menu_path: list[str] = None,
width: int = 80,
sensitive_mode: bool = False,
):
if old_idx == new_idx:
return
max_index = len(options) + (1 if show_save else 0) - 1
visible_height = win.getmaxyx()[0] - 5 - (2 if show_save else 0)
# Scrolling logic
if start_index_ref is not None:
if new_idx == max_index and show_save:
pass
elif new_idx < start_index_ref[0]:
start_index_ref[0] = new_idx
elif new_idx >= start_index_ref[0] + visible_height:
start_index_ref[0] = new_idx - visible_height
start_index_ref[0] = max(
0, min(start_index_ref[0], max_index - visible_height + 1)
)
scroll = start_index_ref[0] if start_index_ref else 0
# Clear previous highlight
if show_save and old_idx == max_index:
win.chgat(
win.getmaxyx()[0] - 2,
(width - len(save_option)) // 2,
len(save_option),
get_color("settings_save"),
)
else:
color = (
"settings_sensitive"
if sensitive_mode and options[old_idx] in sensitive_settings
else "settings_default"
)
pad.chgat(old_idx, 0, pad.getmaxyx()[1], get_color(color))
# Apply new highlight
if show_save and new_idx == max_index:
win.chgat(
win.getmaxyx()[0] - 2,
(width - len(save_option)) // 2,
len(save_option),
get_color("settings_save", reverse=True),
)
else:
color = (
"settings_sensitive"
if sensitive_mode and options[new_idx] in sensitive_settings
else "settings_default"
)
pad.chgat(new_idx, 0, pad.getmaxyx()[1], get_color(color, reverse=True))
win.refresh()
pad.refresh(
scroll,
0,
win.getbegyx()[0] + 3,
win.getbegyx()[1] + 4,
win.getbegyx()[0] + 3 + visible_height,
win.getbegyx()[1] + win.getmaxyx()[1] - 4,
)
# Optional help update
if help_win and help_updater and menu_path and selected_index is not None:
selected_option = (
options[selected_index] if selected_index < len(options) else None
)
help_y = win.getbegyx()[0] + win.getmaxyx()[0]
help_updater(
help_win,
field_mapping,
menu_path,
selected_option,
max_help_lines,
width,
help_y,
win.getbegyx()[1],
)
def draw_arrows(
win: curses.window, visible_height: int, max_index: int, start_index: int
) -> None:
if visible_height < max_index:
if start_index > 0:
win.addstr(3, 2, "", get_color("settings_default"))
else:
win.addstr(3, 2, " ", get_color("settings_default"))
if max_index - start_index > visible_height:
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))

View File

@@ -1,7 +1,8 @@
import curses
from contact.ui.colors import get_color
def draw_splash(stdscr):
def draw_splash(stdscr: object) -> None:
"""Draw the splash screen with a logo and connecting message."""
curses.curs_set(0)
stdscr.clear()

View File

@@ -1,27 +1,10 @@
from typing import Any
class MenuState:
def __init__(self):
self.menu_index = [] # Row we left the previous menus
self.start_index = [0] # Row to start the menu if it doesn't all fit
self.selected_index = 0 # Selected Row
self.current_menu = {} # Contents of the current menu
self.menu_path = [] # Menu Path
self.show_save_option = False
class NodeState:
def __init__(self):
self.interface = None
self.myNodeNum = 0
# self.lock = None
# self.display_log = False
# self.all_messages = {}
# self.channel_list = []
# self.notifications = []
# self.packet_buffer = []
# self.node_list = []
# self.selected_channel = 0
# self.selected_message = 0
# self.selected_node = 0
# self.current_window = 0
self.menu_index: list[int]= [] # Row we left the previous menus
self.start_index: list[int] = [0] # Row to start the menu if it doesn't all fit
self.selected_index: int = 0 # Selected Row
self.current_menu: dict[str, Any] | list[Any] | str | int = {} # Contents of the current menu
self.menu_path: list[str] = [] # Menu Path
self.show_save_option: bool = False # Display 'Save'

View File

@@ -1,26 +1,33 @@
import os
import json
import curses
from typing import Any
from contact.ui.colors import get_color, setup_colors, COLOR_MAP
from contact.ui.default_config import format_json_single_line_arrays, loaded_config
from contact.utilities.input_handlers import get_list_input
from contact.ui.navigation_utils import move_highlight
width = 80
save_option = "Save Changes"
sensitive_settings = []
def edit_color_pair(key, current_value):
def edit_color_pair(key: str, current_value: list[str]) -> list[str]:
"""
Allows the user to select a foreground and background color for a key.
"""
color_list = [" "] + list(COLOR_MAP.keys())
fg_color = get_list_input(f"Select Foreground Color for {key}", current_value[0], color_list)
bg_color = get_list_input(f"Select Background Color for {key}", current_value[1], color_list)
fg_color = get_list_input(
f"Select Foreground Color for {key}", current_value[0], color_list
)
bg_color = get_list_input(
f"Select Background Color for {key}", current_value[1], color_list
)
return [fg_color, bg_color]
def edit_value(key, current_value, menu_state):
def edit_value(key: str, current_value: str) -> str:
height = 10
input_width = width - 16 # Allow space for "New Value: "
@@ -38,9 +45,14 @@ def edit_value(key, current_value, menu_state):
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
wrap_width = width - 4 # Account for border and padding
wrapped_lines = [current_value[i:i+wrap_width] for i in range(0, len(current_value), wrap_width)]
wrapped_lines = [
current_value[i : i + wrap_width]
for i in range(0, len(current_value), wrap_width)
]
for i, line in enumerate(wrapped_lines[:4]): # Limit display to fit the window height
for i, line in enumerate(
wrapped_lines[:4]
): # Limit display to fit the window height
edit_win.addstr(4 + i, 2, line, get_color("settings_default"))
edit_win.refresh()
@@ -48,10 +60,14 @@ def edit_value(key, current_value, menu_state):
# Handle theme selection dynamically
if key == "theme":
# Load theme names dynamically from the JSON
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
theme_options = [
k.split("_", 2)[2].lower()
for k in loaded_config.keys()
if k.startswith("COLOR_CONFIG")
]
return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort":
sort_options = ['lastHeard', 'name', 'hops']
sort_options = ["lastHeard", "name", "hops"]
return get_list_input("Sort By", current_value, sort_options)
# Standard Input Mode (Scrollable)
@@ -63,18 +79,26 @@ def edit_value(key, current_value, menu_state):
input_position = (7, 13) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
visible_text = user_input[scroll_offset:scroll_offset + input_width] # Only show what fits
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
visible_text = user_input[
scroll_offset : scroll_offset + input_width
] # Only show what fits
edit_win.addstr(
row, col, " " * input_width, get_color("settings_default")
) # Clear previous text
edit_win.addstr(
row, col, visible_text, get_color("settings_default")
) # Display text
edit_win.refresh()
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
edit_win.move(
row, col + min(len(user_input) - scroll_offset, input_width)
) # Adjust cursor position
key = edit_win.get_wch()
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow
curses.curs_set(0)
return current_value # Exit without returning a value
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
break
@@ -82,7 +106,9 @@ def edit_value(key, current_value, menu_state):
if user_input: # Only process if there's something to delete
user_input = user_input[:-1]
if scroll_offset > 0 and len(user_input) < scroll_offset + input_width:
scroll_offset -= 1 # Move back if text is shorter than scrolled area
scroll_offset -= (
1 # Move back if text is shorter than scrolled area
)
else:
if isinstance(key, str):
user_input += key
@@ -96,7 +122,7 @@ def edit_value(key, current_value, menu_state):
return user_input if user_input else current_value
def display_menu(menu_state):
def display_menu(menu_state: Any) -> tuple[Any, Any, list[str]]:
"""
Render the configuration menu with a Save button directly added to the window.
"""
@@ -112,7 +138,7 @@ def display_menu(menu_state):
# Calculate dynamic dimensions for the menu
max_menu_height = curses.LINES
menu_height = min(max_menu_height, num_items + 5)
menu_height = min(max_menu_height, num_items + 5)
num_items = len(options)
start_y = (curses.LINES - menu_height) // 2
start_x = (curses.COLS - width) // 2
@@ -132,86 +158,120 @@ def display_menu(menu_state):
# Display the menu path
header = " > ".join(menu_state.menu_path)
if len(header) > width - 4:
header = header[:width - 7] + "..."
header = header[: width - 7] + "..."
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
# Populate the pad with menu options
for idx, key in enumerate(options):
value = menu_state.current_menu[key] if isinstance(menu_state.current_menu, dict) else menu_state.current_menu[int(key.strip("[]"))]
display_key = f"{key}"[:width // 2 - 2]
display_value = (
f"{value}"[:width // 2 - 8]
value = (
menu_state.current_menu[key]
if isinstance(menu_state.current_menu, dict)
else menu_state.current_menu[int(key.strip("[]"))]
)
display_key = f"{key}"[: width // 2 - 2]
display_value = f"{value}"[: width // 2 - 8]
color = get_color("settings_default", reverse=(idx == menu_state.selected_index))
menu_pad.addstr(idx, 0, f"{display_key:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
color = get_color(
"settings_default", reverse=(idx == menu_state.selected_index)
)
menu_pad.addstr(
idx,
0,
f"{display_key:<{width // 2 - 2}} {display_value}".ljust(width - 8),
color,
)
# Add Save button to the main window
if menu_state.show_save_option:
save_position = menu_height - 2
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, get_color("settings_save", reverse=(menu_state.selected_index == len(menu_state.current_menu))))
menu_win.addstr(
save_position,
(width - len(save_option)) // 2,
save_option,
get_color(
"settings_save",
reverse=(menu_state.selected_index == len(menu_state.current_menu)),
),
)
menu_win.refresh()
menu_pad.refresh(
menu_state.start_index[-1], 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0),
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4
menu_state.start_index[-1],
0,
menu_win.getbegyx()[0] + 3,
menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0]
+ 3
+ menu_win.getmaxyx()[0]
- 5
- (2 if menu_state.show_save_option else 0),
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4,
)
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
visible_height = (
menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
)
draw_arrows(menu_win, visible_height, max_index, menu_state)
return menu_win, menu_pad, options
def move_highlight(old_idx, new_idx, options, menu_win, menu_pad, menu_state):
if old_idx == new_idx: # No-op
return
# def move_highlight(
# old_idx: int,
# options: list[str],
# menu_win: curses.window,
# menu_pad: curses.window,
# menu_state: Any
# ) -> None:
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
# if old_idx == menu_state.selected_index: # No-op
# return
# Adjust menu_state.start_index only when moving out of visible range
if new_idx == max_index and menu_state.show_save_option:
pass
elif new_idx < menu_state.start_index[-1]: # Moving above the visible area
menu_state.start_index[-1] = new_idx
elif new_idx >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
menu_state.start_index[-1] = new_idx - visible_height
pass
# max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
# visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
# Ensure menu_state.start_index is within bounds
menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
# # Adjust menu_state.start_index only when moving out of visible range
# if menu_state.selected_index == max_index and menu_state.show_save_option:
# pass
# elif menu_state.selected_index < menu_state.start_index[-1]: # Moving above the visible area
# menu_state.start_index[-1] = menu_state.selected_index
# elif menu_state.selected_index >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
# menu_state.start_index[-1] = menu_state.selected_index - visible_height
# pass
# Clear old selection
if menu_state.show_save_option and old_idx == max_index:
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
else:
menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
# # Ensure menu_state.start_index is within bounds
# menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
# Highlight new selection
if menu_state.show_save_option and new_idx == max_index:
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
else:
menu_pad.chgat(new_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[new_idx] in sensitive_settings else get_color("settings_default", reverse=True))
# # Clear old selection
# if menu_state.show_save_option and old_idx == max_index:
# menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
# else:
# menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(menu_state.start_index[-1], 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
# # Highlight new selection
# if menu_state.show_save_option and menu_state.selected_index == max_index:
# menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
# else:
# menu_pad.chgat(menu_state.selected_index, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[menu_state.selected_index] in sensitive_settings else get_color("settings_default", reverse=True))
draw_arrows(menu_win, visible_height, max_index, menu_state)
# menu_win.refresh()
# # Refresh pad only if scrolling is needed
# menu_pad.refresh(menu_state.start_index[-1], 0,
# menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
# menu_win.getbegyx()[0] + 3 + visible_height,
# menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
# draw_arrows(menu_win, visible_height, max_index, menu_state)
def draw_arrows(win, visible_height, max_index, menu_state):
def draw_arrows(
win: curses.window, visible_height: int, max_index: int, menu_state: any
) -> None:
mi = max_index - (2 if menu_state.show_save_option else 0)
mi = max_index - (2 if menu_state.show_save_option else 0)
if visible_height < mi:
if menu_state.start_index[-1] > 0:
@@ -219,13 +279,15 @@ def draw_arrows(win, visible_height, max_index, menu_state):
else:
win.addstr(3, 2, " ", get_color("settings_default"))
if mi - menu_state.start_index[-1] >= visible_height + (0 if menu_state.show_save_option else 1) :
if mi - menu_state.start_index[-1] >= visible_height + (
0 if menu_state.show_save_option else 1
):
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))
def json_editor(stdscr, menu_state):
def json_editor(stdscr: curses.window, menu_state: Any) -> None:
menu_state.selected_index = 0 # Track the selected option
@@ -241,7 +303,7 @@ def json_editor(stdscr, menu_state):
json.dump({}, f)
# Load JSON data
with open(file_path, "r") as f:
with open(file_path, "r", encoding="utf-8") as f:
original_data = json.load(f)
data = original_data # Reference to the original data
@@ -252,30 +314,68 @@ def json_editor(stdscr, menu_state):
need_redraw = True
while True:
if(need_redraw):
if need_redraw:
menu_win, menu_pad, options = display_menu(menu_state)
menu_win.refresh()
need_redraw = False
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
key = menu_win.getch()
if key == curses.KEY_UP:
old_selected_index = menu_state.selected_index
menu_state.selected_index = max_index if menu_state.selected_index == 0 else menu_state.selected_index - 1
move_highlight(old_selected_index, menu_state.selected_index, options, menu_win, menu_pad, menu_state)
old_idx = menu_state.selected_index
menu_state.selected_index = (
max_index
if menu_state.selected_index == 0
else menu_state.selected_index - 1
)
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
sensitive_mode=True,
)
elif key == curses.KEY_DOWN:
old_selected_index = menu_state.selected_index
menu_state.selected_index = 0 if menu_state.selected_index == max_index else menu_state.selected_index + 1
move_highlight(old_selected_index, menu_state.selected_index, options, menu_win, menu_pad, menu_state)
old_idx = menu_state.selected_index
menu_state.selected_index = (
0
if menu_state.selected_index == max_index
else menu_state.selected_index + 1
)
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
sensitive_mode=True,
)
elif key == ord("\t") and menu_state.show_save_option:
old_selected_index = menu_state.selected_index
old_idx = menu_state.selected_index
menu_state.selected_index = max_index
move_highlight(old_selected_index, menu_state.selected_index, options, menu_win, menu_pad, menu_state)
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
sensitive_mode=True,
)
elif key in (curses.KEY_RIGHT, 10, 13): # 10 = \n, 13 = carriage return
@@ -283,12 +383,14 @@ def json_editor(stdscr, menu_state):
menu_win.erase()
menu_win.refresh()
if menu_state.selected_index < len(options): # Handle selection of a menu item
if menu_state.selected_index < len(
options
): # Handle selection of a menu item
selected_key = options[menu_state.selected_index]
menu_state.menu_path.append(str(selected_key))
menu_state.start_index.append(0)
menu_state.menu_index.append(menu_state.selected_index)
# Handle nested data
if isinstance(menu_state.current_menu, dict):
if selected_key in menu_state.current_menu:
@@ -296,7 +398,9 @@ def json_editor(stdscr, menu_state):
else:
continue # Skip invalid key
elif isinstance(menu_state.current_menu, list):
selected_data = menu_state.current_menu[int(selected_key.strip("[]"))]
selected_data = menu_state.current_menu[
int(selected_key.strip("[]"))
]
if isinstance(selected_data, list) and len(selected_data) == 2:
# Edit color pair
@@ -313,13 +417,13 @@ def json_editor(stdscr, menu_state):
else:
# General value editing
new_value = edit_value(selected_key, selected_data, menu_state)
new_value = edit_value(selected_key, selected_data)
menu_state.menu_path.pop()
menu_state.menu_index.pop()
menu_state.start_index.pop()
menu_state.current_menu[selected_key] = new_value
need_redraw = True
else:
# Save button selected
save_json(file_path, data)
@@ -341,23 +445,28 @@ def json_editor(stdscr, menu_state):
menu_state.current_menu = data
for path in menu_state.menu_path[2:]:
menu_state.current_menu = menu_state.current_menu[path] if isinstance(menu_state.current_menu, dict) else menu_state.current_menu[int(path.strip("[]"))]
menu_state.current_menu = (
menu_state.current_menu[path]
if isinstance(menu_state.current_menu, dict)
else menu_state.current_menu[int(path.strip("[]"))]
)
else:
# Exit the editor
menu_win.clear()
menu_win.refresh()
break
def save_json(file_path, data):
def save_json(file_path: str, data: dict[str, Any]) -> None:
formatted_json = format_json_single_line_arrays(data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(formatted_json)
setup_colors(reinit=True)
def main(stdscr):
def main(stdscr: curses.window) -> None:
from contact.ui.ui_state import MenuState
menu_state = MenuState()
@@ -369,5 +478,6 @@ def main(stdscr):
setup_colors()
json_editor(stdscr, menu_state)
if __name__ == "__main__":
curses.wrapper(main)
curses.wrapper(main)

View File

@@ -1,7 +1,7 @@
import argparse
from argparse import ArgumentParser
def setup_parser():
parser = argparse.ArgumentParser(
def setup_parser() -> ArgumentParser:
parser = ArgumentParser(
add_help=True,
epilog="If no connection arguments are specified, we attempt a serial connection and then a TCP connection to localhost.")

View File

@@ -1,9 +1,8 @@
import yaml
import logging
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import BROADCAST_ADDR, mt_config
from meshtastic import mt_config
from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main
@@ -20,9 +19,9 @@ def traverseConfig(config_root, config, interface_config) -> bool:
return True
def splitCompoundName(comp_name: str) -> List[str]:
def splitCompoundName(comp_name: str) -> list[str]:
"""Split compound (dot separated) preference name into parts"""
name: List[str] = comp_name.split(".")
name: list[str] = comp_name.split(".")
if len(name) < 2:
name[0] = comp_name
name.append(comp_name)
@@ -123,24 +122,24 @@ def setPref(config, comp_name, raw_val) -> bool:
def config_import(node_state, filename):
def config_import(interface, filename):
with open(filename, encoding="utf8") as file:
configuration = yaml.safe_load(file)
closeNow = True
node_state.interface.getNode('^local', False).beginSettingsTransaction()
interface.getNode('^local', False).beginSettingsTransaction()
if "owner" in configuration:
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
node_state.interface.getNode('^local', False).setOwner(configuration["owner"])
interface.getNode('^local', False).setOwner(configuration["owner"])
if "owner_short" in configuration:
logging.info(
f"Setting device owner short to {configuration['owner_short']}"
)
waitForAckNak = True
node_state.interface.getNode('^local', False).setOwner(
interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["owner_short"]
)
@@ -149,23 +148,23 @@ def config_import(node_state, filename):
f"Setting device owner short to {configuration['ownerShort']}"
)
waitForAckNak = True
node_state.interface.getNode('^local', False).setOwner(
interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["ownerShort"]
)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
node_state.interface.getNode('^local').setURL(configuration["channel_url"])
interface.getNode('^local').setURL(configuration["channel_url"])
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
node_state.interface.getNode('^local').setURL(configuration["channelUrl"])
interface.getNode('^local').setURL(configuration["channelUrl"])
if "location" in configuration:
alt = 0
lat = 0.0
lon = 0.0
localConfig = node_state.interface.localNode.localConfig
localConfig = interface.localNode.localConfig
if "alt" in configuration["location"]:
alt = int(configuration["location"]["alt"] or 0)
@@ -177,43 +176,43 @@ def config_import(node_state, filename):
lon = float(configuration["location"]["lon"] or 0)
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
node_state.interface.localNode.setFixedPosition(lat, lon, alt)
interface.localNode.setFixedPosition(lat, lon, alt)
if "config" in configuration:
localConfig = node_state.interface.getNode('^local').localConfig
localConfig = interface.getNode('^local').localConfig
for section in configuration["config"]:
traverseConfig(
section, configuration["config"][section], localConfig
)
node_state.interface.getNode('^local').writeConfig(
interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
if "module_config" in configuration:
moduleConfig = node_state.interface.getNode('^local').moduleConfig
moduleConfig = interface.getNode('^local').moduleConfig
for section in configuration["module_config"]:
traverseConfig(
section,
configuration["module_config"][section],
moduleConfig,
)
node_state.interface.getNode('^local').writeConfig(
interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
node_state.interface.getNode('^local', False).commitSettingsTransaction()
interface.getNode('^local', False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")
def config_export(node_state) -> str:
def config_export(interface) -> str:
"""used in --export-config"""
configObj = {}
owner = node_state.interface.getLongName()
owner_short = node_state.interface.getShortName()
channel_url = node_state.interface.localNode.getURL()
myinfo = node_state.interface.getMyNodeInfo()
owner = interface.getLongName()
owner_short = interface.getShortName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
pos = myinfo.get("position")
lat = None
lon = None
@@ -238,7 +237,7 @@ def config_export(node_state) -> str:
if alt:
configObj["location"]["alt"] = alt
config = MessageToDict(node_state.interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
if config:
# Convert inner keys to correct snake/camelCase
prefs = {}
@@ -261,7 +260,7 @@ def config_export(node_state) -> str:
else:
configObj["config"] = config
module_config = MessageToDict(node_state.interface.localNode.moduleConfig)
module_config = MessageToDict(interface.localNode.moduleConfig)
if module_config:
# Convert inner keys to correct snake/camelCase
prefs = {}

View File

@@ -1,10 +1,11 @@
import re
def parse_ini_file(ini_file_path: str) -> tuple[dict[str, str], dict[str, str]]:
"""Parses an INI file and returns a mapping of keys to human-readable names and help text."""
def parse_ini_file(ini_file_path):
field_mapping = {}
help_text = {}
current_section = None
field_mapping: dict[str, str] = {}
help_text: dict[str, str] = {}
current_section: str | None = None
with open(ini_file_path, 'r', encoding='utf-8') as f:
for line in f:
@@ -46,14 +47,14 @@ def parse_ini_file(ini_file_path):
return field_mapping, help_text
def transform_menu_path(menu_path):
def transform_menu_path(menu_path: list[str]) -> list[str]:
"""Applies path replacements and normalizes entries in the menu path."""
path_replacements = {
"Radio Settings": "config",
"Module Settings": "module"
}
transformed_path = []
transformed_path: list[str] = []
for part in menu_path[1:]: # Skip 'Main Menu'
# Apply fixed replacements
part = path_replacements.get(part, part)

View File

@@ -7,14 +7,15 @@ from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config
import contact.globals as globals
def get_table_name(channel, node_state):
def get_table_name(channel: str) -> str:
# Construct the table name
table_name = f"{str(node_state.myNodeNum)}_{channel}_messages"
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
def save_message_to_db(channel, user_id, message_text):
def save_message_to_db(channel: str, user_id: str, message_text: str) -> int | None:
"""Save messages to the database, ensuring the table exists."""
try:
quoted_table_name = get_table_name(channel)
@@ -47,7 +48,7 @@ def save_message_to_db(channel, user_id, message_text):
logging.error(f"Unexpected error in save_message_to_db: {e}")
def update_ack_nak(channel, timestamp, message, ack, node_state):
def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None:
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
@@ -59,7 +60,7 @@ def update_ack_nak(channel, timestamp, message, ack, node_state):
message_text = ?
"""
db_cursor.execute(update_query, (ack, str(node_state.myNodeNum), timestamp, message))
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_connection.commit()
except sqlite3.Error as e:
@@ -69,14 +70,14 @@ def update_ack_nak(channel, timestamp, message, ack, node_state):
logging.error(f"Unexpected error in update_ack_nak: {e}")
def load_messages_from_db(node_state):
def load_messages_from_db() -> None:
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(node_state.myNodeNum)}_%_messages",))
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages
@@ -123,10 +124,10 @@ def load_messages_from_db(node_state):
elif ack_type == "Nak":
ack_str = config.nak_str
if user_id == str(node_state.myNodeNum):
if user_id == str(globals.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (f"{config.message_prefix} {get_name_from_database(int(user_id), node_state,'short')}: ", message)
formatted_message = (f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ", message)
hourly_messages[hour].append(formatted_message)
@@ -142,14 +143,14 @@ def load_messages_from_db(node_state):
logging.error(f"SQLite error in load_messages_from_db: {e}")
def init_nodedb(node_state):
def init_nodedb() -> None:
"""Initialize the node database and update it with nodes from the interface."""
try:
if not globals.interface.nodes:
return # No nodes to initialize
ensure_node_table_exists(node_state) # Ensure the table exists before insertion
ensure_node_table_exists() # Ensure the table exists before insertion
nodes_snapshot = list(globals.interface.nodes.values())
# Insert or update all nodes
@@ -172,7 +173,7 @@ def init_nodedb(node_state):
logging.error(f"Unexpected error in init_nodedb: {e}")
def maybe_store_nodeinfo_in_db(packet, node_state):
def maybe_store_nodeinfo_in_db(packet: dict[str, object]) -> None:
"""Save nodeinfo unless that record is already there, updating if necessary."""
try:
user_id = packet['from']
@@ -183,21 +184,31 @@ def maybe_store_nodeinfo_in_db(packet, node_state):
role = packet['decoded']['user'].get('role', 'CLIENT')
public_key = packet['decoded']['user'].get('publicKey', '')
update_node_info_in_db(node_state, user_id, long_name, short_name, hw_model, is_licensed, role, public_key)
update_node_info_in_db(user_id, long_name, short_name, hw_model, is_licensed, role, public_key)
except sqlite3.Error as e:
logging.error(f"SQLite error in maybe_store_nodeinfo_in_db: {e}")
except Exception as e:
logging.error(f"Unexpected error in maybe_store_nodeinfo_in_db: {e}")
def update_node_info_in_db(node_state, user_id, long_name=None, short_name=None, hw_model=None, is_licensed=None, role=None, public_key=None, chat_archived=None):
def update_node_info_in_db(
user_id: int | str,
long_name: str | None = None,
short_name: str | None = None,
hw_model: str | None = None,
is_licensed: str | int | None = None,
role: str | None = None,
public_key: str | None = None,
chat_archived: int | None = None
) -> None:
"""Update or insert node information into the database, preserving unchanged fields."""
try:
ensure_node_table_exists(node_state) # Ensure the table exists before any operation
ensure_node_table_exists() # Ensure the table exists before any operation
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{node_state.myNodeNum}_nodedb"' # Quote in case of numeric names
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f'PRAGMA table_info({table_name})')]
if "chat_archived" not in table_columns:
@@ -249,9 +260,9 @@ def update_node_info_in_db(node_state, user_id, long_name=None, short_name=None,
logging.error(f"Unexpected error in update_node_info_in_db: {e}")
def ensure_node_table_exists(node_state):
def ensure_node_table_exists() -> None:
"""Ensure the node database table exists."""
table_name = f'"{node_state.myNodeNum}_nodedb"' # Quote for safety
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
schema = '''
user_id TEXT PRIMARY KEY,
long_name TEXT,
@@ -265,7 +276,7 @@ def ensure_node_table_exists(node_state):
ensure_table_exists(table_name, schema)
def ensure_table_exists(table_name, schema):
def ensure_table_exists(table_name: str, schema: str) -> None:
"""Ensure the given table exists in the database."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
@@ -279,7 +290,7 @@ def ensure_table_exists(table_name, schema):
logging.error(f"Unexpected error in ensure_table_exists({table_name}): {e}")
def get_name_from_database(user_id, node_state, type="long"):
def get_name_from_database(user_id: int, type: str = "long") -> str:
"""
Retrieve a user's name (long or short) from the node database.
@@ -292,7 +303,7 @@ def get_name_from_database(user_id, node_state, type="long"):
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(node_state.myNodeNum)}_nodedb"
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
@@ -313,11 +324,11 @@ def get_name_from_database(user_id, node_state, type="long"):
logging.error(f"Unexpected error in get_name_from_database: {e}")
return "Unknown"
def is_chat_archived(user_id, node_state):
def is_chat_archived(user_id: int) -> int:
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(node_state.myNodeNum)}_nodedb"
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))

View File

@@ -3,11 +3,14 @@ import binascii
import curses
import ipaddress
import re
from typing import Any, Optional
from contact.ui.colors import get_color
from contact.ui.navigation_utils import move_highlight
def wrap_text(text, wrap_width):
def wrap_text(text: str, wrap_width: int) -> list[str]:
"""Wraps text while preserving spaces and breaking long words."""
words = re.findall(r'\S+|\s+', text) # Capture words and spaces separately
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
wrapped_lines = []
line_buffer = ""
line_length = 0
@@ -23,7 +26,7 @@ def wrap_text(text, wrap_width):
line_buffer = ""
line_length = 0
for i in range(0, word_length, wrap_width):
wrapped_lines.append(word[i:i+wrap_width])
wrapped_lines.append(word[i : i + wrap_width])
continue
if line_length + word_length > wrap_width and word.strip():
@@ -38,9 +41,9 @@ def wrap_text(text, wrap_width):
wrapped_lines.append(line_buffer)
return wrapped_lines
def get_text_input(prompt):
def get_text_input(prompt: str) -> Optional[str]:
"""Handles user input with wrapped text for long prompts."""
height = 8
width = 80
@@ -60,14 +63,16 @@ def get_text_input(prompt):
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
input_win.addstr(
row, margin, line[:input_width], get_color("settings_default", bold=True)
)
row += 1
if row >= height - 3: # Prevent overflow
break
prompt_text = "Enter new value: "
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
input_win.refresh()
curses.curs_set(1)
@@ -104,21 +109,35 @@ def get_text_input(prompt):
first_line = user_input[:first_line_width] # Cut to max first line width
remaining_text = user_input[first_line_width:] # Remaining text for wrapping
wrapped_lines = wrap_text(remaining_text, wrap_width=input_width) if remaining_text else []
wrapped_lines = (
wrap_text(remaining_text, wrap_width=input_width) if remaining_text else []
)
# Clear only the input area (without touching prompt text)
for i in range(max_input_rows):
if row + 1 + i < height - 1:
input_win.addstr(row + 1 + i, margin, " " * min(input_width, width - margin - 1), get_color("settings_default"))
input_win.addstr(
row + 1 + i,
margin,
" " * min(input_width, width - margin - 1),
get_color("settings_default"),
)
# Redraw the prompt text so it never disappears
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
# Redraw wrapped input
input_win.addstr(row + 1, col_start, first_line, get_color("settings_default")) # First line next to prompt
input_win.addstr(
row + 1, col_start, first_line, get_color("settings_default")
) # First line next to prompt
for i, line in enumerate(wrapped_lines):
if row + 2 + i < height - 1:
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
input_win.addstr(
row + 2 + i,
margin,
line[:input_width],
get_color("settings_default"),
)
input_win.refresh()
@@ -128,7 +147,7 @@ def get_text_input(prompt):
return user_input
def get_admin_key_input(current_value):
def get_admin_key_input(current_value: list[bytes]) -> Optional[list[str]]:
def to_base64(byte_strings):
"""Convert byte values to Base64-encoded strings."""
return [base64.b64encode(b).decode() for b in byte_strings]
@@ -163,56 +182,76 @@ def get_admin_key_input(current_value):
while True:
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
repeated_win.addstr(
1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True)
)
# Display current values, allowing editing
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " " # Highlight the current line
repeated_win.addstr(3 + i, 2, f"{prefix}Admin Key {i + 1}: ", get_color("settings_default", bold=(i == cursor_pos)))
repeated_win.addstr(
3 + i,
2,
f"{prefix}Admin Key {i + 1}: ",
get_color("settings_default", bold=(i == cursor_pos)),
)
repeated_win.addstr(3 + i, 18, line) # Align text for easier editing
# Move cursor to the correct position inside the field
curses.curs_set(1)
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
repeated_win.move(
3 + cursor_pos, 18 + len(user_values[cursor_pos])
) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
repeated_win.addstr(
7, 2, error_message, get_color("settings_default", bold=True)
)
repeated_win.refresh()
key = repeated_win.getch()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
if (
key == 27 or key == curses.KEY_LEFT
): # Escape or Left Arrow -> Cancel and return original
repeated_win.erase()
repeated_win.refresh()
curses.noecho()
curses.curs_set(0)
return None
elif key == ord('\n'): # Enter key to save and return
if all(is_valid_base64(val) for val in user_values): # Ensure all values are valid Base64 and 32 bytes
elif key == ord("\n"): # Enter key to save and return
if all(
is_valid_base64(val) for val in user_values
): # Ensure all values are valid Base64 and 32 bytes
curses.noecho()
curses.curs_set(0)
return user_values # Return the edited Base64 values
else:
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
error_message = (
"Error: Each key must be valid Base64 and 32 bytes long!"
)
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
cursor_pos = (cursor_pos + 1) % len(user_values)
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
if len(user_values[cursor_pos]) > 0:
user_values[cursor_pos] = user_values[cursor_pos][:-1] # Remove last character
user_values[cursor_pos] = user_values[cursor_pos][
:-1
] # Remove last character
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
user_values[cursor_pos] += chr(
key
) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
def get_repeated_input(current_value):
def get_repeated_input(current_value: list[str]) -> Optional[str]:
height = 9
width = 80
start_y = (curses.LINES - height) // 2
@@ -234,33 +273,46 @@ def get_repeated_input(current_value):
while True:
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(1, 2, "Edit up to 3 Values:", get_color("settings_default", bold=True))
repeated_win.addstr(
1, 2, "Edit up to 3 Values:", get_color("settings_default", bold=True)
)
# Display current values, allowing editing
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " " # Highlight the current line
repeated_win.addstr(3 + i, 2, f"{prefix}Value{i + 1}: ", get_color("settings_default", bold=(i == cursor_pos)))
repeated_win.addstr(3 + i, 18, line)
repeated_win.addstr(
3 + i,
2,
f"{prefix}Value{i + 1}: ",
get_color("settings_default", bold=(i == cursor_pos)),
)
repeated_win.addstr(3 + i, 18, line)
# Move cursor to the correct position inside the field
curses.curs_set(1)
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
repeated_win.move(
3 + cursor_pos, 18 + len(user_values[cursor_pos])
) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
repeated_win.addstr(
7, 2, error_message, get_color("settings_default", bold=True)
)
repeated_win.refresh()
key = repeated_win.getch()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
if (
key == 27 or key == curses.KEY_LEFT
): # Escape or Left Arrow -> Cancel and return original
repeated_win.erase()
repeated_win.refresh()
curses.noecho()
curses.curs_set(0)
return None
elif key == ord('\n'): # Enter key to save and return
elif key == ord("\n"): # Enter key to save and return
curses.noecho()
curses.curs_set(0)
return ", ".join(user_values)
@@ -270,16 +322,20 @@ def get_repeated_input(current_value):
cursor_pos = (cursor_pos + 1) % len(user_values)
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
if len(user_values[cursor_pos]) > 0:
user_values[cursor_pos] = user_values[cursor_pos][:-1] # Remove last character
user_values[cursor_pos] = user_values[cursor_pos][
:-1
] # Remove last character
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
user_values[cursor_pos] += chr(
key
) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
def get_fixed32_input(current_value):
def get_fixed32_input(current_value: int) -> int:
cvalue = current_value
current_value = str(ipaddress.IPv4Address(current_value))
height = 10
@@ -299,7 +355,9 @@ def get_fixed32_input(current_value):
while True:
fixed32_win.erase()
fixed32_win.border()
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD)
fixed32_win.addstr(
1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD
)
fixed32_win.addstr(3, 2, f"Current: {current_value}")
fixed32_win.addstr(5, 2, f"New value: {user_input}")
fixed32_win.refresh()
@@ -312,16 +370,23 @@ def get_fixed32_input(current_value):
curses.noecho()
curses.curs_set(0)
return cvalue # Return the current value unchanged
elif key == ord('\n'): # Enter key to validate and save
elif key == ord("\n"): # Enter key to validate and save
# Validate IP address
octets = user_input.split(".")
if len(octets) == 4 and all(octet.isdigit() and 0 <= int(octet) <= 255 for octet in octets):
if len(octets) == 4 and all(
octet.isdigit() and 0 <= int(octet) <= 255 for octet in octets
):
curses.noecho()
curses.curs_set(0)
fixed32_address = ipaddress.ip_address(user_input)
return int(fixed32_address) # Return the valid IP address
else:
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", curses.A_BOLD | curses.color_pair(5))
fixed32_win.addstr(
7,
2,
"Invalid IP address. Try again.",
curses.A_BOLD | curses.color_pair(5),
)
fixed32_win.refresh()
curses.napms(1500) # Wait for 1.5 seconds before refreshing
user_input = "" # Clear invalid input
@@ -336,11 +401,16 @@ def get_fixed32_input(current_value):
pass # Ignore invalid inputs
def get_list_input(prompt, current_option, list_options):
def get_list_input(
prompt: str, current_option: Optional[str], list_options: list[str]
) -> Optional[str]:
"""
Displays a scrollable list of list_options for the user to choose from.
"""
selected_index = list_options.index(current_option) if current_option in list_options else 0
selected_index = (
list_options.index(current_option) if current_option in list_options else 0
)
scroll_offset = 0
height = min(len(list_options) + 5, curses.LINES)
width = 80
@@ -363,16 +433,28 @@ def get_list_input(prompt, current_option, list_options):
# Render options on the pad
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
list_pad.addstr(
idx,
0,
color.ljust(width - 8),
get_color("settings_default", reverse=True),
)
else:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
list_pad.addstr(
idx, 0, color.ljust(width - 8), get_color("settings_default")
)
# Initial refresh
list_win.refresh()
list_pad.refresh(0, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
list_pad.refresh(
0,
0,
list_win.getbegyx()[0] + 3,
list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
)
max_index = len(list_options) - 1
visible_height = list_win.getmaxyx()[0] - 5
@@ -382,14 +464,32 @@ def get_list_input(prompt, current_option, list_options):
key = list_win.getch()
if key == curses.KEY_UP:
old_selected_index = selected_index
old_idx = selected_index
selected_index = max(0, selected_index - 1)
move_highlight(old_selected_index, selected_index, list_options, list_win, list_pad)
scroll_ref = [scroll_offset]
move_highlight(
old_idx,
selected_index,
list_options,
list_win,
list_pad,
start_index_ref=scroll_ref,
)
scroll_offset = scroll_ref[0]
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
old_idx = selected_index
selected_index = min(len(list_options) - 1, selected_index + 1)
move_highlight(old_selected_index, selected_index, list_options, list_win, list_pad)
elif key == ord('\n'): # Enter key
scroll_ref = [scroll_offset]
move_highlight(
old_idx,
selected_index,
list_options,
list_win,
list_pad,
start_index_ref=scroll_ref,
)
scroll_offset = scroll_ref[0]
elif key == ord("\n"): # Enter key
list_win.clear()
list_win.refresh()
return list_options[selected_index]
@@ -399,47 +499,55 @@ def get_list_input(prompt, current_option, list_options):
return current_option
def move_highlight(old_idx, new_idx, options, list_win, list_pad):
# def move_highlight(
# old_idx: int,
# new_idx: int,
# options: list[str],
# list_win: curses.window,
# list_pad: curses.window
# ) -> int:
global scroll_offset
if 'scroll_offset' not in globals():
scroll_offset = 0 # Initialize if not set
# global scroll_offset
# if 'scroll_offset' not in globals():
# scroll_offset = 0 # Initialize if not set
if old_idx == new_idx:
return # No-op
# if old_idx == new_idx:
# return # No-op
max_index = len(options) - 1
visible_height = list_win.getmaxyx()[0] - 5
# max_index = len(options) - 1
# visible_height = list_win.getmaxyx()[0] - 5
# Adjust scroll_offset only when moving out of visible range
if new_idx < scroll_offset: # Moving above the visible area
scroll_offset = new_idx
elif new_idx >= scroll_offset + visible_height: # Moving below the visible area
scroll_offset = new_idx - visible_height
# # Adjust scroll_offset only when moving out of visible range
# if new_idx < scroll_offset: # Moving above the visible area
# scroll_offset = new_idx
# elif new_idx >= scroll_offset + visible_height: # Moving below the visible area
# scroll_offset = new_idx - visible_height
# Ensure scroll_offset is within bounds
scroll_offset = max(0, min(scroll_offset, max_index - visible_height + 1))
# # Ensure scroll_offset is within bounds
# scroll_offset = max(0, min(scroll_offset, max_index - visible_height + 1))
# Clear old highlight
list_pad.chgat(old_idx, 0, list_pad.getmaxyx()[1], get_color("settings_default"))
# # Clear old highlight
# list_pad.chgat(old_idx, 0, list_pad.getmaxyx()[1], get_color("settings_default"))
# Highlight new selection
list_pad.chgat(new_idx, 0, list_pad.getmaxyx()[1], get_color("settings_default", reverse=True))
# # Highlight new selection
# list_pad.chgat(new_idx, 0, list_pad.getmaxyx()[1], get_color("settings_default", reverse=True))
list_win.refresh()
# Refresh pad only if scrolling is needed
list_pad.refresh(scroll_offset, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + 3 + visible_height,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
draw_arrows(list_win, visible_height, max_index, scroll_offset)
# list_win.refresh()
return scroll_offset # Return updated scroll_offset to be stored externally
# # Refresh pad only if scrolling is needed
# list_pad.refresh(scroll_offset, 0,
# list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
# list_win.getbegyx()[0] + 3 + visible_height,
# list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
# draw_arrows(list_win, visible_height, max_index, scroll_offset)
# return scroll_offset # Return updated scroll_offset to be stored externally
def draw_arrows(win, visible_height, max_index, start_index):
def draw_arrows(
win: curses.window, visible_height: int, max_index: int, start_index: int
) -> None:
if visible_height < max_index:
if start_index > 0:
@@ -451,4 +559,3 @@ def draw_arrows(win, visible_height, max_index, start_index):
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))

View File

@@ -1,23 +1,41 @@
import logging
import meshtastic.serial_interface, meshtastic.tcp_interface, meshtastic.ble_interface
import contact.globals as globals
def initialize_interface(args):
try:
if args.ble:
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
elif args.host:
return meshtastic.tcp_interface.TCPInterface(args.host)
try:
if ":" in args.host:
tcp_hostname, tcp_port = args.host.split(':')
else:
tcp_hostname = args.host
tcp_port = meshtastic.tcp_interface.DEFAULT_TCP_PORT
return meshtastic.tcp_interface.TCPInterface(tcp_hostname, portNumber=tcp_port)
except Exception as ex:
logging.error(f"Error connecting to {args.host}. {ex}")
else:
try:
return meshtastic.serial_interface.SerialInterface(args.port)
client = meshtastic.serial_interface.SerialInterface(args.port)
except FileNotFoundError as ex:
logging.error(f"The serial device at '{args.port}' was not found. {ex}")
except PermissionError as ex:
logging.error(f"You probably need to add yourself to the `dialout` group to use a serial connection. {ex}")
except Exception as ex:
logging.error(f"Unexpected error initializing interface: {ex}")
if globals.interface.devPath is None:
return meshtastic.tcp_interface.TCPInterface("meshtastic.local")
except OSError as ex:
logging.error(f"The serial device couldn't be opened, it might be in use by another process. {ex}")
if client.devPath is None:
try:
client = meshtastic.tcp_interface.TCPInterface("localhost")
except Exception as ex:
logging.error(f"Error connecting to localhost:{ex}")
return client
except Exception as ex:
logging.critical(f"Fatal error initializing interface: {ex}")

View File

@@ -4,7 +4,7 @@ import logging
import base64
import time
def save_changes(node_state, modified_settings, menu_state):
def save_changes(interface, modified_settings, menu_state):
"""
Save changes to the device based on modified settings.
:param interface: Meshtastic interface instance
@@ -16,7 +16,7 @@ def save_changes(node_state, modified_settings, menu_state):
logging.info("No changes to save. modified_settings is empty.")
return
node = node_state.interface.getNode('^local')
node = interface.getNode('^local')
admin_key_backup = None
if 'admin_key' in modified_settings:
# Get reference to security config
@@ -60,7 +60,7 @@ def save_changes(node_state, modified_settings, menu_state):
lon = float(modified_settings.get('longitude', 0.0))
alt = int(modified_settings.get('altitude', 0))
node_state.interface.localNode.setFixedPosition(lat, lon, alt)
interface.localNode.setFixedPosition(lat, lon, alt)
logging.info(f"Updated {config_category} with Latitude: {lat} and Longitude {lon} and Altitude {alt}")
return

View File

@@ -67,9 +67,10 @@ def refresh_node_list():
return True
return False
def get_nodeNum(node_state):
myinfo = node_state.interface.getMyNodeInfo()
node_state.myNodeNum = myinfo['num']
def get_nodeNum():
myinfo = globals.interface.getMyNodeInfo()
myNodeNum = myinfo['num']
return myNodeNum
def decimal_to_hex(decimal_number):
return f"!{decimal_number:08x}"

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.2"
version = "1.3.6"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}