Compare commits

..

2 Commits

Author SHA1 Message Date
pdxlocations
8c2f0429ac probably not working changes 2025-04-05 20:55:33 -07:00
pdxlocations
425ee85ef0 rename state 2025-04-05 20:06:48 -07:00
27 changed files with 646 additions and 1340 deletions

View File

@@ -1,6 +0,0 @@
{
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
}
}

View File

@@ -1,21 +1,17 @@
## Contact - A Console UI for Meshtastic
### (Formerly Curses Client for Meshtastic)
#### Powered by Meshtastic.org
### Install with:
```bash
pip install contact
```
This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores.
<img width="846" alt="Contact - Main UI Screenshot" src="https://github.com/user-attachments/assets/d2996bfb-2c6d-46a8-b820-92a9143375f4">
<br><br>
The settings dialogue can be accessed within the client or may be run standalone to configure your node by launching `contact --settings` or `contact -c`
The settings dialogue can be accessed within the client or may be run standalone to configure your node by launching `settings.py`
<img width="696" alt="Screenshot 2025-04-08 at 6 10 06PM" src="https://github.com/user-attachments/assets/3d5e3964-f009-4772-bd6e-91b907c65a3b" />
<img width="441" alt="Contact - Settings Dialogue" src="https://github.com/user-attachments/assets/dd47f52a-d4d8-4e40-8001-9ea53d87f816" />
## Message Persistence
@@ -66,4 +62,4 @@ contact --ble BlAddressOfDevice
To quickly connect to localhost, use:
```sh
contact -t
```
```

View File

@@ -1,133 +1,118 @@
#!/usr/bin/env python3
"""
'''
Contact - A Console UI for Meshtastic by http://github.com/pdxlocations
Powered by Meshtastic.org
Meshtastic® is a registered trademark of Meshtastic LLC.
Meshtastic software components are released under various licenses—see GitHub for details.
No warranty is provided. Use at your own risk.
"""
Meshtastic® is a registered trademark of Meshtastic LLC. Meshtastic software components are released under various licenses, see GitHub for details. No warranty is provided - use at your own risk.
'''
# Standard library
import contextlib
import curses
import os
from pubsub import pub
import sys
import io
import logging
import os
import subprocess
import sys
import threading
import traceback
import threading
# Third-party
from pubsub import pub
# Local application
import contact.globals as globals
import contact.ui.default_config as config
from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region
from contact.ui.colors import setup_colors
from contact.ui.contact_ui import main_ui
from contact.ui.colors import setup_colors
from contact.ui.splash import draw_splash
import contact.ui.default_config as config
from contact.utilities.arg_parser import setup_parser
from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input
from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
from contact.utilities.input_handlers import get_list_input
from contact.utilities.utils import get_channels, get_node_list, get_nodeNum
import contact.globals as globals
from contact.ui.ui_state import NodeState
# ------------------------------------------------------------------------------
# Environment & Logging Setup
# ------------------------------------------------------------------------------
node_state = NodeState()
# Set ncurses compatibility settings
os.environ["NCURSES_NO_UTF8_ACS"] = "1"
os.environ["LANG"] = "C.UTF-8"
os.environ.setdefault("TERM", "xterm-256color")
if os.environ.get("COLORTERM") == "gnome-terminal":
os.environ["TERM"] = "xterm-256color"
# Configure logging
# Run `tail -f client.log` in another terminal to view live
logging.basicConfig(
filename=config.log_file_path,
level=logging.INFO,
level=logging.INFO, # DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s"
)
globals.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
# ------------------------------------------------------------------------------
def initialize_globals(args) -> None:
"""Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
# Prompt for region if unset
if globals.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, 'meshtastic.receive')
init_nodedb()
load_messages_from_db()
def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI."""
def main(stdscr):
output_capture = io.StringIO()
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
setup_colors()
draw_splash(stdscr)
parser = setup_parser()
args = parser.parse_args()
args = setup_parser().parse_args()
# Check if --settings was passed and run settings.py as a subprocess
if getattr(args, 'settings', False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
logging.info("Initializing interface...")
logging.info("Initializing interface %s", args)
with globals.lock:
initialize_globals(args)
node_state.interface = initialize_interface(args)
if node_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(node_state)
node_state.interface.close()
node_state.interface = initialize_interface(args)
logging.info("Interface initialized")
get_nodeNum(node_state)
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, 'meshtastic.receive')
init_nodedb(node_state)
load_messages_from_db(node_state)
logging.info("Starting main UI")
main_ui(stdscr)
main_ui(stdscr, node_state)
except Exception as e:
console_output = output_capture.getvalue()
logging.error("Uncaught exception: %s", e)
logging.error("An error occurred: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
raise
logging.error("Console output before crash:\n%s", console_output)
raise # Re-raise only unexpected errors
def start():
log_file = config.log_file_path
log_f = open(log_file, "a", buffering=1) # Enable line-buffering for immediate log writes
def start() -> None:
"""Launch curses wrapper and redirect logs to file."""
with open(config.log_file_path, "a", buffering=1) as log_f:
sys.stdout = log_f
sys.stderr = log_f
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.error("Fatal error: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1)
sys.stdout = log_f
sys.stderr = log_f
with contextlib.redirect_stderr(log_f), contextlib.redirect_stdout(log_f):
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C or Ctrl+X") # Clean exit logging
sys.exit(0) # Ensure a clean exit
except Exception as e:
logging.error("Fatal error in curses wrapper: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1) # Exit with an error code
if __name__ == "__main__":
start()

View File

@@ -1,4 +1,4 @@
interface = None
# interface = None
lock = None
display_log = False
all_messages = {}
@@ -6,7 +6,7 @@ channel_list = []
notifications = []
packet_buffer = []
node_list = []
myNodeNum = 0
# myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0

View File

@@ -78,13 +78,6 @@ dns, "IPv4 DNS server", ""
rsyslog_server, "RSyslog server", ""
enabled_protocols, "Enabled protocols", ""
[config.network.ipv4_config]
title, "IPv4 Config", ""
ip, "IP", ""
gateway, "Gateway", ""
subnet, "Subnet", ""
dns, "DNS", ""
[config.display]
title, "Display"
screen_on_secs, "Screen on duration", "How long the screen remains on in seconds after the user button is pressed or messages are received."
@@ -112,35 +105,6 @@ theme, "Theme", ""
alert_enabled, "Alert enabled", ""
banner_enabled, "Banner enabled", ""
ring_tone_id, "Ring tone ID", ""
language, "Language", ""
node_filter, "Node Filter", ""
node_highlight, "Node Highlight", ""
calibration_data, "Calibration Data", ""
map_data, "Map Data", ""
[config.device_ui.node_filter]
title, "Node Filter"
unknown_switch, "Unknown Switch", ""
offline_switch, "Offline Switch", ""
public_key_switch, "Public Key Switch", ""
hops_away, "Hops Away", ""
position_switch, "Position Switch", ""
node_name, "Node Name", ""
channel, "Channel", ""
[config.device_ui.node_highlight]
title, "Node Highlight"
chat_switch, "Chat Switch", ""
position_switch, "Position Switch", ""
telemetry_switch, "Telemetry Switch", ""
iaq_switch, "IAQ Switch", ""
node_name, "Node Name", ""
[config.device_ui.map_data]
title, "Map Data"
home, "Home", ""
style, "Style", ""
follow_gps, "Follow GPS", ""
[config.lora]
title, "LoRa"

View File

@@ -1,34 +1,17 @@
import logging
import time
from datetime import datetime
from typing import Any
from contact.utilities.utils import refresh_node_list
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
draw_messages_window,
draw_channel_list,
add_notification,
)
from contact.utilities.db_handler import (
save_message_to_db,
maybe_store_nodeinfo_in_db,
get_name_from_database,
update_node_info_in_db,
)
from datetime import datetime
from contact.ui.contact_ui import draw_packetlog_win, draw_node_list, draw_messages_window, draw_channel_list, add_notification
from contact.utilities.db_handler import save_message_to_db, maybe_store_nodeinfo_in_db, get_name_from_database, update_node_info_in_db
import contact.ui.default_config as config
import contact.globals as globals
def on_receive(packet: dict[str, Any], interface: Any) -> None:
"""
Handles an incoming packet from a Meshtastic interface.
from datetime import datetime
def on_receive(packet, node_state):
Args:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with globals.lock:
# Update packet log
globals.packet_buffer.append(packet)
@@ -37,7 +20,7 @@ def on_receive(packet: dict[str, Any], interface: Any) -> None:
globals.packet_buffer = globals.packet_buffer[-20:]
if globals.display_log:
draw_packetlog_win()
draw_packetlog_win(node_state)
try:
if 'decoded' not in packet:
return
@@ -49,7 +32,7 @@ def on_receive(packet: dict[str, Any], interface: Any) -> None:
if packet['decoded']['portnum'] == 'NODEINFO_APP':
if "user" in packet['decoded'] and "longName" in packet['decoded']["user"]:
maybe_store_nodeinfo_in_db(packet)
maybe_store_nodeinfo_in_db(packet, node_state)
elif packet['decoded']['portnum'] == 'TEXT_MESSAGE_APP':
message_bytes = packet['decoded']['payload']
@@ -83,7 +66,7 @@ def on_receive(packet: dict[str, Any], interface: Any) -> None:
# Add received message to the messages list
message_from_id = packet['from']
message_from_string = get_name_from_database(message_from_id, type='short') + ":"
message_from_string = get_name_from_database(message_from_id, node_state, type='short') + ":"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
@@ -112,9 +95,9 @@ def on_receive(packet: dict[str, Any], interface: Any) -> None:
globals.all_messages[globals.channel_list[channel_number]].append((f"{config.message_prefix} {message_from_string} ", message_string))
if refresh_channels:
draw_channel_list()
draw_channel_list(node_state)
if refresh_messages:
draw_messages_window(True)
draw_messages_window(node_state, True)
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)

View File

@@ -1,29 +1,17 @@
from datetime import datetime
from typing import Any
import google.protobuf.json_format
from meshtastic import BROADCAST_NUM
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from contact.utilities.db_handler import (
save_message_to_db,
update_ack_nak,
get_name_from_database,
is_chat_archived,
update_node_info_in_db,
)
from contact.utilities.db_handler import save_message_to_db, update_ack_nak, get_name_from_database, is_chat_archived, update_node_info_in_db
import contact.ui.default_config as config
import contact.globals as globals
ack_naks: dict[str, dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
ack_naks = {}
# Note "onAckNak" has special meaning to the API, thus the nonstandard naming convention
# See https://github.com/meshtastic/python/blob/master/meshtastic/mesh_interface.py#L462
def onAckNak(packet: dict[str, Any]) -> None:
"""
Handles incoming ACK/NAK response packets.
"""
def onAckNak(packet, node_state):
from contact.ui.contact_ui import draw_messages_window
request = packet['decoded']['requestId']
if(request not in ack_naks):
@@ -47,16 +35,14 @@ def onAckNak(packet: dict[str, Any]) -> None:
globals.all_messages[acknak['channel']][acknak['messageIndex']] = (config.sent_message_prefix + confirm_string + ": ", message)
update_ack_nak(acknak['channel'], acknak['timestamp'], message, ack_type)
update_ack_nak(acknak['channel'], acknak['timestamp'], message, ack_type, node_state)
channel_number = globals.channel_list.index(acknak['channel'])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
draw_messages_window()
draw_messages_window(node_state)
def on_response_traceroute(packet: dict[str, Any]) -> None:
"""
Handle traceroute response packets and render the route visually in the UI.
"""
def on_response_traceroute(packet, node_state):
"""on response for trace route"""
from contact.ui.contact_ui import draw_channel_list, draw_messages_window, add_notification
refresh_channels = False
@@ -70,18 +56,18 @@ def on_response_traceroute(packet: dict[str, Any]) -> None:
msg_str = "Traceroute to:\n"
route_str = get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}" # Start with destination of response
route_str = get_name_from_database(packet["to"], node_state, 'short') or f"{packet['to']:08x}" # Start with destination of response
# SNR list should have one more entry than the route, as the final destination adds its SNR also
lenTowards = 0 if "route" not in msg_dict else len(msg_dict["route"])
snrTowardsValid = "snrTowards" in msg_dict and len(msg_dict["snrTowards"]) == lenTowards + 1
if lenTowards > 0: # Loop through hops in route and add SNR if available
for idx, node_num in enumerate(msg_dict["route"]):
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, node_state, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrTowards"][idx] / 4) if snrTowardsValid and msg_dict["snrTowards"][idx] != UNK_SNR else "?") + "dB)"
# End with origin of response
route_str += " --> " + (get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}") \
route_str += " --> " + (get_name_from_database(packet["from"], node_state, 'short') or f"{packet['from']:08x}") \
+ " (" + (str(msg_dict["snrTowards"][-1] / 4) if snrTowardsValid and msg_dict["snrTowards"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route towards destination
@@ -91,15 +77,15 @@ def on_response_traceroute(packet: dict[str, Any]) -> None:
backValid = "hopStart" in packet and "snrBack" in msg_dict and len(msg_dict["snrBack"]) == lenBack + 1
if backValid:
msg_str += "Back:\n"
route_str = get_name_from_database(packet["from"], 'short') or f"{packet['from']:08x}" # Start with origin of response
route_str = get_name_from_database(packet["from"], node_state, 'short') or f"{packet['from']:08x}" # Start with origin of response
if lenBack > 0: # Loop through hops in routeBack and add SNR if available
for idx, node_num in enumerate(msg_dict["routeBack"]):
route_str += " --> " + (get_name_from_database(node_num, 'short') or f"{node_num:08x}") \
route_str += " --> " + (get_name_from_database(node_num, node_state, 'short') or f"{node_num:08x}") \
+ " (" + (str(msg_dict["snrBack"][idx] / 4) if msg_dict["snrBack"][idx] != UNK_SNR else "?") + "dB)"
# End with destination of response (us)
route_str += " --> " + (get_name_from_database(packet["to"], 'short') or f"{packet['to']:08x}") \
route_str += " --> " + (get_name_from_database(packet["to"], node_state, 'short') or f"{packet['to']:08x}") \
+ " (" + (str(msg_dict["snrBack"][-1] / 4) if msg_dict["snrBack"][-1] != UNK_SNR else "?") + "dB)"
msg_str += route_str + "\n" # Print the route back to us
@@ -108,7 +94,7 @@ def on_response_traceroute(packet: dict[str, Any]) -> None:
globals.channel_list.append(packet['from'])
refresh_channels = True
if(is_chat_archived(packet['from'])):
if(is_chat_archived(packet['from']), node_state):
update_node_info_in_db(packet['from'], chat_archived=False)
channel_number = globals.channel_list.index(packet['from'])
@@ -119,24 +105,21 @@ def on_response_traceroute(packet: dict[str, Any]) -> None:
add_notification(channel_number)
refresh_channels = True
message_from_string = get_name_from_database(packet['from'], type='short') + ":\n"
message_from_string = get_name_from_database(packet['from'], node_state, type='short') + ":\n"
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append((f"{config.message_prefix} {message_from_string}", msg_str))
if refresh_channels:
draw_channel_list()
draw_channel_list(node_state)
if refresh_messages:
draw_messages_window(True)
draw_messages_window(node_state, True)
save_message_to_db(globals.channel_list[channel_number], packet['from'], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
Sends a chat message using the selected channel.
"""
myid = globals.myNodeNum
def send_message(message, node_state, destination=BROADCAST_NUM, channel=0):
myid = node_state.myNodeNum
send_on_channel = 0
channel_id = globals.channel_list[channel]
if isinstance(channel_id, int):
@@ -145,7 +128,7 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
elif isinstance(channel_id, str):
send_on_channel = channel
sent_message_data = globals.interface.sendText(
sent_message_data = node_state.interface.sendText(
text=message,
destinationId=destination,
wantAck=True,
@@ -185,12 +168,9 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
ack_naks[sent_message_data.id] = {'channel': channel_id, 'messageIndex': len(globals.all_messages[channel_id]) - 1, 'timestamp': timestamp}
def send_traceroute() -> None:
"""
Sends a RouteDiscovery protobuf to the selected node.
"""
def send_traceroute(node_state):
r = mesh_pb2.RouteDiscovery()
globals.interface.sendData(
node_state.interface.sendData(
r,
destinationId=globals.node_list[globals.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,

View File

@@ -14,7 +14,7 @@ from contact.utilities.arg_parser import setup_parser
from contact.utilities.interfaces import initialize_interface
def main(stdscr: curses.window) -> None:
def main(stdscr):
output_capture = io.StringIO()
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
@@ -25,17 +25,17 @@ def main(stdscr: curses.window) -> None:
parser = setup_parser()
args = parser.parse_args()
interface = initialize_interface(args)
node_state.interface = initialize_interface(args)
if interface.localNode.localConfig.lora.region == 0:
if node_state.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface)
interface.close()
interface = initialize_interface(args)
set_region(node_state.interface)
node_state.interface.close()
node_state.interface = initialize_interface(args)
stdscr.clear()
stdscr.refresh()
settings_menu(stdscr, interface)
settings_menu(stdscr, node_state)
except Exception as e:
console_output = output_capture.getvalue()
@@ -52,6 +52,9 @@ logging.basicConfig( # Run `tail -f client.log` in another terminal to view live
)
if __name__ == "__main__":
from contact.ui.ui_state import NodeState
node_state = NodeState()
log_file = config.log_file_path
log_f = open(log_file, "a", buffering=1) # Enable line-buffering for immediate log writes

View File

@@ -12,7 +12,7 @@ COLOR_MAP = {
"white": curses.COLOR_WHITE
}
def setup_colors(reinit: bool = False) -> None:
def setup_colors(reinit=False):
"""
Initialize curses color pairs based on the COLOR_CONFIG.
"""
@@ -29,7 +29,7 @@ def setup_colors(reinit: bool = False) -> None:
print()
def get_color(category: str, bold: bool = False, reverse: bool = False, underline: bool = False) -> int:
def get_color(category, bold=False, reverse=False, underline=False):
"""
Retrieve a curses color pair with optional attributes.
"""

View File

@@ -13,7 +13,7 @@ import contact.ui.default_config as config
import contact.ui.dialog
import contact.globals as globals
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
def handle_resize(stdscr, firstrun, node_state):
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, function_win, packetlog_win, entry_win
# Calculate window max dimensions
@@ -80,17 +80,17 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
curses.curs_set(1)
try:
draw_function_win()
draw_channel_list()
draw_messages_window(True)
draw_node_list()
draw_function_win(node_state)
draw_channel_list(node_state)
draw_messages_window(node_state, True)
draw_node_list(node_state)
except:
# Resize events can come faster than we can re-draw, which can cause a curses error.
# In this case we'll see another curses.KEY_RESIZE in our key handler and draw again later.
pass
def main_ui(stdscr: curses.window) -> None:
def main_ui(stdscr, node_state):
global input_text
input_text = ""
stdscr.keypad(True)
@@ -111,7 +111,7 @@ def main_ui(stdscr: curses.window) -> None:
elif globals.current_window == 1:
scroll_messages(-1)
elif globals.current_window == 2:
scroll_nodes(-1)
scroll_nodes(-1, node_state)
elif char == curses.KEY_DOWN:
if globals.current_window == 0:
@@ -119,11 +119,11 @@ def main_ui(stdscr: curses.window) -> None:
elif globals.current_window == 1:
scroll_messages(1)
elif globals.current_window == 2:
scroll_nodes(1)
scroll_nodes(1, node_state)
elif char == curses.KEY_HOME:
if globals.current_window == 0:
select_channel(0)
select_channel(0, node_state)
elif globals.current_window == 1:
globals.selected_message = 0
refresh_pad(1)
@@ -132,7 +132,7 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_END:
if globals.current_window == 0:
select_channel(len(globals.channel_list) - 1)
select_channel(len(globals.channel_list) - 1, node_state)
elif globals.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
globals.selected_message = max(msg_line_count - get_msg_window_lines(), 0)
@@ -142,7 +142,7 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_PPAGE:
if globals.current_window == 0:
select_channel(globals.selected_channel - (channel_win.getmaxyx()[0] - 2)) # select_channel will bounds check for us
select_channel(globals.selected_channel - (channel_win.getmaxyx()[0] - 2), node_state) # select_channel will bounds check for us
elif globals.current_window == 1:
globals.selected_message = max(globals.selected_message - get_msg_window_lines(), 0)
refresh_pad(1)
@@ -151,7 +151,7 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_NPAGE:
if globals.current_window == 0:
select_channel(globals.selected_channel + (channel_win.getmaxyx()[0] - 2)) # select_channel will bounds check for us
select_channel(globals.selected_channel + (channel_win.getmaxyx()[0] - 2), node_state) # select_channel will bounds check for us
elif globals.current_window == 1:
msg_line_count = messages_pad.getmaxyx()[0]
globals.selected_message = min(globals.selected_message + get_msg_window_lines(), msg_line_count - get_msg_window_lines())
@@ -169,7 +169,7 @@ def main_ui(stdscr: curses.window) -> None:
channel_win.attrset(get_color("window_frame"))
channel_win.box()
channel_win.refresh()
highlight_line(False, 0, globals.selected_channel)
highlight_line(False, 0, globals.selected_channel, node_state)
refresh_pad(0)
if old_window == 1:
messages_win.attrset(get_color("window_frame"))
@@ -177,11 +177,11 @@ def main_ui(stdscr: curses.window) -> None:
messages_win.refresh()
refresh_pad(1)
elif old_window == 2:
draw_function_win()
draw_function_win(node_state)
nodes_win.attrset(get_color("window_frame"))
nodes_win.box()
nodes_win.refresh()
highlight_line(False, 2, globals.selected_node)
highlight_line(False, 2, globals.selected_node, node_state)
refresh_pad(2)
if globals.current_window == 0:
@@ -189,7 +189,7 @@ def main_ui(stdscr: curses.window) -> None:
channel_win.box()
channel_win.attrset(get_color("window_frame"))
channel_win.refresh()
highlight_line(True, 0, globals.selected_channel)
highlight_line(True, 0, globals.selected_channel, node_state)
refresh_pad(0)
elif globals.current_window == 1:
messages_win.attrset(get_color("window_frame_selected"))
@@ -198,12 +198,12 @@ def main_ui(stdscr: curses.window) -> None:
messages_win.refresh()
refresh_pad(1)
elif globals.current_window == 2:
draw_function_win()
draw_function_win(node_state)
nodes_win.attrset(get_color("window_frame_selected"))
nodes_win.box()
nodes_win.attrset(get_color("window_frame"))
nodes_win.refresh()
highlight_line(True, 2, globals.selected_node)
highlight_line(True, 2, globals.selected_node, node_state)
refresh_pad(2)
# Check for Esc
@@ -212,7 +212,7 @@ def main_ui(stdscr: curses.window) -> None:
# Check for Ctrl + t
elif char == chr(20):
send_traceroute()
send_traceroute(node_state)
curses.curs_set(0) # Hide cursor
contact.ui.dialog.dialog(stdscr, "Traceroute Sent", "Results will appear in messages window.\nNote: Traceroute is limited to once every 30 seconds.")
curses.curs_set(1) # Show cursor again
@@ -229,20 +229,20 @@ def main_ui(stdscr: curses.window) -> None:
globals.selected_channel = globals.channel_list.index(node_list[globals.selected_node])
if(is_chat_archived(globals.channel_list[globals.selected_channel])):
if(is_chat_archived(globals.channel_list[globals.selected_channel], node_state)):
update_node_info_in_db(globals.channel_list[globals.selected_channel], chat_archived=False)
globals.selected_node = 0
globals.current_window = 0
draw_node_list()
draw_channel_list()
draw_messages_window(True)
draw_node_list(node_state)
draw_channel_list(node_state)
draw_messages_window(node_state, True)
elif len(input_text) > 0:
# Enter key pressed, send user input as message
send_message(input_text, channel=globals.selected_channel)
draw_messages_window(True)
send_message(input_text, node_state, channel=globals.selected_channel)
draw_messages_window(node_state, True)
# Clear entry window and reset input text
input_text = ""
@@ -259,7 +259,7 @@ def main_ui(stdscr: curses.window) -> None:
elif char == "`": # ` Launch the settings interface
curses.curs_set(0)
settings_menu(stdscr, globals.interface)
settings_menu(stdscr, node_state.interface)
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
@@ -268,11 +268,11 @@ def main_ui(stdscr: curses.window) -> None:
# Display packet log
if globals.display_log is False:
globals.display_log = True
draw_messages_window(True)
draw_messages_window(node_state, True)
else:
globals.display_log = False
packetlog_win.erase()
draw_messages_window(True)
draw_messages_window(node_state, True)
elif char == curses.KEY_RESIZE:
input_text = ""
@@ -291,29 +291,29 @@ def main_ui(stdscr: curses.window) -> None:
del globals.channel_list[globals.selected_channel]
globals.selected_channel = min(globals.selected_channel, len(globals.channel_list) - 1)
select_channel(globals.selected_channel)
draw_channel_list()
draw_messages_window()
select_channel(globals.selected_channel, node_state)
draw_channel_list(node_state)
draw_messages_window(node_state)
if(globals.current_window == 2):
curses.curs_set(0)
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from nodedb?", "No", ["Yes", "No"])
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node], node_state)} from nodedb?", "No", ["Yes", "No"])
if confirmation == "Yes":
globals.interface.localNode.removeNode(globals.node_list[globals.selected_node])
node_state.interface.localNode.removeNode(globals.node_list[globals.selected_node])
# Directly modifying the interface from client code - good? Bad? If it's stupid but it works, it's not supid?
del(globals.interface.nodesByNum[globals.node_list[globals.selected_node]])
del(node_state.interface.nodesByNum[globals.node_list[globals.selected_node]])
# Convert to "!hex" representation that interface.nodes uses
hexid = f"!{hex(globals.node_list[globals.selected_node])[2:]}"
del(globals.interface.nodes[hexid])
del(node_state.interface.nodes[hexid])
globals.node_list.pop(globals.selected_node)
draw_messages_window()
draw_node_list()
draw_messages_window(node_state)
draw_node_list(node_state)
else:
draw_messages_window()
draw_messages_window(node_state)
curses.curs_set(1)
continue
@@ -325,25 +325,25 @@ def main_ui(stdscr: curses.window) -> None:
# ^F
elif char == chr(6):
if globals.current_window == 2:
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
selectedNode = node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]
curses.curs_set(0)
if 'isFavorite' not in selectedNode or selectedNode['isFavorite'] == False:
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Favorite?", None, ["Yes", "No"])
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Favorite?", node_state, None, ["Yes", "No"])
if confirmation == "Yes":
globals.interface.localNode.setFavorite(globals.node_list[globals.selected_node])
node_state.interface.localNode.setFavorite(globals.node_list[globals.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = True
node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = True
refresh_node_list()
else:
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Favorites?", None, ["Yes", "No"])
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Favorites?", node_state, None, ["Yes", "No"])
if confirmation == "Yes":
globals.interface.localNode.removeFavorite(globals.node_list[globals.selected_node])
node_state.interface.localNode.removeFavorite(globals.node_list[globals.selected_node])
# Maybe we shouldn't be modifying the nodedb, but maybe it should update itself
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = False
node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]['isFavorite'] = False
refresh_node_list()
@@ -351,20 +351,20 @@ def main_ui(stdscr: curses.window) -> None:
elif char == chr(7):
if globals.current_window == 2:
selectedNode = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
selectedNode = node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]
curses.curs_set(0)
if 'isIgnored' not in selectedNode or selectedNode['isIgnored'] == False:
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Ignored?", "No", ["Yes", "No"])
confirmation = get_list_input(f"Set {get_name_from_database(globals.node_list[globals.selected_node])} as Ignored?", node_state, "No", ["Yes", "No"])
if confirmation == "Yes":
globals.interface.localNode.setIgnored(globals.node_list[globals.selected_node])
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = True
node_state.interface.localNode.setIgnored(globals.node_list[globals.selected_node])
node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = True
else:
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Ignored?", "No", ["Yes", "No"])
confirmation = get_list_input(f"Remove {get_name_from_database(globals.node_list[globals.selected_node])} from Ignored?", node_state, "No", ["Yes", "No"])
if confirmation == "Yes":
globals.interface.localNode.removeIgnored(globals.node_list[globals.selected_node])
globals.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = False
node_state.interface.localNode.removeIgnored(globals.node_list[globals.selected_node])
node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]['isIgnored'] = False
handle_resize(stdscr, False)
@@ -377,7 +377,7 @@ def main_ui(stdscr: curses.window) -> None:
def draw_channel_list() -> None:
def draw_channel_list(node_state):
channel_pad.erase()
win_height, win_width = channel_win.getmaxyx()
start_index = max(0, globals.selected_channel - (win_height - 3)) # Leave room for borders
@@ -388,9 +388,9 @@ def draw_channel_list() -> None:
for channel in globals.channel_list:
# Convert node number to long name if it's an integer
if isinstance(channel, int):
if is_chat_archived(channel):
if is_chat_archived(channel, node_state):
continue
channel_name = get_name_from_database(channel, type='long')
channel_name = get_name_from_database(channel, node_state, type='long')
if channel_name is None:
continue
channel = channel_name
@@ -418,7 +418,7 @@ def draw_channel_list() -> None:
refresh_pad(0)
def draw_messages_window(scroll_to_bottom: bool = False) -> None:
def draw_messages_window(node_state, scroll_to_bottom = False):
"""Update the messages window based on the selected channel and scroll position."""
messages_pad.erase()
@@ -459,9 +459,9 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
refresh_pad(1)
draw_packetlog_win()
draw_packetlog_win(node_state)
def draw_node_list() -> None:
def draw_node_list(node_state):
global nodes_pad
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
@@ -479,9 +479,9 @@ def draw_node_list() -> None:
logging.error("Traceback: %s", traceback.format_exc())
for i, node_num in enumerate(globals.node_list):
node = globals.interface.nodesByNum[node_num]
node = node_state.interface.nodesByNum[node_num]
secure = 'user' in node and 'publicKey' in node['user'] and node['user']['publicKey']
node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, 'long')}".ljust(box_width - 2)[:box_width - 2]
node_str = f"{'🔐' if secure else '🔓'} {get_name_from_database(node_num, node_state, 'long')}".ljust(box_width - 2)[:box_width - 2]
color = "node_list"
if 'isFavorite' in node and node['isFavorite']:
color = "node_favorite"
@@ -501,21 +501,21 @@ def draw_node_list() -> None:
curses.curs_set(1)
entry_win.refresh()
def select_channel(idx: int) -> None:
def select_channel(idx, node_state):
old_selected_channel = globals.selected_channel
globals.selected_channel = max(0, min(idx, len(globals.channel_list) - 1))
draw_messages_window(True)
draw_messages_window(node_state, True)
# For now just re-draw channel list when clearing notifications, we can probably make this more efficient
if globals.selected_channel in globals.notifications:
remove_notification(globals.selected_channel)
draw_channel_list()
draw_channel_list(node_state)
return
highlight_line(False, 0, old_selected_channel)
highlight_line(True, 0, globals.selected_channel)
highlight_line(False, 0, old_selected_channel, node_state)
highlight_line(True, 0, globals.selected_channel, node_state)
refresh_pad(0)
def scroll_channels(direction: int) -> None:
def scroll_channels(direction, node_state):
new_selected_channel = globals.selected_channel + direction
if new_selected_channel < 0:
@@ -523,9 +523,9 @@ def scroll_channels(direction: int) -> None:
elif new_selected_channel >= len(globals.channel_list):
new_selected_channel = 0
select_channel(new_selected_channel)
select_channel(new_selected_channel, node_state)
def scroll_messages(direction: int) -> None:
def scroll_messages(direction):
globals.selected_message += direction
msg_line_count = messages_pad.getmaxyx()[0]
@@ -533,17 +533,17 @@ def scroll_messages(direction: int) -> None:
refresh_pad(1)
def select_node(idx: int) -> None:
def select_node(idx, node_state):
old_selected_node = globals.selected_node
globals.selected_node = max(0, min(idx, len(globals.node_list) - 1))
highlight_line(False, 2, old_selected_node)
highlight_line(True, 2, globals.selected_node)
highlight_line(False, 2, old_selected_node, node_state)
highlight_line(True, 2, globals.selected_node, node_state)
refresh_pad(2)
draw_function_win()
draw_function_win(node_state)
def scroll_nodes(direction: int) -> None:
def scroll_nodes(direction, node_state):
new_selected_node = globals.selected_node + direction
if new_selected_node < 0:
@@ -551,9 +551,9 @@ def scroll_nodes(direction: int) -> None:
elif new_selected_node >= len(globals.node_list):
new_selected_node = 0
select_node(new_selected_node)
select_node(new_selected_node, node_state)
def draw_packetlog_win() -> None:
def draw_packetlog_win(node_state):
columns = [10,10,15,30]
span = 0
@@ -574,10 +574,10 @@ def draw_packetlog_win() -> None:
break
# Format each field
from_id = get_name_from_database(packet['from'], 'short').ljust(columns[0])
from_id = get_name_from_database(packet['from'], node_state, 'short').ljust(columns[0])
to_id = (
"BROADCAST".ljust(columns[1]) if str(packet['to']) == "4294967295"
else get_name_from_database(packet['to'], 'short').ljust(columns[1])
else get_name_from_database(packet['to'], node_state, 'short').ljust(columns[1])
)
if 'decoded' in packet:
port = packet['decoded']['portnum'].ljust(columns[2])
@@ -602,7 +602,7 @@ def draw_packetlog_win() -> None:
curses.curs_set(1)
entry_win.refresh()
def search(win: int) -> None:
def search(win):
start_idx = globals.selected_node
select_func = select_node
@@ -645,10 +645,10 @@ def search(win: int) -> None:
entry_win.erase()
def draw_node_details() -> None:
def draw_node_details(node_state):
node = None
try:
node = globals.interface.nodesByNum[globals.node_list[globals.selected_node]]
node = node_state.interface.nodesByNum[globals.node_list[globals.selected_node]]
except KeyError:
return
@@ -693,7 +693,7 @@ def draw_node_details() -> None:
draw_centered_text_field(function_win, nodestr, 0, get_color("commands"))
def draw_help() -> None:
def draw_help():
cmds = ["↑→↓← = Select", " ENTER = Send", " ` = Settings", " ^P = Packet Log", " ESC = Quit", " ^t = Traceroute", " ^d = Archive Chat", " ^f = Favorite", " ^g = Ignore"]
function_str = ""
for s in cmds:
@@ -702,18 +702,19 @@ def draw_help() -> None:
draw_centered_text_field(function_win, function_str, 0, get_color("commands"))
def draw_function_win() -> None:
def draw_function_win(node_state):
if(globals.current_window == 2):
draw_node_details()
draw_node_details(node_state)
else:
draw_help()
def get_msg_window_lines() -> None:
def get_msg_window_lines():
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if globals.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height
def refresh_pad(window: int) -> None:
def refresh_pad(window):
# global messages_pad, nodes_pad, channel_pad
win_height = channel_win.getmaxyx()[0]
if(window == 1):
@@ -745,7 +746,7 @@ def refresh_pad(window: int) -> None:
box.getbegyx()[0] + 1, box.getbegyx()[1] + 1,
box.getbegyx()[0] + lines, box.getbegyx()[1] + box.getmaxyx()[1] - 2)
def highlight_line(highlight: bool, window: int, line: int) -> None:
def highlight_line(highlight, window, line, node_state):
pad = nodes_pad
color = get_color("node_list")
@@ -753,7 +754,7 @@ def highlight_line(highlight: bool, window: int, line: int) -> None:
if window == 2:
node_num = globals.node_list[line]
node = globals.interface.nodesByNum[node_num]
node = node_state.interface.nodesByNum[node_num]
if 'isFavorite' in node and node['isFavorite']:
color = get_color("node_favorite")
if 'isIgnored' in node and node['isIgnored']:
@@ -766,25 +767,25 @@ def highlight_line(highlight: bool, window: int, line: int) -> None:
pad.chgat(line, 1, select_len, color | curses.A_REVERSE if highlight else color)
def add_notification(channel_number: int) -> None:
def add_notification(channel_number):
if channel_number not in globals.notifications:
globals.notifications.append(channel_number)
def remove_notification(channel_number: int) -> None:
def remove_notification(channel_number):
if channel_number in globals.notifications:
globals.notifications.remove(channel_number)
def draw_text_field(win: curses.window, text: str, color: int) -> None:
def draw_text_field(win, text, color):
win.border()
win.addstr(1, 1, text, color)
def draw_centered_text_field(win: curses.window, text: str, y_offset: int, color: int) -> None:
def draw_centered_text_field(win, text, y_offset, color):
height, width = win.getmaxyx()
x = (width - len(text)) // 2
y = (height // 2) + y_offset
win.addstr(y, x, text, color)
win.refresh()
def draw_debug(value: str | int) -> None:
def draw_debug(value):
function_win.addstr(1, 1, f"debug: {value} ")
function_win.refresh()

View File

@@ -7,23 +7,17 @@ import sys
from contact.utilities.save_to_radio import save_changes
from contact.utilities.config_io import config_export, config_import
from contact.utilities.input_handlers import (
get_repeated_input,
get_text_input,
get_fixed32_input,
get_list_input,
get_admin_key_input,
)
from contact.utilities.input_handlers import get_repeated_input, get_text_input, get_fixed32_input, get_list_input, get_admin_key_input
from contact.ui.menus import generate_menu_from_protobuf
from contact.ui.colors import get_color
from contact.ui.dialog import dialog
from contact.utilities.control_utils import parse_ini_file, transform_menu_path
from contact.ui.user_config import json_editor
from contact.ui.ui_state import MenuState
from contact.ui.navigation_utils import move_highlight
menu_state = MenuState()
# Constants
width = 80
save_option = "Save Changes"
@@ -44,21 +38,15 @@ config_folder = os.path.join(locals_dir, "node-configs")
# Load translations
field_mapping, help_text = parse_ini_file(translation_file)
# Aliases
Segment = tuple[str, str, bool, bool]
WrappedLine = list[Segment]
def display_menu(
menu_state: MenuState,
) -> tuple[object, object]: # curses.window or pad types
def display_menu(menu_state):
min_help_window_height = 6
num_items = len(menu_state.current_menu) + (1 if menu_state.show_save_option else 0)
# Determine the available height for the menu
max_menu_height = curses.LINES
menu_height = min(max_menu_height - min_help_window_height, num_items + 5)
max_menu_height = curses.LINES
menu_height = min(max_menu_height - min_help_window_height, num_items + 5)
start_y = (curses.LINES - menu_height) // 2 - (min_help_window_height // 2)
start_x = (curses.COLS - width) // 2
@@ -79,7 +67,7 @@ def display_menu(
header = " > ".join(word.title() for word in menu_state.menu_path)
if len(header) > width - 4:
header = header[: width - 7] + "..."
header = header[:width - 7] + "..."
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
transformed_path = transform_menu_path(menu_state.menu_path)
@@ -87,118 +75,55 @@ def display_menu(
for idx, option in enumerate(menu_state.current_menu):
field_info = menu_state.current_menu[option]
current_value = field_info[1] if isinstance(field_info, tuple) else ""
full_key = ".".join(transformed_path + [option])
full_key = '.'.join(transformed_path + [option])
display_name = field_mapping.get(full_key, option)
display_option = f"{display_name}"[: width // 2 - 2]
display_value = f"{current_value}"[: width // 2 - 4]
display_option = f"{display_name}"[:width // 2 - 2]
display_value = f"{current_value}"[:width // 2 - 4]
try:
color = get_color(
(
"settings_sensitive"
if option in sensitive_settings
else "settings_default"
),
reverse=(idx == menu_state.selected_index),
)
menu_pad.addstr(
idx,
0,
f"{display_option:<{width // 2 - 2}} {display_value}".ljust(width - 8),
color,
)
color = get_color("settings_sensitive" if option in sensitive_settings else "settings_default", reverse=(idx == menu_state.selected_index))
menu_pad.addstr(idx, 0, f"{display_option:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
except curses.error:
pass
if menu_state.show_save_option:
save_position = menu_height - 2
menu_win.addstr(
save_position,
(width - len(save_option)) // 2,
save_option,
get_color(
"settings_save",
reverse=(menu_state.selected_index == len(menu_state.current_menu)),
),
)
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, get_color("settings_save", reverse=(menu_state.selected_index == len(menu_state.current_menu))))
# Draw help window with dynamically updated max_help_lines
draw_help_window(
start_y, start_x, menu_height, max_help_lines, transformed_path, menu_state
)
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path, menu_state)
menu_win.refresh()
menu_pad.refresh(
menu_state.start_index[-1],
0,
menu_win.getbegyx()[0] + 3,
menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0]
+ 3
+ menu_win.getmaxyx()[0]
- 5
- (2 if menu_state.show_save_option else 0),
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4,
menu_state.start_index[-1], 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0),
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4
)
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
visible_height = (
menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
)
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
draw_arrows(menu_win, visible_height, max_index, menu_state)
return menu_win, menu_pad
def draw_help_window(
menu_start_y: int,
menu_start_x: int,
menu_height: int,
max_help_lines: int,
transformed_path: list[str],
menu_state: MenuState,
) -> None:
def draw_help_window(menu_start_y, menu_start_x, menu_height, max_help_lines, transformed_path, menu_state):
global help_win
if "help_win" not in globals():
if 'help_win' not in globals():
help_win = None # Initialize if it does not exist
selected_option = (
list(menu_state.current_menu.keys())[menu_state.selected_index]
if menu_state.current_menu
else None
)
selected_option = list(menu_state.current_menu.keys())[menu_state.selected_index] if menu_state.current_menu else None
help_y = menu_start_y + menu_height
help_win = update_help_window(
help_win,
help_text,
transformed_path,
selected_option,
max_help_lines,
width,
help_y,
menu_start_x,
)
help_win = update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_start_x)
def update_help_window(
help_win: object, # curses window or None
help_text: dict[str, str],
transformed_path: list[str],
selected_option: str | None,
max_help_lines: int,
width: int,
help_y: int,
help_x: int,
) -> object: # returns a curses window
def update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, help_x):
"""Handles rendering the help window consistently."""
wrapped_help = get_wrapped_help_text(
help_text, transformed_path, selected_option, width, max_help_lines
)
wrapped_help = get_wrapped_help_text(help_text, transformed_path, selected_option, width, max_help_lines)
help_height = min(len(wrapped_help) + 2, max_help_lines + 2) # +2 for border
help_height = max(help_height, 3) # Ensure at least 3 rows (1 text + border)
@@ -234,41 +159,26 @@ def update_help_window(
return help_win
def get_wrapped_help_text(
help_text: dict[str, str],
transformed_path: list[str],
selected_option: str | None,
width: int,
max_lines: int,
) -> list[WrappedLine]:
def get_wrapped_help_text(help_text, transformed_path, selected_option, width, max_lines):
"""Fetches and formats help text for display, ensuring it fits within the allowed lines."""
full_help_key = (
".".join(transformed_path + [selected_option]) if selected_option else None
)
full_help_key = '.'.join(transformed_path + [selected_option]) if selected_option else None
help_content = help_text.get(full_help_key, "No help available.")
wrap_width = max(width - 6, 10) # Ensure a valid wrapping width
# Color replacements
color_mappings = {
r"\[warning\](.*?)\[/warning\]": (
"settings_warning",
True,
False,
), # Red for warnings
r"\[note\](.*?)\[/note\]": ("settings_note", True, False), # Green for notes
r"\[underline\](.*?)\[/underline\]": (
"settings_default",
False,
True,
), # Underline
r"\\033\[31m(.*?)\\033\[0m": ("settings_warning", True, False), # Red text
r"\\033\[32m(.*?)\\033\[0m": ("settings_note", True, False), # Green text
r"\\033\[4m(.*?)\\033\[0m": ("settings_default", False, True), # Underline
r'\[warning\](.*?)\[/warning\]': ('settings_warning', True, False), # Red for warnings
r'\[note\](.*?)\[/note\]': ('settings_note', True, False), # Green for notes
r'\[underline\](.*?)\[/underline\]': ('settings_default', False, True), # Underline
r'\\033\[31m(.*?)\\033\[0m': ('settings_warning', True, False), # Red text
r'\\033\[32m(.*?)\\033\[0m': ('settings_note', True, False), # Green text
r'\\033\[4m(.*?)\\033\[0m': ('settings_default', False, True) # Underline
}
def extract_ansi_segments(text: str) -> list[Segment]:
def extract_ansi_segments(text):
"""Extracts and replaces ANSI color codes, ensuring spaces are preserved."""
matches = []
last_pos = 0
@@ -277,9 +187,7 @@ def get_wrapped_help_text(
# Find all matches and store their positions
for pattern, (color, bold, underline) in color_mappings.items():
for match in re.finditer(pattern, text):
pattern_matches.append(
(match.start(), match.end(), match.group(1), color, bold, underline)
)
pattern_matches.append((match.start(), match.end(), match.group(1), color, bold, underline))
# Sort matches by start position to process sequentially
pattern_matches.sort(key=lambda x: x[0])
@@ -289,7 +197,7 @@ def get_wrapped_help_text(
if last_pos < start:
segment = text[last_pos:start]
matches.append((segment, "settings_default", False, False))
# Append the colored segment
matches.append((content, color, bold, underline))
last_pos = end
@@ -300,14 +208,14 @@ def get_wrapped_help_text(
return matches
def wrap_ansi_text(segments: list[Segment], wrap_width: int) -> list[WrappedLine]:
def wrap_ansi_text(segments, wrap_width):
"""Wraps text while preserving ANSI formatting and spaces."""
wrapped_lines = []
line_buffer = []
line_length = 0
for text, color, bold, underline in segments:
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
words = re.findall(r'\S+|\s+', text) # Capture words and spaces separately
for word in words:
word_length = len(word)
@@ -337,76 +245,64 @@ def get_wrapped_help_text(
# Trim and add ellipsis if needed
if len(wrapped_help) > max_lines:
wrapped_help = wrapped_help[:max_lines]
wrapped_help[-1].append(("...", "settings_default", False, False))
wrapped_help = wrapped_help[:max_lines]
wrapped_help[-1].append(("...", "settings_default", False, False))
return wrapped_help
# def move_highlight(
# old_idx: int,
# options: list[str],
# menu_win: object,
# menu_pad: object,
# help_win: object,
# help_text: dict[str, str],
# max_help_lines: int,
# menu_state: MenuState
# ) -> None:
def move_highlight(old_idx, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state):
if old_idx == menu_state.selected_index: # No-op
return
# if old_idx == menu_state.selected_index: # No-op
# return
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
# max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
# visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
# Adjust menu_state.start_index only when moving out of visible range
if menu_state.selected_index == max_index and menu_state.show_save_option:
pass
elif menu_state.selected_index < menu_state.start_index[-1]: # Moving above the visible area
menu_state.start_index[-1] = menu_state.selected_index
elif menu_state.selected_index >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
menu_state.start_index[-1] = menu_state.selected_index - visible_height
pass
# # Adjust menu_state.start_index only when moving out of visible range
# if menu_state.selected_index == max_index and menu_state.show_save_option:
# pass
# elif menu_state.selected_index < menu_state.start_index[-1]: # Moving above the visible area
# menu_state.start_index[-1] = menu_state.selected_index
# elif menu_state.selected_index >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
# menu_state.start_index[-1] = menu_state.selected_index - visible_height
# pass
# Ensure menu_state.start_index is within bounds
menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
# # Ensure menu_state.start_index is within bounds
# menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
# Clear old selection
if menu_state.show_save_option and old_idx == max_index:
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
else:
menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
# # Clear old selection
# if menu_state.show_save_option and old_idx == max_index:
# menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
# else:
# menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
# Highlight new selection
if menu_state.show_save_option and menu_state.selected_index == max_index:
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
else:
menu_pad.chgat(menu_state.selected_index, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[menu_state.selected_index] in sensitive_settings else get_color("settings_default", reverse=True))
# # Highlight new selection
# if menu_state.show_save_option and menu_state.selected_index == max_index:
# menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
# else:
# menu_pad.chgat(menu_state.selected_index, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[menu_state.selected_index] in sensitive_settings else get_color("settings_default", reverse=True))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(menu_state.start_index[-1], 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
# menu_win.refresh()
# Update help window
transformed_path = transform_menu_path(menu_state.menu_path)
selected_option = options[menu_state.selected_index] if menu_state.selected_index < len(options) else None
help_y = menu_win.getbegyx()[0] + menu_win.getmaxyx()[0]
help_win = update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_win.getbegyx()[1])
# # Refresh pad only if scrolling is needed
# menu_pad.refresh(menu_state.start_index[-1], 0,
# menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
# menu_win.getbegyx()[0] + 3 + visible_height,
# menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
# # Update help window
# transformed_path = transform_menu_path(menu_state.menu_path)
# selected_option = options[menu_state.selected_index] if menu_state.selected_index < len(options) else None
# help_y = menu_win.getbegyx()[0] + menu_win.getmaxyx()[0]
# help_win = update_help_window(help_win, help_text, transformed_path, selected_option, max_help_lines, width, help_y, menu_win.getbegyx()[1])
# draw_arrows(menu_win, visible_height, max_index, menu_state)
draw_arrows(menu_win, visible_height, max_index, menu_state)
def draw_arrows(
win: object, visible_height: int, max_index: int, menu_state: MenuState
) -> None:
def draw_arrows(win, visible_height, max_index, menu_state):
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if menu_state.show_save_option else 0)
mi = max_index - (2 if menu_state.show_save_option else 0)
if visible_height < mi:
if menu_state.start_index[-1] > 0:
@@ -414,46 +310,35 @@ def draw_arrows(
else:
win.addstr(3, 2, " ", get_color("settings_default"))
if mi - menu_state.start_index[-1] >= visible_height + (
0 if menu_state.show_save_option else 1
):
if mi - menu_state.start_index[-1] >= visible_height + (0 if menu_state.show_save_option else 1) :
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))
def settings_menu(stdscr: object, interface: object) -> None:
def settings_menu(stdscr, node_state):
curses.update_lines_cols()
menu = generate_menu_from_protobuf(interface)
menu = generate_menu_from_protobuf(node_state)
menu_state.current_menu = menu["Main Menu"]
menu_state.menu_path = ["Main Menu"]
modified_settings = {}
modified_settings = {}
need_redraw = True
menu_state.show_save_option = False
while True:
if need_redraw:
if(need_redraw):
options = list(menu_state.current_menu.keys())
menu_state.show_save_option = (
(
len(menu_state.menu_path) > 2
and (
"Radio Settings" in menu_state.menu_path
or "Module Settings" in menu_state.menu_path
)
)
or (
len(menu_state.menu_path) == 2
and "User Settings" in menu_state.menu_path
)
or (
len(menu_state.menu_path) == 3
and "Channels" in menu_state.menu_path
)
len(menu_state.menu_path) > 2 and ("Radio Settings" in menu_state.menu_path or "Module Settings" in menu_state.menu_path)
) or (
len(menu_state.menu_path) == 2 and "User Settings" in menu_state.menu_path
) or (
len(menu_state.menu_path) == 3 and "Channels" in menu_state.menu_path
)
# Display the menu
@@ -468,52 +353,14 @@ def settings_menu(stdscr: object, interface: object) -> None:
# max_help_lines = 4
if key == curses.KEY_UP:
old_idx = menu_state.selected_index
menu_state.selected_index = (
max_index
if menu_state.selected_index == 0
else menu_state.selected_index - 1
)
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
help_win=help_win,
help_updater=update_help_window,
field_mapping=help_text,
menu_path=transform_menu_path(menu_state.menu_path),
max_help_lines=max_help_lines,
sensitive_mode=True,
)
old_selected_index = menu_state.selected_index
menu_state.selected_index = max_index if menu_state.selected_index == 0 else menu_state.selected_index - 1
move_highlight(old_selected_index, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state)
elif key == curses.KEY_DOWN:
old_idx = menu_state.selected_index
menu_state.selected_index = (
0
if menu_state.selected_index == max_index
else menu_state.selected_index + 1
)
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
help_win=help_win,
help_updater=update_help_window,
field_mapping=help_text,
menu_path=transform_menu_path(menu_state.menu_path),
max_help_lines=max_help_lines,
sensitive_mode=True,
)
old_selected_index = menu_state.selected_index
menu_state.selected_index = 0 if menu_state.selected_index == max_index else menu_state.selected_index + 1
move_highlight(old_selected_index, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state)
elif key == curses.KEY_RESIZE:
need_redraw = True
@@ -526,25 +373,11 @@ def settings_menu(stdscr: object, interface: object) -> None:
help_win.refresh()
elif key == ord("\t") and menu_state.show_save_option:
old_idx = menu_state.selected_index
old_selected_index = menu_state.selected_index
menu_state.selected_index = max_index
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
help_win=help_win,
help_updater=update_help_window,
field_mapping=help_text,
menu_path=transform_menu_path(menu_state.menu_path),
max_help_lines=max_help_lines,
sensitive_mode=True,
)
elif key == curses.KEY_RIGHT or key == ord("\n"):
move_highlight(old_selected_index, options, menu_win, menu_pad, help_win, help_text, max_help_lines, menu_state)
elif key == curses.KEY_RIGHT or key == ord('\n'):
need_redraw = True
menu_state.start_index.append(0)
menu_win.erase()
@@ -555,10 +388,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_win.refresh()
help_win.refresh()
if menu_state.show_save_option and menu_state.selected_index == len(
options
):
save_changes(interface, modified_settings, menu_state)
if menu_state.show_save_option and menu_state.selected_index == len(options):
save_changes(node_state, modified_settings, menu_state)
modified_settings.clear()
logging.info("Changes Saved")
@@ -585,19 +416,13 @@ def settings_menu(stdscr: object, interface: object) -> None:
filename += ".yaml"
try:
config_text = config_export(interface)
config_text = config_export(node_state)
yaml_file_path = os.path.join(config_folder, filename)
if os.path.exists(yaml_file_path):
overwrite = get_list_input(
f"{filename} already exists. Overwrite?",
None,
["Yes", "No"],
)
overwrite = get_list_input(f"{filename} already exists. Overwrite?", None, ["Yes", "No"])
if overwrite == "No":
logging.info(
"Export cancelled: User chose not to overwrite."
)
logging.info("Export cancelled: User chose not to overwrite.")
menu_state.start_index.pop()
continue # Return to menu
os.makedirs(os.path.dirname(yaml_file_path), exist_ok=True)
@@ -608,30 +433,22 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
continue
except PermissionError:
logging.error(
f"Permission denied: Unable to write to {yaml_file_path}"
)
logging.error(f"Permission denied: Unable to write to {yaml_file_path}")
except OSError as e:
logging.error(f"OS error while saving config: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
menu_state.start_index.pop()
continue
elif selected_option == "Load Config File":
# Check if folder exists and is not empty
if not os.path.exists(config_folder) or not any(
os.listdir(config_folder)
):
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
dialog(stdscr, "", " No config files found. Export a config first.")
continue # Return to menu
file_list = [
f
for f in os.listdir(config_folder)
if os.path.isfile(os.path.join(config_folder, f))
]
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
# Ensure file_list is not empty before proceeding
if not file_list:
@@ -641,68 +458,52 @@ def settings_menu(stdscr: object, interface: object) -> None:
filename = get_list_input("Choose a config file", None, file_list)
if filename:
file_path = os.path.join(config_folder, filename)
overwrite = get_list_input(
f"Are you sure you want to load {filename}?",
None,
["Yes", "No"],
)
overwrite = get_list_input(f"Are you sure you want to load {filename}?", None, ["Yes", "No"])
if overwrite == "Yes":
config_import(interface, file_path)
config_import(node_state, file_path)
menu_state.start_index.pop()
continue
elif selected_option == "Config URL":
current_value = interface.localNode.getURL()
current_value = node_state.interface.localNode.getURL()
new_value = get_text_input(f"Config URL is currently: {current_value}")
if new_value is not None:
current_value = new_value
overwrite = get_list_input(
f"Are you sure you want to load this config?",
None,
["Yes", "No"],
)
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
if overwrite == "Yes":
interface.localNode.setURL(new_value)
node_state.interface.localNode.setURL(new_value)
logging.info(f"New Config URL sent to node")
menu_state.start_index.pop()
continue
elif selected_option == "Reboot":
confirmation = get_list_input(
"Are you sure you want to Reboot?", None, ["Yes", "No"]
)
confirmation = get_list_input("Are you sure you want to Reboot?", None, ["Yes", "No"])
if confirmation == "Yes":
interface.localNode.reboot()
node_state.interface.localNode.reboot()
logging.info(f"Node Reboot Requested by menu")
menu_state.start_index.pop()
continue
elif selected_option == "Reset Node DB":
confirmation = get_list_input(
"Are you sure you want to Reset Node DB?", None, ["Yes", "No"]
)
confirmation = get_list_input("Are you sure you want to Reset Node DB?", None, ["Yes", "No"])
if confirmation == "Yes":
interface.localNode.resetNodeDb()
node_state.interface.localNode.resetNodeDb()
logging.info(f"Node DB Reset Requested by menu")
menu_state.start_index.pop()
continue
elif selected_option == "Shutdown":
confirmation = get_list_input(
"Are you sure you want to Shutdown?", None, ["Yes", "No"]
)
confirmation = get_list_input("Are you sure you want to Shutdown?", None, ["Yes", "No"])
if confirmation == "Yes":
interface.localNode.shutdown()
node_state.interface.localNode.shutdown()
logging.info(f"Node Shutdown Requested by menu")
menu_state.start_index.pop()
continue
elif selected_option == "Factory Reset":
confirmation = get_list_input(
"Are you sure you want to Factory Reset?", None, ["Yes", "No"]
)
confirmation = get_list_input("Are you sure you want to Factory Reset?", None, ["Yes", "No"])
if confirmation == "Yes":
interface.localNode.factoryReset()
node_state.interface.localNode.factoryReset()
logging.info(f"Factory Reset Requested by menu")
menu_state.start_index.pop()
continue
@@ -719,32 +520,26 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.selected_index = 4
continue
# need_redraw = True
field_info = menu_state.current_menu.get(selected_option)
if isinstance(field_info, tuple):
field, current_value = field_info
# Transform the menu path to get the full key
transformed_path = transform_menu_path(menu_state.menu_path)
full_key = ".".join(transformed_path + [selected_option])
full_key = '.'.join(transformed_path + [selected_option])
# Fetch human-readable name from field_mapping
human_readable_name = field_mapping.get(full_key, selected_option)
if selected_option in ["longName", "shortName", "isLicensed"]:
if selected_option in ["longName", "shortName"]:
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
if selected_option in ['longName', 'shortName', 'isLicensed']:
if selected_option in ['longName', 'shortName']:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
elif selected_option == "isLicensed":
new_value = get_list_input(
f"{human_readable_name} is currently: {current_value}",
str(current_value),
["True", "False"],
)
elif selected_option == 'isLicensed':
new_value = get_list_input(f"{human_readable_name} is currently: {current_value}", str(current_value), ["True", "False"])
new_value = new_value == "True"
menu_state.current_menu[selected_option] = (field, new_value)
@@ -753,82 +548,57 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif selected_option in ["latitude", "longitude", "altitude"]:
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
elif selected_option in ['latitude', 'longitude', 'altitude']:
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
for option in ["latitude", "longitude", "altitude"]:
for option in ['latitude', 'longitude', 'altitude']:
if option in menu_state.current_menu:
modified_settings[option] = menu_state.current_menu[option][
1
]
modified_settings[option] = menu_state.current_menu[option][1]
menu_state.start_index.pop()
elif selected_option == "admin_key":
new_values = get_admin_key_input(current_value)
new_value = (
current_value
if new_values is None
else [base64.b64decode(key) for key in new_values]
)
new_value = current_value if new_values is None else [base64.b64decode(key) for key in new_values]
menu_state.start_index.pop()
elif field.type == 8: # Handle boolean type
new_value = get_list_input(
human_readable_name, str(current_value), ["True", "False"]
)
if new_value == "Not Set":
pass # Leave it as-is
else:
new_value = new_value == "True" or new_value is True
new_value = get_list_input(human_readable_name, str(current_value), ["True", "False"])
new_value = new_value == "True" or new_value is True
menu_state.start_index.pop()
elif (
field.label == field.LABEL_REPEATED
): # Handle repeated field - Not currently used
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
new_value = get_repeated_input(current_value)
new_value = (
current_value if new_value is None else new_value.split(", ")
)
new_value = current_value if new_value is None else new_value.split(", ")
menu_state.start_index.pop()
elif field.enum_type: # Enum field
enum_options = {v.name: v.number for v in field.enum_type.values}
new_value_name = get_list_input(
human_readable_name, current_value, list(enum_options.keys())
)
new_value_name = get_list_input(human_readable_name, current_value, list(enum_options.keys()))
new_value = enum_options.get(new_value_name, current_value)
menu_state.start_index.pop()
elif field.type == 7: # Field type 7 corresponds to FIXED32
elif field.type == 7: # Field type 7 corresponds to FIXED32
new_value = get_fixed32_input(current_value)
menu_state.start_index.pop()
elif field.type == 13: # Field type 13 corresponds to UINT32
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
elif field.type == 13: # Field type 13 corresponds to UINT32
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else int(new_value)
menu_state.start_index.pop()
elif field.type == 2: # Field type 13 corresponds to INT64
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
elif field.type == 2: # Field type 13 corresponds to INT64
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else float(new_value)
menu_state.start_index.pop()
else: # Handle other field types
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}"
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else new_value
menu_state.start_index.pop()
for key in menu_state.menu_path[3:]: # Skip "Main Menu"
modified_settings = modified_settings.setdefault(key, {})
@@ -837,14 +607,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
# Convert enum string to int
if field and field.enum_type:
enum_value_descriptor = field.enum_type.values_by_number.get(
new_value
)
new_value = (
enum_value_descriptor.name
if enum_value_descriptor
else new_value
)
enum_value_descriptor = field.enum_type.values_by_number.get(new_value)
new_value = enum_value_descriptor.name if enum_value_descriptor else new_value
menu_state.current_menu[selected_option] = (field, new_value)
else:
@@ -853,6 +617,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.menu_index.append(menu_state.selected_index)
menu_state.selected_index = 0
elif key == curses.KEY_LEFT:
need_redraw = True
@@ -876,15 +641,14 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.current_menu = menu_state.current_menu.get(step, {})
menu_state.selected_index = menu_state.menu_index.pop()
menu_state.start_index.pop()
elif key == 27: # Escape key
menu_win.erase()
menu_win.refresh()
break
def set_region(interface: object) -> None:
node = interface.getNode("^local")
def set_region(node_state):
node = node_state.interface.getNode('^local')
device_config = node.localConfig
lora_descriptor = device_config.lora.DESCRIPTOR
@@ -894,12 +658,10 @@ def set_region(interface: object) -> None:
regions = list(region_name_to_number.keys())
new_region_name = get_list_input("Select your region:", "UNSET", regions)
new_region_name = get_list_input('Select your region:', 'UNSET', regions)
# Convert region name to corresponding enum number
new_region_number = region_name_to_number.get(
new_region_name, 0
) # Default to 0 if not found
new_region_number = region_name_to_number.get(new_region_name, 0) # Default to 0 if not found
node.localConfig.lora.region = new_region_number
node.writeConfig("lora")
node.writeConfig("lora")

View File

@@ -1,5 +1,5 @@
import json
import logging
import json
import os
# Get the parent directory of the script
@@ -11,11 +11,11 @@ json_file_path = os.path.join(parent_dir, "config.json")
log_file_path = os.path.join(parent_dir, "client.log")
db_file_path = os.path.join(parent_dir, "client.db")
def format_json_single_line_arrays(data: dict[str, object], indent: int = 4) -> str:
def format_json_single_line_arrays(data, indent=4):
"""
Formats JSON with arrays on a single line while keeping other elements properly indented.
"""
def format_value(value: object, current_indent: int) -> str:
def format_value(value, current_indent):
if isinstance(value, dict):
items = []
for key, val in value.items():
@@ -31,7 +31,7 @@ def format_json_single_line_arrays(data: dict[str, object], indent: int = 4) ->
return format_value(data, indent)
# Recursive function to check and update nested dictionaries
def update_dict(default: dict[str, object], actual: dict[str, object]) -> bool:
def update_dict(default, actual):
updated = False
for key, value in default.items():
if key not in actual:
@@ -42,7 +42,7 @@ def update_dict(default: dict[str, object], actual: dict[str, object]) -> bool:
updated = update_dict(value, actual[key]) or updated
return updated
def initialize_config() -> dict[str, object]:
def initialize_config():
COLOR_CONFIG_DARK = {
"default": ["white", "black"],
"background": [" ", "black"],
@@ -161,7 +161,7 @@ def initialize_config() -> dict[str, object]:
return loaded_config
def assign_config_variables(loaded_config: dict[str, object]) -> None:
def assign_config_variables(loaded_config):
# Assign values to local variables
global db_file_path, log_file_path, message_prefix, sent_message_prefix

View File

@@ -1,7 +1,7 @@
import curses
from contact.ui.colors import get_color
def dialog(stdscr: curses.window, title: str, message: str) -> None:
def dialog(stdscr, title, message):
height, width = stdscr.getmaxyx()
# Calculate dialog dimensions

View File

@@ -1,27 +1,19 @@
import base64
import logging
import os
from collections import OrderedDict
from typing import Any
from google.protobuf.message import Message
from meshtastic.protobuf import channel_pb2, config_pb2, module_config_pb2
from meshtastic.protobuf import config_pb2, module_config_pb2, channel_pb2
import logging
import base64
import os
locals_dir = os.path.dirname(os.path.abspath(__file__))
translation_file = os.path.join(locals_dir, "localisations", "en.ini")
def encode_if_bytes(value: Any) -> str:
def encode_if_bytes(value):
"""Encode byte values to base64 string."""
if isinstance(value, bytes):
return base64.b64encode(value).decode('utf-8')
return value
def extract_fields(
message_instance: Message,
current_config: Message | dict[str, Any] | None = None
) -> dict[str, Any]:
def extract_fields(message_instance, current_config=None):
if isinstance(current_config, dict): # Handle dictionaries
return {key: (None, encode_if_bytes(current_config.get(key, "Not Set"))) for key in current_config}
@@ -55,10 +47,7 @@ def extract_fields(
menu[field.name] = (field, encode_if_bytes(current_value))
return menu
def generate_menu_from_protobuf(interface: object) -> dict[str, Any]:
"""
Builds the full settings menu structure from the protobuf definitions.
"""
def generate_menu_from_protobuf(interface):
menu_structure = {"Main Menu": {}}
# Add User Settings

View File

@@ -1,120 +0,0 @@
# contact/ui/navigation_utils.py
import curses
from contact.ui.colors import get_color
save_option = "Save Changes"
sensitive_settings = ["Reboot", "Reset Node DB", "Shutdown", "Factory Reset"]
def move_highlight(
old_idx: int,
new_idx: int,
options: list[str],
win: curses.window,
pad: curses.window,
*,
start_index_ref: list[int] = None,
selected_index: int = None,
max_help_lines: int = 0,
show_save: bool = False,
help_win: curses.window = None,
help_updater=None,
field_mapping=None,
menu_path: list[str] = None,
width: int = 80,
sensitive_mode: bool = False,
):
if old_idx == new_idx:
return
max_index = len(options) + (1 if show_save else 0) - 1
visible_height = win.getmaxyx()[0] - 5 - (2 if show_save else 0)
# Scrolling logic
if start_index_ref is not None:
if new_idx == max_index and show_save:
pass
elif new_idx < start_index_ref[0]:
start_index_ref[0] = new_idx
elif new_idx >= start_index_ref[0] + visible_height:
start_index_ref[0] = new_idx - visible_height
start_index_ref[0] = max(
0, min(start_index_ref[0], max_index - visible_height + 1)
)
scroll = start_index_ref[0] if start_index_ref else 0
# Clear previous highlight
if show_save and old_idx == max_index:
win.chgat(
win.getmaxyx()[0] - 2,
(width - len(save_option)) // 2,
len(save_option),
get_color("settings_save"),
)
else:
color = (
"settings_sensitive"
if sensitive_mode and options[old_idx] in sensitive_settings
else "settings_default"
)
pad.chgat(old_idx, 0, pad.getmaxyx()[1], get_color(color))
# Apply new highlight
if show_save and new_idx == max_index:
win.chgat(
win.getmaxyx()[0] - 2,
(width - len(save_option)) // 2,
len(save_option),
get_color("settings_save", reverse=True),
)
else:
color = (
"settings_sensitive"
if sensitive_mode and options[new_idx] in sensitive_settings
else "settings_default"
)
pad.chgat(new_idx, 0, pad.getmaxyx()[1], get_color(color, reverse=True))
win.refresh()
pad.refresh(
scroll,
0,
win.getbegyx()[0] + 3,
win.getbegyx()[1] + 4,
win.getbegyx()[0] + 3 + visible_height,
win.getbegyx()[1] + win.getmaxyx()[1] - 4,
)
# Optional help update
if help_win and help_updater and menu_path and selected_index is not None:
selected_option = (
options[selected_index] if selected_index < len(options) else None
)
help_y = win.getbegyx()[0] + win.getmaxyx()[0]
help_updater(
help_win,
field_mapping,
menu_path,
selected_option,
max_help_lines,
width,
help_y,
win.getbegyx()[1],
)
def draw_arrows(
win: curses.window, visible_height: int, max_index: int, start_index: int
) -> None:
if visible_height < max_index:
if start_index > 0:
win.addstr(3, 2, "", get_color("settings_default"))
else:
win.addstr(3, 2, " ", get_color("settings_default"))
if max_index - start_index > visible_height:
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))

View File

@@ -1,8 +1,7 @@
import curses
from contact.ui.colors import get_color
def draw_splash(stdscr: object) -> None:
"""Draw the splash screen with a logo and connecting message."""
def draw_splash(stdscr):
curses.curs_set(0)
stdscr.clear()

View File

@@ -1,10 +1,27 @@
from typing import Any
class MenuState:
def __init__(self):
self.menu_index: list[int]= [] # Row we left the previous menus
self.start_index: list[int] = [0] # Row to start the menu if it doesn't all fit
self.selected_index: int = 0 # Selected Row
self.current_menu: dict[str, Any] | list[Any] | str | int = {} # Contents of the current menu
self.menu_path: list[str] = [] # Menu Path
self.show_save_option: bool = False # Display 'Save'
self.menu_index = [] # Row we left the previous menus
self.start_index = [0] # Row to start the menu if it doesn't all fit
self.selected_index = 0 # Selected Row
self.current_menu = {} # Contents of the current menu
self.menu_path = [] # Menu Path
self.show_save_option = False
class NodeState:
def __init__(self):
self.interface = None
self.myNodeNum = 0
# self.lock = None
# self.display_log = False
# self.all_messages = {}
# self.channel_list = []
# self.notifications = []
# self.packet_buffer = []
# self.node_list = []
# self.selected_channel = 0
# self.selected_message = 0
# self.selected_node = 0
# self.current_window = 0

View File

@@ -1,33 +1,26 @@
import os
import json
import curses
from typing import Any
from contact.ui.colors import get_color, setup_colors, COLOR_MAP
from contact.ui.default_config import format_json_single_line_arrays, loaded_config
from contact.utilities.input_handlers import get_list_input
from contact.ui.navigation_utils import move_highlight
width = 80
save_option = "Save Changes"
sensitive_settings = []
def edit_color_pair(key, current_value):
def edit_color_pair(key: str, current_value: list[str]) -> list[str]:
"""
Allows the user to select a foreground and background color for a key.
"""
color_list = [" "] + list(COLOR_MAP.keys())
fg_color = get_list_input(
f"Select Foreground Color for {key}", current_value[0], color_list
)
bg_color = get_list_input(
f"Select Background Color for {key}", current_value[1], color_list
)
fg_color = get_list_input(f"Select Foreground Color for {key}", current_value[0], color_list)
bg_color = get_list_input(f"Select Background Color for {key}", current_value[1], color_list)
return [fg_color, bg_color]
def edit_value(key: str, current_value: str) -> str:
def edit_value(key, current_value, menu_state):
height = 10
input_width = width - 16 # Allow space for "New Value: "
@@ -45,14 +38,9 @@ def edit_value(key: str, current_value: str) -> str:
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
wrap_width = width - 4 # Account for border and padding
wrapped_lines = [
current_value[i : i + wrap_width]
for i in range(0, len(current_value), wrap_width)
]
wrapped_lines = [current_value[i:i+wrap_width] for i in range(0, len(current_value), wrap_width)]
for i, line in enumerate(
wrapped_lines[:4]
): # Limit display to fit the window height
for i, line in enumerate(wrapped_lines[:4]): # Limit display to fit the window height
edit_win.addstr(4 + i, 2, line, get_color("settings_default"))
edit_win.refresh()
@@ -60,14 +48,10 @@ def edit_value(key: str, current_value: str) -> str:
# Handle theme selection dynamically
if key == "theme":
# Load theme names dynamically from the JSON
theme_options = [
k.split("_", 2)[2].lower()
for k in loaded_config.keys()
if k.startswith("COLOR_CONFIG")
]
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort":
sort_options = ["lastHeard", "name", "hops"]
sort_options = ['lastHeard', 'name', 'hops']
return get_list_input("Sort By", current_value, sort_options)
# Standard Input Mode (Scrollable)
@@ -79,26 +63,18 @@ def edit_value(key: str, current_value: str) -> str:
input_position = (7, 13) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
visible_text = user_input[
scroll_offset : scroll_offset + input_width
] # Only show what fits
edit_win.addstr(
row, col, " " * input_width, get_color("settings_default")
) # Clear previous text
edit_win.addstr(
row, col, visible_text, get_color("settings_default")
) # Display text
visible_text = user_input[scroll_offset:scroll_offset + input_width] # Only show what fits
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
edit_win.refresh()
edit_win.move(
row, col + min(len(user_input) - scroll_offset, input_width)
) # Adjust cursor position
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
key = edit_win.get_wch()
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow
curses.curs_set(0)
return current_value # Exit without returning a value
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
break
@@ -106,9 +82,7 @@ def edit_value(key: str, current_value: str) -> str:
if user_input: # Only process if there's something to delete
user_input = user_input[:-1]
if scroll_offset > 0 and len(user_input) < scroll_offset + input_width:
scroll_offset -= (
1 # Move back if text is shorter than scrolled area
)
scroll_offset -= 1 # Move back if text is shorter than scrolled area
else:
if isinstance(key, str):
user_input += key
@@ -122,7 +96,7 @@ def edit_value(key: str, current_value: str) -> str:
return user_input if user_input else current_value
def display_menu(menu_state: Any) -> tuple[Any, Any, list[str]]:
def display_menu(menu_state):
"""
Render the configuration menu with a Save button directly added to the window.
"""
@@ -138,7 +112,7 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, list[str]]:
# Calculate dynamic dimensions for the menu
max_menu_height = curses.LINES
menu_height = min(max_menu_height, num_items + 5)
menu_height = min(max_menu_height, num_items + 5)
num_items = len(options)
start_y = (curses.LINES - menu_height) // 2
start_x = (curses.COLS - width) // 2
@@ -158,120 +132,86 @@ def display_menu(menu_state: Any) -> tuple[Any, Any, list[str]]:
# Display the menu path
header = " > ".join(menu_state.menu_path)
if len(header) > width - 4:
header = header[: width - 7] + "..."
header = header[:width - 7] + "..."
menu_win.addstr(1, 2, header, get_color("settings_breadcrumbs", bold=True))
# Populate the pad with menu options
for idx, key in enumerate(options):
value = (
menu_state.current_menu[key]
if isinstance(menu_state.current_menu, dict)
else menu_state.current_menu[int(key.strip("[]"))]
value = menu_state.current_menu[key] if isinstance(menu_state.current_menu, dict) else menu_state.current_menu[int(key.strip("[]"))]
display_key = f"{key}"[:width // 2 - 2]
display_value = (
f"{value}"[:width // 2 - 8]
)
display_key = f"{key}"[: width // 2 - 2]
display_value = f"{value}"[: width // 2 - 8]
color = get_color(
"settings_default", reverse=(idx == menu_state.selected_index)
)
menu_pad.addstr(
idx,
0,
f"{display_key:<{width // 2 - 2}} {display_value}".ljust(width - 8),
color,
)
color = get_color("settings_default", reverse=(idx == menu_state.selected_index))
menu_pad.addstr(idx, 0, f"{display_key:<{width // 2 - 2}} {display_value}".ljust(width - 8), color)
# Add Save button to the main window
if menu_state.show_save_option:
save_position = menu_height - 2
menu_win.addstr(
save_position,
(width - len(save_option)) // 2,
save_option,
get_color(
"settings_save",
reverse=(menu_state.selected_index == len(menu_state.current_menu)),
),
)
menu_win.addstr(save_position, (width - len(save_option)) // 2, save_option, get_color("settings_save", reverse=(menu_state.selected_index == len(menu_state.current_menu))))
menu_win.refresh()
menu_pad.refresh(
menu_state.start_index[-1],
0,
menu_win.getbegyx()[0] + 3,
menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0]
+ 3
+ menu_win.getmaxyx()[0]
- 5
- (2 if menu_state.show_save_option else 0),
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4,
menu_state.start_index[-1], 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0),
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4
)
max_index = num_items + (1 if menu_state.show_save_option else 0) - 1
visible_height = (
menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
)
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
draw_arrows(menu_win, visible_height, max_index, menu_state)
return menu_win, menu_pad, options
# def move_highlight(
# old_idx: int,
# options: list[str],
# menu_win: curses.window,
# menu_pad: curses.window,
# menu_state: Any
# ) -> None:
def move_highlight(old_idx, new_idx, options, menu_win, menu_pad, menu_state):
if old_idx == new_idx: # No-op
return
# if old_idx == menu_state.selected_index: # No-op
# return
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
# max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
# visible_height = menu_win.getmaxyx()[0] - 5 - (2 if menu_state.show_save_option else 0)
# Adjust menu_state.start_index only when moving out of visible range
if new_idx == max_index and menu_state.show_save_option:
pass
elif new_idx < menu_state.start_index[-1]: # Moving above the visible area
menu_state.start_index[-1] = new_idx
elif new_idx >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
menu_state.start_index[-1] = new_idx - visible_height
pass
# # Adjust menu_state.start_index only when moving out of visible range
# if menu_state.selected_index == max_index and menu_state.show_save_option:
# pass
# elif menu_state.selected_index < menu_state.start_index[-1]: # Moving above the visible area
# menu_state.start_index[-1] = menu_state.selected_index
# elif menu_state.selected_index >= menu_state.start_index[-1] + visible_height: # Moving below the visible area
# menu_state.start_index[-1] = menu_state.selected_index - visible_height
# pass
# Ensure menu_state.start_index is within bounds
menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
# # Ensure menu_state.start_index is within bounds
# menu_state.start_index[-1] = max(0, min(menu_state.start_index[-1], max_index - visible_height + 1))
# Clear old selection
if menu_state.show_save_option and old_idx == max_index:
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
else:
menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
# # Clear old selection
# if menu_state.show_save_option and old_idx == max_index:
# menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save"))
# else:
# menu_pad.chgat(old_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive") if options[old_idx] in sensitive_settings else get_color("settings_default"))
# Highlight new selection
if menu_state.show_save_option and new_idx == max_index:
menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
else:
menu_pad.chgat(new_idx, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[new_idx] in sensitive_settings else get_color("settings_default", reverse=True))
# # Highlight new selection
# if menu_state.show_save_option and menu_state.selected_index == max_index:
# menu_win.chgat(menu_win.getmaxyx()[0] - 2, (width - len(save_option)) // 2, len(save_option), get_color("settings_save", reverse=True))
# else:
# menu_pad.chgat(menu_state.selected_index, 0, menu_pad.getmaxyx()[1], get_color("settings_sensitive", reverse=True) if options[menu_state.selected_index] in sensitive_settings else get_color("settings_default", reverse=True))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(menu_state.start_index[-1], 0,
menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
menu_win.getbegyx()[0] + 3 + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
# menu_win.refresh()
# # Refresh pad only if scrolling is needed
# menu_pad.refresh(menu_state.start_index[-1], 0,
# menu_win.getbegyx()[0] + 3, menu_win.getbegyx()[1] + 4,
# menu_win.getbegyx()[0] + 3 + visible_height,
# menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 4)
# draw_arrows(menu_win, visible_height, max_index, menu_state)
draw_arrows(menu_win, visible_height, max_index, menu_state)
def draw_arrows(
win: curses.window, visible_height: int, max_index: int, menu_state: any
) -> None:
def draw_arrows(win, visible_height, max_index, menu_state):
mi = max_index - (2 if menu_state.show_save_option else 0)
mi = max_index - (2 if menu_state.show_save_option else 0)
if visible_height < mi:
if menu_state.start_index[-1] > 0:
@@ -279,15 +219,13 @@ def draw_arrows(
else:
win.addstr(3, 2, " ", get_color("settings_default"))
if mi - menu_state.start_index[-1] >= visible_height + (
0 if menu_state.show_save_option else 1
):
if mi - menu_state.start_index[-1] >= visible_height + (0 if menu_state.show_save_option else 1) :
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))
def json_editor(stdscr: curses.window, menu_state: Any) -> None:
def json_editor(stdscr, menu_state):
menu_state.selected_index = 0 # Track the selected option
@@ -303,7 +241,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
json.dump({}, f)
# Load JSON data
with open(file_path, "r", encoding="utf-8") as f:
with open(file_path, "r") as f:
original_data = json.load(f)
data = original_data # Reference to the original data
@@ -314,68 +252,30 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
need_redraw = True
while True:
if need_redraw:
if(need_redraw):
menu_win, menu_pad, options = display_menu(menu_state)
menu_win.refresh()
need_redraw = False
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
key = menu_win.getch()
if key == curses.KEY_UP:
old_idx = menu_state.selected_index
menu_state.selected_index = (
max_index
if menu_state.selected_index == 0
else menu_state.selected_index - 1
)
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
sensitive_mode=True,
)
old_selected_index = menu_state.selected_index
menu_state.selected_index = max_index if menu_state.selected_index == 0 else menu_state.selected_index - 1
move_highlight(old_selected_index, menu_state.selected_index, options, menu_win, menu_pad, menu_state)
elif key == curses.KEY_DOWN:
old_idx = menu_state.selected_index
menu_state.selected_index = (
0
if menu_state.selected_index == max_index
else menu_state.selected_index + 1
)
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
sensitive_mode=True,
)
old_selected_index = menu_state.selected_index
menu_state.selected_index = 0 if menu_state.selected_index == max_index else menu_state.selected_index + 1
move_highlight(old_selected_index, menu_state.selected_index, options, menu_win, menu_pad, menu_state)
elif key == ord("\t") and menu_state.show_save_option:
old_idx = menu_state.selected_index
old_selected_index = menu_state.selected_index
menu_state.selected_index = max_index
move_highlight(
old_idx,
menu_state.selected_index,
options,
menu_win,
menu_pad,
start_index_ref=menu_state.start_index[-1:],
selected_index=menu_state.selected_index,
show_save=menu_state.show_save_option,
sensitive_mode=True,
)
move_highlight(old_selected_index, menu_state.selected_index, options, menu_win, menu_pad, menu_state)
elif key in (curses.KEY_RIGHT, 10, 13): # 10 = \n, 13 = carriage return
@@ -383,14 +283,12 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
menu_win.erase()
menu_win.refresh()
if menu_state.selected_index < len(
options
): # Handle selection of a menu item
if menu_state.selected_index < len(options): # Handle selection of a menu item
selected_key = options[menu_state.selected_index]
menu_state.menu_path.append(str(selected_key))
menu_state.start_index.append(0)
menu_state.menu_index.append(menu_state.selected_index)
# Handle nested data
if isinstance(menu_state.current_menu, dict):
if selected_key in menu_state.current_menu:
@@ -398,9 +296,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
else:
continue # Skip invalid key
elif isinstance(menu_state.current_menu, list):
selected_data = menu_state.current_menu[
int(selected_key.strip("[]"))
]
selected_data = menu_state.current_menu[int(selected_key.strip("[]"))]
if isinstance(selected_data, list) and len(selected_data) == 2:
# Edit color pair
@@ -417,13 +313,13 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
else:
# General value editing
new_value = edit_value(selected_key, selected_data)
new_value = edit_value(selected_key, selected_data, menu_state)
menu_state.menu_path.pop()
menu_state.menu_index.pop()
menu_state.start_index.pop()
menu_state.current_menu[selected_key] = new_value
need_redraw = True
else:
# Save button selected
save_json(file_path, data)
@@ -445,28 +341,23 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
menu_state.current_menu = data
for path in menu_state.menu_path[2:]:
menu_state.current_menu = (
menu_state.current_menu[path]
if isinstance(menu_state.current_menu, dict)
else menu_state.current_menu[int(path.strip("[]"))]
)
menu_state.current_menu = menu_state.current_menu[path] if isinstance(menu_state.current_menu, dict) else menu_state.current_menu[int(path.strip("[]"))]
else:
# Exit the editor
menu_win.clear()
menu_win.refresh()
break
def save_json(file_path: str, data: dict[str, Any]) -> None:
def save_json(file_path, data):
formatted_json = format_json_single_line_arrays(data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(formatted_json)
setup_colors(reinit=True)
def main(stdscr: curses.window) -> None:
def main(stdscr):
from contact.ui.ui_state import MenuState
menu_state = MenuState()
@@ -478,6 +369,5 @@ def main(stdscr: curses.window) -> None:
setup_colors()
json_editor(stdscr, menu_state)
if __name__ == "__main__":
curses.wrapper(main)
curses.wrapper(main)

View File

@@ -1,7 +1,7 @@
from argparse import ArgumentParser
import argparse
def setup_parser() -> ArgumentParser:
parser = ArgumentParser(
def setup_parser():
parser = argparse.ArgumentParser(
add_help=True,
epilog="If no connection arguments are specified, we attempt a serial connection and then a TCP connection to localhost.")

View File

@@ -1,8 +1,9 @@
import yaml
import logging
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import mt_config
from meshtastic import BROADCAST_ADDR, mt_config
from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main
@@ -19,9 +20,9 @@ def traverseConfig(config_root, config, interface_config) -> bool:
return True
def splitCompoundName(comp_name: str) -> list[str]:
def splitCompoundName(comp_name: str) -> List[str]:
"""Split compound (dot separated) preference name into parts"""
name: list[str] = comp_name.split(".")
name: List[str] = comp_name.split(".")
if len(name) < 2:
name[0] = comp_name
name.append(comp_name)
@@ -122,24 +123,24 @@ def setPref(config, comp_name, raw_val) -> bool:
def config_import(interface, filename):
def config_import(node_state, filename):
with open(filename, encoding="utf8") as file:
configuration = yaml.safe_load(file)
closeNow = True
interface.getNode('^local', False).beginSettingsTransaction()
node_state.interface.getNode('^local', False).beginSettingsTransaction()
if "owner" in configuration:
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode('^local', False).setOwner(configuration["owner"])
node_state.interface.getNode('^local', False).setOwner(configuration["owner"])
if "owner_short" in configuration:
logging.info(
f"Setting device owner short to {configuration['owner_short']}"
)
waitForAckNak = True
interface.getNode('^local', False).setOwner(
node_state.interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["owner_short"]
)
@@ -148,23 +149,23 @@ def config_import(interface, filename):
f"Setting device owner short to {configuration['ownerShort']}"
)
waitForAckNak = True
interface.getNode('^local', False).setOwner(
node_state.interface.getNode('^local', False).setOwner(
long_name=None, short_name=configuration["ownerShort"]
)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode('^local').setURL(configuration["channel_url"])
node_state.interface.getNode('^local').setURL(configuration["channel_url"])
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode('^local').setURL(configuration["channelUrl"])
node_state.interface.getNode('^local').setURL(configuration["channelUrl"])
if "location" in configuration:
alt = 0
lat = 0.0
lon = 0.0
localConfig = interface.localNode.localConfig
localConfig = node_state.interface.localNode.localConfig
if "alt" in configuration["location"]:
alt = int(configuration["location"]["alt"] or 0)
@@ -176,43 +177,43 @@ def config_import(interface, filename):
lon = float(configuration["location"]["lon"] or 0)
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
node_state.interface.localNode.setFixedPosition(lat, lon, alt)
if "config" in configuration:
localConfig = interface.getNode('^local').localConfig
localConfig = node_state.interface.getNode('^local').localConfig
for section in configuration["config"]:
traverseConfig(
section, configuration["config"][section], localConfig
)
interface.getNode('^local').writeConfig(
node_state.interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
if "module_config" in configuration:
moduleConfig = interface.getNode('^local').moduleConfig
moduleConfig = node_state.interface.getNode('^local').moduleConfig
for section in configuration["module_config"]:
traverseConfig(
section,
configuration["module_config"][section],
moduleConfig,
)
interface.getNode('^local').writeConfig(
node_state.interface.getNode('^local').writeConfig(
camel_to_snake(section)
)
interface.getNode('^local', False).commitSettingsTransaction()
node_state.interface.getNode('^local', False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")
def config_export(interface) -> str:
def config_export(node_state) -> str:
"""used in --export-config"""
configObj = {}
owner = interface.getLongName()
owner_short = interface.getShortName()
channel_url = interface.localNode.getURL()
myinfo = interface.getMyNodeInfo()
owner = node_state.interface.getLongName()
owner_short = node_state.interface.getShortName()
channel_url = node_state.interface.localNode.getURL()
myinfo = node_state.interface.getMyNodeInfo()
pos = myinfo.get("position")
lat = None
lon = None
@@ -237,7 +238,7 @@ def config_export(interface) -> str:
if alt:
configObj["location"]["alt"] = alt
config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
config = MessageToDict(node_state.interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
if config:
# Convert inner keys to correct snake/camelCase
prefs = {}
@@ -260,7 +261,7 @@ def config_export(interface) -> str:
else:
configObj["config"] = config
module_config = MessageToDict(interface.localNode.moduleConfig)
module_config = MessageToDict(node_state.interface.localNode.moduleConfig)
if module_config:
# Convert inner keys to correct snake/camelCase
prefs = {}

View File

@@ -1,11 +1,10 @@
import re
def parse_ini_file(ini_file_path: str) -> tuple[dict[str, str], dict[str, str]]:
"""Parses an INI file and returns a mapping of keys to human-readable names and help text."""
field_mapping: dict[str, str] = {}
help_text: dict[str, str] = {}
current_section: str | None = None
def parse_ini_file(ini_file_path):
field_mapping = {}
help_text = {}
current_section = None
with open(ini_file_path, 'r', encoding='utf-8') as f:
for line in f:
@@ -47,14 +46,14 @@ def parse_ini_file(ini_file_path: str) -> tuple[dict[str, str], dict[str, str]]:
return field_mapping, help_text
def transform_menu_path(menu_path: list[str]) -> list[str]:
def transform_menu_path(menu_path):
"""Applies path replacements and normalizes entries in the menu path."""
path_replacements = {
"Radio Settings": "config",
"Module Settings": "module"
}
transformed_path: list[str] = []
transformed_path = []
for part in menu_path[1:]: # Skip 'Main Menu'
# Apply fixed replacements
part = path_replacements.get(part, part)

View File

@@ -7,15 +7,14 @@ from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config
import contact.globals as globals
def get_table_name(channel: str) -> str:
def get_table_name(channel, node_state):
# Construct the table name
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
table_name = f"{str(node_state.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
def save_message_to_db(channel: str, user_id: str, message_text: str) -> int | None:
def save_message_to_db(channel, user_id, message_text):
"""Save messages to the database, ensuring the table exists."""
try:
quoted_table_name = get_table_name(channel)
@@ -48,7 +47,7 @@ def save_message_to_db(channel: str, user_id: str, message_text: str) -> int | N
logging.error(f"Unexpected error in save_message_to_db: {e}")
def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None:
def update_ack_nak(channel, timestamp, message, ack, node_state):
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
@@ -60,7 +59,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
message_text = ?
"""
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_cursor.execute(update_query, (ack, str(node_state.myNodeNum), timestamp, message))
db_connection.commit()
except sqlite3.Error as e:
@@ -70,14 +69,14 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
logging.error(f"Unexpected error in update_ack_nak: {e}")
def load_messages_from_db() -> None:
def load_messages_from_db(node_state):
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
db_cursor.execute(query, (f"{str(node_state.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages
@@ -124,10 +123,10 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak":
ack_str = config.nak_str
if user_id == str(globals.myNodeNum):
if user_id == str(node_state.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
formatted_message = (f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ", message)
formatted_message = (f"{config.message_prefix} {get_name_from_database(int(user_id), node_state,'short')}: ", message)
hourly_messages[hour].append(formatted_message)
@@ -143,14 +142,14 @@ def load_messages_from_db() -> None:
logging.error(f"SQLite error in load_messages_from_db: {e}")
def init_nodedb() -> None:
def init_nodedb(node_state):
"""Initialize the node database and update it with nodes from the interface."""
try:
if not globals.interface.nodes:
return # No nodes to initialize
ensure_node_table_exists() # Ensure the table exists before insertion
ensure_node_table_exists(node_state) # Ensure the table exists before insertion
nodes_snapshot = list(globals.interface.nodes.values())
# Insert or update all nodes
@@ -173,7 +172,7 @@ def init_nodedb() -> None:
logging.error(f"Unexpected error in init_nodedb: {e}")
def maybe_store_nodeinfo_in_db(packet: dict[str, object]) -> None:
def maybe_store_nodeinfo_in_db(packet, node_state):
"""Save nodeinfo unless that record is already there, updating if necessary."""
try:
user_id = packet['from']
@@ -184,31 +183,21 @@ def maybe_store_nodeinfo_in_db(packet: dict[str, object]) -> None:
role = packet['decoded']['user'].get('role', 'CLIENT')
public_key = packet['decoded']['user'].get('publicKey', '')
update_node_info_in_db(user_id, long_name, short_name, hw_model, is_licensed, role, public_key)
update_node_info_in_db(node_state, user_id, long_name, short_name, hw_model, is_licensed, role, public_key)
except sqlite3.Error as e:
logging.error(f"SQLite error in maybe_store_nodeinfo_in_db: {e}")
except Exception as e:
logging.error(f"Unexpected error in maybe_store_nodeinfo_in_db: {e}")
def update_node_info_in_db(
user_id: int | str,
long_name: str | None = None,
short_name: str | None = None,
hw_model: str | None = None,
is_licensed: str | int | None = None,
role: str | None = None,
public_key: str | None = None,
chat_archived: int | None = None
) -> None:
def update_node_info_in_db(node_state, user_id, long_name=None, short_name=None, hw_model=None, is_licensed=None, role=None, public_key=None, chat_archived=None):
"""Update or insert node information into the database, preserving unchanged fields."""
try:
ensure_node_table_exists() # Ensure the table exists before any operation
ensure_node_table_exists(node_state) # Ensure the table exists before any operation
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_name = f'"{node_state.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f'PRAGMA table_info({table_name})')]
if "chat_archived" not in table_columns:
@@ -260,9 +249,9 @@ def update_node_info_in_db(
logging.error(f"Unexpected error in update_node_info_in_db: {e}")
def ensure_node_table_exists() -> None:
def ensure_node_table_exists(node_state):
"""Ensure the node database table exists."""
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
table_name = f'"{node_state.myNodeNum}_nodedb"' # Quote for safety
schema = '''
user_id TEXT PRIMARY KEY,
long_name TEXT,
@@ -276,7 +265,7 @@ def ensure_node_table_exists() -> None:
ensure_table_exists(table_name, schema)
def ensure_table_exists(table_name: str, schema: str) -> None:
def ensure_table_exists(table_name, schema):
"""Ensure the given table exists in the database."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
@@ -290,7 +279,7 @@ def ensure_table_exists(table_name: str, schema: str) -> None:
logging.error(f"Unexpected error in ensure_table_exists({table_name}): {e}")
def get_name_from_database(user_id: int, type: str = "long") -> str:
def get_name_from_database(user_id, node_state, type="long"):
"""
Retrieve a user's name (long or short) from the node database.
@@ -303,7 +292,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(node_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
@@ -324,11 +313,11 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
logging.error(f"Unexpected error in get_name_from_database: {e}")
return "Unknown"
def is_chat_archived(user_id: int) -> int:
def is_chat_archived(user_id, node_state):
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(globals.myNodeNum)}_nodedb"
table_name = f"{str(node_state.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))

View File

@@ -3,14 +3,11 @@ import binascii
import curses
import ipaddress
import re
from typing import Any, Optional
from contact.ui.colors import get_color
from contact.ui.navigation_utils import move_highlight
def wrap_text(text: str, wrap_width: int) -> list[str]:
def wrap_text(text, wrap_width):
"""Wraps text while preserving spaces and breaking long words."""
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
words = re.findall(r'\S+|\s+', text) # Capture words and spaces separately
wrapped_lines = []
line_buffer = ""
line_length = 0
@@ -26,7 +23,7 @@ def wrap_text(text: str, wrap_width: int) -> list[str]:
line_buffer = ""
line_length = 0
for i in range(0, word_length, wrap_width):
wrapped_lines.append(word[i : i + wrap_width])
wrapped_lines.append(word[i:i+wrap_width])
continue
if line_length + word_length > wrap_width and word.strip():
@@ -41,9 +38,9 @@ def wrap_text(text: str, wrap_width: int) -> list[str]:
wrapped_lines.append(line_buffer)
return wrapped_lines
def get_text_input(prompt: str) -> Optional[str]:
def get_text_input(prompt):
"""Handles user input with wrapped text for long prompts."""
height = 8
width = 80
@@ -63,16 +60,14 @@ def get_text_input(prompt: str) -> Optional[str]:
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
row = 1
for line in wrapped_prompt:
input_win.addstr(
row, margin, line[:input_width], get_color("settings_default", bold=True)
)
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
if row >= height - 3: # Prevent overflow
break
prompt_text = "Enter new value: "
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
input_win.refresh()
curses.curs_set(1)
@@ -109,35 +104,21 @@ def get_text_input(prompt: str) -> Optional[str]:
first_line = user_input[:first_line_width] # Cut to max first line width
remaining_text = user_input[first_line_width:] # Remaining text for wrapping
wrapped_lines = (
wrap_text(remaining_text, wrap_width=input_width) if remaining_text else []
)
wrapped_lines = wrap_text(remaining_text, wrap_width=input_width) if remaining_text else []
# Clear only the input area (without touching prompt text)
for i in range(max_input_rows):
if row + 1 + i < height - 1:
input_win.addstr(
row + 1 + i,
margin,
" " * min(input_width, width - margin - 1),
get_color("settings_default"),
)
input_win.addstr(row + 1 + i, margin, " " * min(input_width, width - margin - 1), get_color("settings_default"))
# Redraw the prompt text so it never disappears
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
# Redraw wrapped input
input_win.addstr(
row + 1, col_start, first_line, get_color("settings_default")
) # First line next to prompt
input_win.addstr(row + 1, col_start, first_line, get_color("settings_default")) # First line next to prompt
for i, line in enumerate(wrapped_lines):
if row + 2 + i < height - 1:
input_win.addstr(
row + 2 + i,
margin,
line[:input_width],
get_color("settings_default"),
)
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
input_win.refresh()
@@ -147,7 +128,7 @@ def get_text_input(prompt: str) -> Optional[str]:
return user_input
def get_admin_key_input(current_value: list[bytes]) -> Optional[list[str]]:
def get_admin_key_input(current_value):
def to_base64(byte_strings):
"""Convert byte values to Base64-encoded strings."""
return [base64.b64encode(b).decode() for b in byte_strings]
@@ -182,76 +163,56 @@ def get_admin_key_input(current_value: list[bytes]) -> Optional[list[str]]:
while True:
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(
1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True)
)
repeated_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
# Display current values, allowing editing
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " " # Highlight the current line
repeated_win.addstr(
3 + i,
2,
f"{prefix}Admin Key {i + 1}: ",
get_color("settings_default", bold=(i == cursor_pos)),
)
repeated_win.addstr(3 + i, 2, f"{prefix}Admin Key {i + 1}: ", get_color("settings_default", bold=(i == cursor_pos)))
repeated_win.addstr(3 + i, 18, line) # Align text for easier editing
# Move cursor to the correct position inside the field
curses.curs_set(1)
repeated_win.move(
3 + cursor_pos, 18 + len(user_values[cursor_pos])
) # Position cursor at end of text
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(
7, 2, error_message, get_color("settings_default", bold=True)
)
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
repeated_win.refresh()
key = repeated_win.getch()
if (
key == 27 or key == curses.KEY_LEFT
): # Escape or Left Arrow -> Cancel and return original
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
repeated_win.erase()
repeated_win.refresh()
curses.noecho()
curses.curs_set(0)
return None
elif key == ord("\n"): # Enter key to save and return
if all(
is_valid_base64(val) for val in user_values
): # Ensure all values are valid Base64 and 32 bytes
elif key == ord('\n'): # Enter key to save and return
if all(is_valid_base64(val) for val in user_values): # Ensure all values are valid Base64 and 32 bytes
curses.noecho()
curses.curs_set(0)
return user_values # Return the edited Base64 values
else:
error_message = (
"Error: Each key must be valid Base64 and 32 bytes long!"
)
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
cursor_pos = (cursor_pos + 1) % len(user_values)
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
if len(user_values[cursor_pos]) > 0:
user_values[cursor_pos] = user_values[cursor_pos][
:-1
] # Remove last character
user_values[cursor_pos] = user_values[cursor_pos][:-1] # Remove last character
else:
try:
user_values[cursor_pos] += chr(
key
) # Append valid character input to the selected field
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
def get_repeated_input(current_value: list[str]) -> Optional[str]:
def get_repeated_input(current_value):
height = 9
width = 80
start_y = (curses.LINES - height) // 2
@@ -273,46 +234,33 @@ def get_repeated_input(current_value: list[str]) -> Optional[str]:
while True:
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(
1, 2, "Edit up to 3 Values:", get_color("settings_default", bold=True)
)
repeated_win.addstr(1, 2, "Edit up to 3 Values:", get_color("settings_default", bold=True))
# Display current values, allowing editing
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " " # Highlight the current line
repeated_win.addstr(
3 + i,
2,
f"{prefix}Value{i + 1}: ",
get_color("settings_default", bold=(i == cursor_pos)),
)
repeated_win.addstr(3 + i, 18, line)
repeated_win.addstr(3 + i, 2, f"{prefix}Value{i + 1}: ", get_color("settings_default", bold=(i == cursor_pos)))
repeated_win.addstr(3 + i, 18, line)
# Move cursor to the correct position inside the field
curses.curs_set(1)
repeated_win.move(
3 + cursor_pos, 18 + len(user_values[cursor_pos])
) # Position cursor at end of text
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(
7, 2, error_message, get_color("settings_default", bold=True)
)
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
repeated_win.refresh()
key = repeated_win.getch()
if (
key == 27 or key == curses.KEY_LEFT
): # Escape or Left Arrow -> Cancel and return original
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
repeated_win.erase()
repeated_win.refresh()
curses.noecho()
curses.curs_set(0)
return None
elif key == ord("\n"): # Enter key to save and return
elif key == ord('\n'): # Enter key to save and return
curses.noecho()
curses.curs_set(0)
return ", ".join(user_values)
@@ -322,20 +270,16 @@ def get_repeated_input(current_value: list[str]) -> Optional[str]:
cursor_pos = (cursor_pos + 1) % len(user_values)
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
if len(user_values[cursor_pos]) > 0:
user_values[cursor_pos] = user_values[cursor_pos][
:-1
] # Remove last character
user_values[cursor_pos] = user_values[cursor_pos][:-1] # Remove last character
else:
try:
user_values[cursor_pos] += chr(
key
) # Append valid character input to the selected field
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
def get_fixed32_input(current_value: int) -> int:
def get_fixed32_input(current_value):
cvalue = current_value
current_value = str(ipaddress.IPv4Address(current_value))
height = 10
@@ -355,9 +299,7 @@ def get_fixed32_input(current_value: int) -> int:
while True:
fixed32_win.erase()
fixed32_win.border()
fixed32_win.addstr(
1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD
)
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD)
fixed32_win.addstr(3, 2, f"Current: {current_value}")
fixed32_win.addstr(5, 2, f"New value: {user_input}")
fixed32_win.refresh()
@@ -370,23 +312,16 @@ def get_fixed32_input(current_value: int) -> int:
curses.noecho()
curses.curs_set(0)
return cvalue # Return the current value unchanged
elif key == ord("\n"): # Enter key to validate and save
elif key == ord('\n'): # Enter key to validate and save
# Validate IP address
octets = user_input.split(".")
if len(octets) == 4 and all(
octet.isdigit() and 0 <= int(octet) <= 255 for octet in octets
):
if len(octets) == 4 and all(octet.isdigit() and 0 <= int(octet) <= 255 for octet in octets):
curses.noecho()
curses.curs_set(0)
fixed32_address = ipaddress.ip_address(user_input)
return int(fixed32_address) # Return the valid IP address
else:
fixed32_win.addstr(
7,
2,
"Invalid IP address. Try again.",
curses.A_BOLD | curses.color_pair(5),
)
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", curses.A_BOLD | curses.color_pair(5))
fixed32_win.refresh()
curses.napms(1500) # Wait for 1.5 seconds before refreshing
user_input = "" # Clear invalid input
@@ -401,16 +336,11 @@ def get_fixed32_input(current_value: int) -> int:
pass # Ignore invalid inputs
def get_list_input(
prompt: str, current_option: Optional[str], list_options: list[str]
) -> Optional[str]:
def get_list_input(prompt, current_option, list_options):
"""
Displays a scrollable list of list_options for the user to choose from.
"""
selected_index = (
list_options.index(current_option) if current_option in list_options else 0
)
scroll_offset = 0
selected_index = list_options.index(current_option) if current_option in list_options else 0
height = min(len(list_options) + 5, curses.LINES)
width = 80
@@ -433,28 +363,16 @@ def get_list_input(
# Render options on the pad
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(
idx,
0,
color.ljust(width - 8),
get_color("settings_default", reverse=True),
)
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
list_pad.addstr(
idx, 0, color.ljust(width - 8), get_color("settings_default")
)
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
# Initial refresh
list_win.refresh()
list_pad.refresh(
0,
0,
list_win.getbegyx()[0] + 3,
list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
)
list_pad.refresh(0, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2, list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
max_index = len(list_options) - 1
visible_height = list_win.getmaxyx()[0] - 5
@@ -464,32 +382,14 @@ def get_list_input(
key = list_win.getch()
if key == curses.KEY_UP:
old_idx = selected_index
old_selected_index = selected_index
selected_index = max(0, selected_index - 1)
scroll_ref = [scroll_offset]
move_highlight(
old_idx,
selected_index,
list_options,
list_win,
list_pad,
start_index_ref=scroll_ref,
)
scroll_offset = scroll_ref[0]
move_highlight(old_selected_index, selected_index, list_options, list_win, list_pad)
elif key == curses.KEY_DOWN:
old_idx = selected_index
old_selected_index = selected_index
selected_index = min(len(list_options) - 1, selected_index + 1)
scroll_ref = [scroll_offset]
move_highlight(
old_idx,
selected_index,
list_options,
list_win,
list_pad,
start_index_ref=scroll_ref,
)
scroll_offset = scroll_ref[0]
elif key == ord("\n"): # Enter key
move_highlight(old_selected_index, selected_index, list_options, list_win, list_pad)
elif key == ord('\n'): # Enter key
list_win.clear()
list_win.refresh()
return list_options[selected_index]
@@ -499,55 +399,47 @@ def get_list_input(
return current_option
# def move_highlight(
# old_idx: int,
# new_idx: int,
# options: list[str],
# list_win: curses.window,
# list_pad: curses.window
# ) -> int:
def move_highlight(old_idx, new_idx, options, list_win, list_pad):
# global scroll_offset
# if 'scroll_offset' not in globals():
# scroll_offset = 0 # Initialize if not set
global scroll_offset
if 'scroll_offset' not in globals():
scroll_offset = 0 # Initialize if not set
# if old_idx == new_idx:
# return # No-op
if old_idx == new_idx:
return # No-op
# max_index = len(options) - 1
# visible_height = list_win.getmaxyx()[0] - 5
max_index = len(options) - 1
visible_height = list_win.getmaxyx()[0] - 5
# # Adjust scroll_offset only when moving out of visible range
# if new_idx < scroll_offset: # Moving above the visible area
# scroll_offset = new_idx
# elif new_idx >= scroll_offset + visible_height: # Moving below the visible area
# scroll_offset = new_idx - visible_height
# Adjust scroll_offset only when moving out of visible range
if new_idx < scroll_offset: # Moving above the visible area
scroll_offset = new_idx
elif new_idx >= scroll_offset + visible_height: # Moving below the visible area
scroll_offset = new_idx - visible_height
# # Ensure scroll_offset is within bounds
# scroll_offset = max(0, min(scroll_offset, max_index - visible_height + 1))
# Ensure scroll_offset is within bounds
scroll_offset = max(0, min(scroll_offset, max_index - visible_height + 1))
# # Clear old highlight
# list_pad.chgat(old_idx, 0, list_pad.getmaxyx()[1], get_color("settings_default"))
# Clear old highlight
list_pad.chgat(old_idx, 0, list_pad.getmaxyx()[1], get_color("settings_default"))
# # Highlight new selection
# list_pad.chgat(new_idx, 0, list_pad.getmaxyx()[1], get_color("settings_default", reverse=True))
# Highlight new selection
list_pad.chgat(new_idx, 0, list_pad.getmaxyx()[1], get_color("settings_default", reverse=True))
# list_win.refresh()
list_win.refresh()
# Refresh pad only if scrolling is needed
list_pad.refresh(scroll_offset, 0,
list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + 3 + visible_height,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
draw_arrows(list_win, visible_height, max_index, scroll_offset)
# # Refresh pad only if scrolling is needed
# list_pad.refresh(scroll_offset, 0,
# list_win.getbegyx()[0] + 3, list_win.getbegyx()[1] + 4,
# list_win.getbegyx()[0] + 3 + visible_height,
# list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4)
# draw_arrows(list_win, visible_height, max_index, scroll_offset)
# return scroll_offset # Return updated scroll_offset to be stored externally
return scroll_offset # Return updated scroll_offset to be stored externally
def draw_arrows(
win: curses.window, visible_height: int, max_index: int, start_index: int
) -> None:
def draw_arrows(win, visible_height, max_index, start_index):
if visible_height < max_index:
if start_index > 0:
@@ -559,3 +451,4 @@ def draw_arrows(
win.addstr(visible_height + 3, 2, "", get_color("settings_default"))
else:
win.addstr(visible_height + 3, 2, " ", get_color("settings_default"))

View File

@@ -1,41 +1,23 @@
import logging
import meshtastic.serial_interface, meshtastic.tcp_interface, meshtastic.ble_interface
import contact.globals as globals
def initialize_interface(args):
try:
if args.ble:
return meshtastic.ble_interface.BLEInterface(args.ble if args.ble != "any" else None)
elif args.host:
try:
if ":" in args.host:
tcp_hostname, tcp_port = args.host.split(':')
else:
tcp_hostname = args.host
tcp_port = meshtastic.tcp_interface.DEFAULT_TCP_PORT
return meshtastic.tcp_interface.TCPInterface(tcp_hostname, portNumber=tcp_port)
except Exception as ex:
logging.error(f"Error connecting to {args.host}. {ex}")
return meshtastic.tcp_interface.TCPInterface(args.host)
else:
try:
client = meshtastic.serial_interface.SerialInterface(args.port)
except FileNotFoundError as ex:
logging.error(f"The serial device at '{args.port}' was not found. {ex}")
return meshtastic.serial_interface.SerialInterface(args.port)
except PermissionError as ex:
logging.error(f"You probably need to add yourself to the `dialout` group to use a serial connection. {ex}")
except Exception as ex:
logging.error(f"Unexpected error initializing interface: {ex}")
except OSError as ex:
logging.error(f"The serial device couldn't be opened, it might be in use by another process. {ex}")
if client.devPath is None:
try:
client = meshtastic.tcp_interface.TCPInterface("localhost")
except Exception as ex:
logging.error(f"Error connecting to localhost:{ex}")
return client
if globals.interface.devPath is None:
return meshtastic.tcp_interface.TCPInterface("meshtastic.local")
except Exception as ex:
logging.critical(f"Fatal error initializing interface: {ex}")

View File

@@ -4,7 +4,7 @@ import logging
import base64
import time
def save_changes(interface, modified_settings, menu_state):
def save_changes(node_state, modified_settings, menu_state):
"""
Save changes to the device based on modified settings.
:param interface: Meshtastic interface instance
@@ -16,7 +16,7 @@ def save_changes(interface, modified_settings, menu_state):
logging.info("No changes to save. modified_settings is empty.")
return
node = interface.getNode('^local')
node = node_state.interface.getNode('^local')
admin_key_backup = None
if 'admin_key' in modified_settings:
# Get reference to security config
@@ -60,7 +60,7 @@ def save_changes(interface, modified_settings, menu_state):
lon = float(modified_settings.get('longitude', 0.0))
alt = int(modified_settings.get('altitude', 0))
interface.localNode.setFixedPosition(lat, lon, alt)
node_state.interface.localNode.setFixedPosition(lat, lon, alt)
logging.info(f"Updated {config_category} with Latitude: {lat} and Longitude {lon} and Altitude {alt}")
return

View File

@@ -67,10 +67,9 @@ def refresh_node_list():
return True
return False
def get_nodeNum():
myinfo = globals.interface.getMyNodeInfo()
myNodeNum = myinfo['num']
return myNodeNum
def get_nodeNum(node_state):
myinfo = node_state.interface.getMyNodeInfo()
node_state.myNodeNum = myinfo['num']
def decimal_to_hex(decimal_number):
return f"!{decimal_number:08x}"

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.6"
version = "1.3.2"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}