1
0
forked from iarv/contact

Compare commits

..

1 Commits

Author SHA1 Message Date
pdxlocations
3d1a489fd1 maybe dead end until refactor scrolling 2025-04-19 18:57:47 -07:00
19 changed files with 852 additions and 1531 deletions

View File

@@ -33,7 +33,6 @@ By navigating to Settings -> App Settings, you may customize your UI's icons, co
- `CTRL` + `p` = Hide/show a log of raw received packets.
- `CTRL` + `t` = With the Node List highlighted, send a traceroute to the selected node
- `CTRL` + `d` = With the Channel List hightlighted, archive a chat to reduce UI clutter. Messages will be saved in the db and repopulate if you send or receive a DM from this user.
- `CTRL` + `d` = With the Note List highlghted, remove a node from your nodedb.
- `ESC` = Exit out of the Settings Dialogue, or Quit the application if settings are not displayed.
### Search
@@ -68,11 +67,3 @@ To quickly connect to localhost, use:
```sh
contact -t
```
## Install in development (editable) mode:
```bash
git clone https://github.com/pdxlocations/contact.git
cd contact
python3 -m venv .venv
source .venv/bin/activate
pip install -e .
```

View File

@@ -24,6 +24,7 @@ import traceback
from pubsub import pub
# Local application
import contact.globals as globals
import contact.ui.default_config as config
from contact.message_handlers.rx_handler import on_receive
from contact.settings import set_region
@@ -35,7 +36,7 @@ from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.input_handlers import get_list_input
from contact.utilities.interfaces import initialize_interface
from contact.utilities.utils import get_channels, get_nodeNum, get_node_list
from contact.utilities.singleton import ui_state, interface_state, app_state
# ------------------------------------------------------------------------------
# Environment & Logging Setup
@@ -51,27 +52,28 @@ logging.basicConfig(
filename=config.log_file_path, level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
app_state.lock = threading.Lock()
globals.lock = threading.Lock()
# ------------------------------------------------------------------------------
# Main Program Logic
# ------------------------------------------------------------------------------
def prompt_region_if_unset(args: object) -> None:
"""Prompt user to set region if it is unset."""
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(interface_state.interface)
interface_state.interface.close()
interface_state.interface = initialize_interface(args)
def initialize_globals() -> None:
def initialize_globals(args) -> None:
"""Initializes interface and shared globals."""
globals.interface = initialize_interface(args)
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
ui_state.node_list = get_node_list()
# Prompt for region if unset
if globals.interface.localNode.localConfig.lora.region == 0:
confirmation = get_list_input("Your region is UNSET. Set it now?", "Yes", ["Yes", "No"])
if confirmation == "Yes":
set_region(globals.interface)
globals.interface.close()
globals.interface = initialize_interface(args)
globals.myNodeNum = get_nodeNum()
globals.channel_list = get_channels()
globals.node_list = get_node_list()
pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb()
@@ -80,63 +82,55 @@ def initialize_globals() -> None:
def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI."""
output_capture = io.StringIO()
try:
setup_colors()
draw_splash(stdscr)
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
setup_colors()
draw_splash(stdscr)
args = setup_parser().parse_args()
args = setup_parser().parse_args()
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
if getattr(args, "settings", False):
subprocess.run([sys.executable, "-m", "contact.settings"], check=True)
return
logging.info("Initializing interface...")
with app_state.lock:
interface_state.interface = initialize_interface(args)
logging.info("Initializing interface...")
with globals.lock:
initialize_globals(args)
logging.info("Starting main UI")
if interface_state.interface.localNode.localConfig.lora.region == 0:
prompt_region_if_unset(args)
main_ui(stdscr)
initialize_globals()
logging.info("Starting main UI")
try:
with contextlib.redirect_stdout(output_capture), contextlib.redirect_stderr(output_capture):
main_ui(stdscr)
except Exception:
console_output = output_capture.getvalue()
logging.error("Uncaught exception inside main_ui")
logging.error("Traceback:\n%s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
return
except Exception:
except Exception as e:
console_output = output_capture.getvalue()
logging.error("Uncaught exception: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
logging.error("Console output:\n%s", console_output)
raise
def start() -> None:
"""Entry point for the application."""
"""Launch curses wrapper and redirect logs to file."""
if "--help" in sys.argv or "-h" in sys.argv:
setup_parser().print_help()
sys.exit(0)
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.critical("Fatal error", exc_info=True)
try:
curses.endwin()
except Exception:
pass
print("Fatal error:", e)
traceback.print_exc()
sys.exit(1)
with open(config.log_file_path, "a", buffering=1) as log_f:
sys.stdout = log_f
sys.stderr = log_f
with contextlib.redirect_stdout(log_f), contextlib.redirect_stderr(log_f):
try:
curses.wrapper(main)
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
sys.exit(0)
except Exception as e:
logging.error("Fatal error: %s", e)
logging.error("Traceback: %s", traceback.format_exc())
sys.exit(1)
if __name__ == "__main__":

13
contact/globals.py Normal file
View File

@@ -0,0 +1,13 @@
interface = None
lock = None
display_log = False
all_messages = {}
channel_list = []
notifications = []
packet_buffer = []
node_list = []
myNodeNum = 0
selected_channel = 0
selected_message = 0
selected_node = 0
current_window = 0

View File

@@ -1,14 +1,9 @@
import logging
import os
import platform
import shutil
import subprocess
import time
from datetime import datetime
from typing import Any, Dict
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.utilities.utils import refresh_node_list
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
@@ -23,48 +18,7 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state
def play_sound():
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
elif shutil.which("ffplay") and os.path.exists(ogg_path):
executable = "ffplay"
sound_path = ogg_path
elif shutil.which("aplay") and os.path.exists(wav_path):
executable = "aplay"
sound_path = wav_path
else:
logging.warning("No suitable sound player or sound file found on Linux")
if executable and sound_path:
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
import contact.globals as globals
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
@@ -75,14 +29,14 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with app_state.lock:
with globals.lock:
# Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
globals.packet_buffer.append(packet)
if len(globals.packet_buffer) > 20:
# Trim buffer to 20 packets
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
globals.packet_buffer = globals.packet_buffer[-20:]
if ui_state.display_log:
if globals.display_log:
draw_packetlog_win()
try:
if "decoded" not in packet:
@@ -98,10 +52,6 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
if config.notification_sound == "True":
play_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
@@ -113,21 +63,19 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
else:
channel_number = 0
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
if packet["to"] == globals.myNodeNum:
if packet["from"] in globals.channel_list:
pass
else:
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
globals.channel_list.append(packet["from"])
if packet["from"] not in globals.all_messages:
globals.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet["from"])
channel_number = globals.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
if globals.channel_list[channel_number] != globals.channel_list[globals.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
@@ -137,14 +85,40 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string} ", message_string)
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[globals.channel_list[channel_number]]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[globals.channel_list[channel_number]].append((f"-- {current_hour} --", ""))
globals.all_messages[globals.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string} ", message_string)
)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string)
save_message_to_db(globals.channel_list[channel_number], message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict
import google.protobuf.json_format
@@ -12,10 +13,7 @@ from contact.utilities.db_handler import (
update_node_info_in_db,
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
from contact.utilities.utils import add_new_message
import contact.globals as globals
ack_naks: Dict[str, Dict[str, Any]] = {} # requestId -> {channel, messageIndex, timestamp}
@@ -33,12 +31,12 @@ def onAckNak(packet: Dict[str, Any]) -> None:
return
acknak = ack_naks.pop(request)
message = ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
message = globals.all_messages[acknak["channel"]][acknak["messageIndex"]][1]
confirm_string = " "
ack_type = None
if packet["decoded"]["routing"]["errorReason"] == "NONE":
if packet["from"] == interface_state.myNodeNum: # Ack "from" ourself means implicit ACK
if packet["from"] == globals.myNodeNum: # Ack "from" ourself means implicit ACK
confirm_string = config.ack_implicit_str
ack_type = "Implicit"
else:
@@ -48,15 +46,15 @@ def onAckNak(packet: Dict[str, Any]) -> None:
confirm_string = config.nak_str
ack_type = "Nak"
ui_state.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
globals.all_messages[acknak["channel"]][acknak["messageIndex"]] = (
config.sent_message_prefix + confirm_string + ": ",
message,
)
update_ack_nak(acknak["channel"], acknak["timestamp"], message, ack_type)
channel_number = ui_state.channel_list.index(acknak["channel"])
if ui_state.channel_list[channel_number] == ui_state.channel_list[ui_state.selected_channel]:
channel_number = globals.channel_list.index(acknak["channel"])
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
draw_messages_window()
@@ -139,17 +137,16 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
msg_str += route_str + "\n" # Print the route back to us
if packet["from"] not in ui_state.channel_list:
ui_state.channel_list.append(packet["from"])
if packet["from"] not in globals.channel_list:
globals.channel_list.append(packet["from"])
refresh_channels = True
if is_chat_archived(packet["from"]):
update_node_info_in_db(packet["from"], chat_archived=False)
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
channel_number = globals.channel_list.index(packet["from"])
if channel_id == ui_state.channel_list[ui_state.selected_channel]:
if globals.channel_list[channel_number] == globals.channel_list[globals.selected_channel]:
refresh_messages = True
else:
add_notification(channel_number)
@@ -157,29 +154,33 @@ def on_response_traceroute(packet: Dict[str, Any]) -> None:
message_from_string = get_name_from_database(packet["from"], type="short") + ":\n"
add_new_message(channel_id, f"{config.message_prefix} {message_from_string}", msg_str)
if globals.channel_list[channel_number] not in globals.all_messages:
globals.all_messages[globals.channel_list[channel_number]] = []
globals.all_messages[globals.channel_list[channel_number]].append(
(f"{config.message_prefix} {message_from_string}", msg_str)
)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(globals.channel_list[channel_number], packet["from"], msg_str)
save_message_to_db(channel_id, packet["from"], msg_str)
def send_message(message: str, destination: int = BROADCAST_NUM, channel: int = 0) -> None:
"""
Sends a chat message using the selected channel.
"""
myid = interface_state.myNodeNum
myid = globals.myNodeNum
send_on_channel = 0
channel_id = ui_state.channel_list[channel]
channel_id = globals.channel_list[channel]
if isinstance(channel_id, int):
send_on_channel = 0
destination = channel_id
elif isinstance(channel_id, str):
send_on_channel = channel
sent_message_data = interface_state.interface.sendText(
sent_message_data = globals.interface.sendText(
text=message,
destinationId=destination,
wantAck=True,
@@ -188,13 +189,38 @@ def send_message(message: str, destination: int = BROADCAST_NUM, channel: int =
channelIndex=send_on_channel,
)
add_new_message(channel_id, config.sent_message_prefix + config.ack_unknown_str + ": ", message)
# Add sent message to the messages dictionary
if channel_id not in globals.all_messages:
globals.all_messages[channel_id] = []
# Handle timestamp logic
current_timestamp = int(datetime.now().timestamp()) # Get current timestamp
current_hour = datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = globals.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
globals.all_messages[channel_id].append((f"-- {current_hour} --", ""))
globals.all_messages[channel_id].append((config.sent_message_prefix + config.ack_unknown_str + ": ", message))
timestamp = save_message_to_db(channel_id, myid, message)
ack_naks[sent_message_data.id] = {
"channel": channel_id,
"messageIndex": len(ui_state.all_messages[channel_id]) - 1,
"messageIndex": len(globals.all_messages[channel_id]) - 1,
"timestamp": timestamp,
}
@@ -203,14 +229,10 @@ def send_traceroute() -> None:
"""
Sends a RouteDiscovery protobuf to the selected node.
"""
channel_id = ui_state.node_list[ui_state.selected_node]
add_new_message(channel_id, f"{config.message_prefix} Sent Traceroute", "")
r = mesh_pb2.RouteDiscovery()
interface_state.interface.sendData(
globals.interface.sendData(
r,
destinationId=channel_id,
destinationId=globals.node_list[globals.selected_node],
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=on_response_traceroute,

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,6 @@ import sys
from typing import List
from contact.utilities.save_to_radio import save_changes
import contact.ui.default_config as config
from contact.utilities.config_io import config_export, config_import
from contact.utilities.control_utils import parse_ini_file, transform_menu_path
from contact.utilities.input_handlers import (
@@ -21,7 +20,9 @@ from contact.ui.dialog import dialog
from contact.ui.menus import generate_menu_from_protobuf
from contact.ui.nav_utils import move_highlight, draw_arrows, update_help_window
from contact.ui.user_config import json_editor
from contact.utilities.singleton import menu_state
from contact.ui.ui_state import MenuState
menu_state = MenuState()
# Constants
width = 80
@@ -35,17 +36,16 @@ script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# Paths
# locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
locals_dir = os.path.dirname(os.path.abspath(sys.argv[0])) # Current script directory
translation_file = os.path.join(parent_dir, "localisations", "en.ini")
# config_folder = os.path.join(locals_dir, "node-configs")
config_folder = os.path.abspath(config.node_configs_file_path)
config_folder = os.path.join(locals_dir, "node-configs")
# Load translations
field_mapping, help_text = parse_ini_file(translation_file)
def display_menu() -> tuple[object, object]: # curses.window or pad types
def display_menu(menu_state: MenuState) -> tuple[object, object]: # curses.window or pad types
min_help_window_height = 6
num_items = len(menu_state.current_menu) + (1 if menu_state.show_save_option else 0)
@@ -106,7 +106,7 @@ def display_menu() -> tuple[object, object]: # curses.window or pad types
)
# Draw help window with dynamically updated max_help_lines
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path)
draw_help_window(start_y, start_x, menu_height, max_help_lines, transformed_path, menu_state)
menu_win.refresh()
menu_pad.refresh(
@@ -132,6 +132,7 @@ def draw_help_window(
menu_height: int,
max_help_lines: int,
transformed_path: List[str],
menu_state: MenuState,
) -> None:
global help_win
@@ -149,15 +150,6 @@ def draw_help_window(
)
def get_input_type_for_field(field) -> type:
if field.type in (field.TYPE_INT32, field.TYPE_UINT32, field.TYPE_INT64):
return int
elif field.type in (field.TYPE_FLOAT, field.TYPE_DOUBLE):
return float
else:
return str
def settings_menu(stdscr: object, interface: object) -> None:
curses.update_lines_cols()
@@ -167,32 +159,29 @@ def settings_menu(stdscr: object, interface: object) -> None:
modified_settings = {}
menu_state.need_redraw = True
need_redraw = True
menu_state.show_save_option = False
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
if need_redraw:
options = list(menu_state.current_menu.keys())
# Determine if save option should be shown
path = menu_state.menu_path
menu_state.show_save_option = (
(len(path) > 2 and ("Radio Settings" in path or "Module Settings" in path))
or (len(path) == 2 and "User Settings" in path)
or (len(path) == 3 and "Channels" in path)
(
len(menu_state.menu_path) > 2
and ("Radio Settings" in menu_state.menu_path or "Module Settings" in menu_state.menu_path)
)
or (len(menu_state.menu_path) == 2 and "User Settings" in menu_state.menu_path)
or (len(menu_state.menu_path) == 3 and "Channels" in menu_state.menu_path)
)
# Display the menu
menu_win, menu_pad = display_menu()
menu_win, menu_pad = display_menu(menu_state)
if menu_win is None:
continue # Skip if menu_win is not initialized
need_redraw = False
menu_win.timeout(200) # wait up to 200 ms for a keypress (or less if key is pressed)
# Capture user input
key = menu_win.getch()
if key == -1:
continue
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
# max_help_lines = 4
@@ -226,7 +215,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
)
elif key == curses.KEY_RESIZE:
menu_state.need_redraw = True
need_redraw = True
curses.update_lines_cols()
menu_win.erase()
@@ -250,7 +239,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
)
elif key == curses.KEY_RIGHT or key == ord("\n"):
menu_state.need_redraw = True
need_redraw = True
menu_state.start_index.append(0)
menu_win.erase()
help_win.erase()
@@ -279,8 +268,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
break
elif selected_option == "Export Config File":
filename = get_text_input("Enter a filename for the config file", None, None)
filename = get_text_input("Enter a filename for the config file")
if not filename:
logging.info("Export aborted: No filename provided.")
menu_state.start_index.pop()
@@ -302,8 +290,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
with open(yaml_file_path, "w", encoding="utf-8") as file:
file.write(config_text)
logging.info(f"Config file saved to {yaml_file_path}")
dialog("Config File Saved:", yaml_file_path)
menu_state.need_redraw = True
dialog(stdscr, "Config File Saved:", yaml_file_path)
menu_state.start_index.pop()
continue
except PermissionError:
@@ -319,16 +306,14 @@ def settings_menu(stdscr: object, interface: object) -> None:
# Check if folder exists and is not empty
if not os.path.exists(config_folder) or not any(os.listdir(config_folder)):
dialog("", " No config files found. Export a config first.")
menu_state.need_redraw = True
dialog(stdscr, "", " No config files found. Export a config first.")
continue # Return to menu
file_list = [f for f in os.listdir(config_folder) if os.path.isfile(os.path.join(config_folder, f))]
# Ensure file_list is not empty before proceeding
if not file_list:
dialog("", " No config files found. Export a config first.")
menu_state.need_redraw = True
dialog(stdscr, "", " No config files found. Export a config first.")
continue
filename = get_list_input("Choose a config file", None, file_list)
@@ -342,7 +327,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
elif selected_option == "Config URL":
current_value = interface.localNode.getURL()
new_value = get_text_input(f"Config URL is currently: {current_value}", None, str)
new_value = get_text_input(f"Config URL is currently: {current_value}")
if new_value is not None:
current_value = new_value
overwrite = get_list_input(f"Are you sure you want to load this config?", None, ["Yes", "No"])
@@ -395,6 +380,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
menu_state.selected_index = 4
continue
# need_redraw = True
field_info = menu_state.current_menu.get(selected_option)
if isinstance(field_info, tuple):
@@ -409,9 +395,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
if selected_option in ["longName", "shortName", "isLicensed"]:
if selected_option in ["longName", "shortName"]:
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, None
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -430,9 +414,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif selected_option in ["latitude", "longitude", "altitude"]:
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, float
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else new_value
menu_state.current_menu[selected_option] = (field, new_value)
@@ -471,26 +453,17 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.start_index.pop()
elif field.type == 13: # Field type 13 corresponds to UINT32
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else int(new_value)
menu_state.start_index.pop()
elif field.type == 2: # Field type 13 corresponds to INT64
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else float(new_value)
menu_state.start_index.pop()
else: # Handle other field types
input_type = get_input_type_for_field(field)
new_value = get_text_input(
f"{human_readable_name} is currently: {current_value}", selected_option, input_type
)
new_value = get_text_input(f"{human_readable_name} is currently: {current_value}")
new_value = current_value if new_value is None else new_value
menu_state.start_index.pop()
@@ -513,28 +486,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_state.selected_index = 0
elif key == curses.KEY_LEFT:
# If we are at the main menu and there are unsaved changes, prompt to save
if len(menu_state.menu_path) == 3 and modified_settings:
current_section = menu_state.menu_path[-1]
save_prompt = get_list_input(
f"You have unsaved changes in {current_section}. Save before exiting?",
None,
["Yes", "No", "Cancel"],
mandatory=True,
)
if save_prompt == "Cancel":
continue # Stay in the menu without doing anything
elif save_prompt == "Yes":
save_changes(interface, modified_settings, menu_state)
logging.info("Changes Saved")
modified_settings.clear()
menu = rebuild_menu_at_current_path(interface, menu_state)
pass
menu_state.need_redraw = True
need_redraw = True
menu_win.erase()
help_win.erase()
@@ -545,8 +497,8 @@ def settings_menu(stdscr: object, interface: object) -> None:
menu_win.refresh()
help_win.refresh()
# if len(menu_state.menu_path) < 2:
# modified_settings.clear()
if len(menu_state.menu_path) < 2:
modified_settings.clear()
# Navigate back to the previous menu
if len(menu_state.menu_path) > 1:
@@ -563,16 +515,6 @@ def settings_menu(stdscr: object, interface: object) -> None:
break
def rebuild_menu_at_current_path(interface, menu_state):
"""Rebuild menus from the device and re-point current_menu to the same path."""
new_menu = generate_menu_from_protobuf(interface)
cur = new_menu["Main Menu"]
for step in menu_state.menu_path[1:]:
cur = cur.get(step, {})
menu_state.current_menu = cur
return new_menu
def set_region(interface: object) -> None:
node = interface.getNode("^local")
device_config = node.localConfig

View File

@@ -2,69 +2,15 @@ import json
import logging
import os
from typing import Dict
from contact.ui.colors import setup_colors
# Get the parent directory of the script
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
# To test writting to a non-writable directory, you can uncomment the following lines:
# mkdir /tmp/test_nonwritable
# chmod -w /tmp/test_nonwritable
# parent_dir = "/tmp/test_nonwritable"
def reload_config() -> None:
loaded_config = initialize_config()
assign_config_variables(loaded_config)
setup_colors(reinit=True)
def _is_writable_dir(path: str) -> bool:
"""
Return True if we can create & delete a temp file in `path`.
"""
if not os.path.isdir(path):
return False
test_path = os.path.join(path, ".perm_test_tmp")
try:
with open(test_path, "w", encoding="utf-8") as _tmp:
_tmp.write("ok")
os.remove(test_path)
return True
except OSError:
return False
def _get_config_root(preferred_dir: str, fallback_name: str = ".contact_client") -> str:
"""
Choose a writable directory for config artifacts.
"""
if _is_writable_dir(preferred_dir):
return preferred_dir
home = os.path.expanduser("~")
fallback_dir = os.path.join(home, fallback_name)
# Ensure the fallback exists.
os.makedirs(fallback_dir, exist_ok=True)
# If *that* still isn't writable, last-ditch: use a system temp dir.
if not _is_writable_dir(fallback_dir):
import tempfile
fallback_dir = tempfile.mkdtemp(prefix="contact_client_")
return fallback_dir
# Pick the root now.
config_root = _get_config_root(parent_dir)
# Paths (derived from the chosen root)
json_file_path = os.path.join(config_root, "config.json")
log_file_path = os.path.join(config_root, "client.log")
db_file_path = os.path.join(config_root, "client.db")
node_configs_file_path = os.path.join(config_root, "node-configs/")
# Paths
json_file_path = os.path.join(parent_dir, "config.json")
log_file_path = os.path.join(parent_dir, "client.log")
db_file_path = os.path.join(parent_dir, "client.db")
def format_json_single_line_arrays(data: Dict[str, object], indent: int = 4) -> str:
@@ -177,19 +123,15 @@ def initialize_config() -> Dict[str, object]:
"settings_breadcrumbs": ["green", "black"],
"settings_warning": ["green", "black"],
"settings_note": ["green", "black"],
"node_favorite": ["cyan", "green"],
"node_ignored": ["red", "black"],
"node_favorite": ["cyan", "white"],
"node_ignored": ["red", "white"],
}
default_config_variables = {
"channel_list_16ths": "3",
"node_list_16ths": "5",
"db_file_path": db_file_path,
"log_file_path": log_file_path,
"node_configs_file_path": node_configs_file_path,
"message_prefix": ">>",
"sent_message_prefix": ">> Sent",
"notification_symbol": "*",
"notification_sound": "True",
"ack_implicit_str": "[◌]",
"ack_str": "[✓]",
"nak_str": "[x]",
@@ -226,26 +168,20 @@ def initialize_config() -> Dict[str, object]:
def assign_config_variables(loaded_config: Dict[str, object]) -> None:
# Assign values to local variables
global db_file_path, log_file_path, node_configs_file_path, message_prefix, sent_message_prefix
global db_file_path, log_file_path, message_prefix, sent_message_prefix
global notification_symbol, ack_implicit_str, ack_str, nak_str, ack_unknown_str
global node_list_16ths, channel_list_16ths
global theme, COLOR_CONFIG
global node_sort, notification_sound
global node_sort
channel_list_16ths = loaded_config["channel_list_16ths"]
node_list_16ths = loaded_config["node_list_16ths"]
db_file_path = loaded_config["db_file_path"]
log_file_path = loaded_config["log_file_path"]
node_configs_file_path = loaded_config.get("node_configs_file_path")
message_prefix = loaded_config["message_prefix"]
sent_message_prefix = loaded_config["sent_message_prefix"]
notification_symbol = loaded_config["notification_symbol"]
notification_sound = loaded_config["notification_sound"]
ack_implicit_str = loaded_config["ack_implicit_str"]
ack_str = loaded_config["ack_str"]
nak_str = loaded_config["nak_str"]
ack_unknown_str = loaded_config["ack_unknown_str"]
node_sort = loaded_config["node_sort"]
theme = loaded_config["theme"]
if theme == "dark":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_DARK"]
@@ -253,6 +189,7 @@ def assign_config_variables(loaded_config: Dict[str, object]) -> None:
COLOR_CONFIG = loaded_config["COLOR_CONFIG_LIGHT"]
elif theme == "green":
COLOR_CONFIG = loaded_config["COLOR_CONFIG_GREEN"]
node_sort = loaded_config["node_sort"]
# Call the function when the script is imported
@@ -268,7 +205,6 @@ if __name__ == "__main__":
print("\nLoaded Configuration:")
print(f"Database File Path: {db_file_path}")
print(f"Log File Path: {log_file_path}")
print(f"Configs File Path: {node_configs_file_path}")
print(f"Message Prefix: {message_prefix}")
print(f"Sent Message Prefix: {sent_message_prefix}")
print(f"Notification Symbol: {notification_symbol}")

View File

@@ -1,63 +1,44 @@
import curses
from contact.ui.colors import get_color
from contact.utilities.singleton import menu_state, ui_state
def dialog(title: str, message: str) -> None:
"""Display a dialog with a title and message."""
def dialog(stdscr: curses.window, title: str, message: str) -> None:
height, width = stdscr.getmaxyx()
previous_window = ui_state.current_window
ui_state.current_window = 4
curses.update_lines_cols()
height, width = curses.LINES, curses.COLS
# Parse message into lines and calculate dimensions
# Calculate dialog dimensions
max_line_lengh = 0
message_lines = message.splitlines()
max_line_length = max(len(l) for l in message_lines)
for l in message_lines:
max_line_length = max(len(l), max_line_lengh)
dialog_width = max(len(title) + 4, max_line_length + 4)
dialog_height = len(message_lines) + 4
x = (width - dialog_width) // 2
y = (height - dialog_height) // 2
def draw_window():
win.erase()
win.bkgd(get_color("background"))
win.attrset(get_color("window_frame"))
win.border(0)
win.addstr(0, 2, title, get_color("settings_default"))
for i, line in enumerate(message_lines):
msg_x = (dialog_width - len(line)) // 2
win.addstr(2 + i, msg_x, line, get_color("settings_default"))
ok_text = " Ok "
win.addstr(
dialog_height - 2,
(dialog_width - len(ok_text)) // 2,
ok_text,
get_color("settings_default", reverse=True),
)
win.refresh()
# Create dialog window
win = curses.newwin(dialog_height, dialog_width, y, x)
draw_window()
win.bkgd(get_color("background"))
win.attrset(get_color("window_frame"))
win.border(0)
# Add title
win.addstr(0, 2, title, get_color("settings_default"))
# Add message
for i, l in enumerate(message_lines):
win.addstr(2 + i, 2, l, get_color("settings_default"))
# Add button
win.addstr(dialog_height - 2, (dialog_width - 4) // 2, " Ok ", get_color("settings_default", reverse=True))
# Refresh dialog window
win.refresh()
# Get user input
while True:
win.timeout(200)
char = win.getch()
if menu_state.need_redraw:
menu_state.need_redraw = False
draw_window()
if char in (curses.KEY_ENTER, 10, 13, 32, 27): # Enter, space, Esc
# Close dialog with enter, space, or esc
if char in (curses.KEY_ENTER, 10, 13, 32, 27):
win.erase()
win.refresh()
ui_state.current_window = previous_window
return
if char == -1:
continue

View File

@@ -1,22 +1,8 @@
import curses
import re
from unicodedata import east_asian_width
from contact.ui.colors import get_color
from contact.utilities.control_utils import transform_menu_path
from typing import Any, Optional, List, Dict
from contact.utilities.singleton import interface_state, ui_state
def get_node_color(node_index: int, reverse: bool = False):
node_num = ui_state.node_list[node_index]
node = interface_state.interface.nodesByNum.get(node_num, {})
if node.get("isFavorite"):
return get_color("node_favorite", reverse=reverse)
elif node.get("isIgnored"):
return get_color("node_ignored", reverse=reverse)
return get_color("settings_default", reverse=reverse)
# Aliases
Segment = tuple[str, str, bool, bool]
@@ -142,6 +128,7 @@ def draw_arrows(
win: object, visible_height: int, max_index: int, start_index: List[int], show_save_option: bool
) -> None:
# vh = visible_height + (1 if show_save_option else 0)
mi = max_index - (2 if show_save_option else 0)
if visible_height < mi:
@@ -295,16 +282,9 @@ def get_wrapped_help_text(
return wrapped_help
def text_width(text: str) -> int:
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
def wrap_text(text: str, wrap_width: int) -> List[str]:
"""Wraps text while preserving spaces and breaking long words."""
whitespace = '\t\n\x0b\x0c\r '
whitespace_trans = dict.fromkeys(map(ord, whitespace), ord(' '))
text = text.translate(whitespace_trans)
words = re.findall(r"\S+|\s+", text) # Capture words and spaces separately
wrapped_lines = []
line_buffer = ""
@@ -313,11 +293,11 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
wrap_width -= margin
for word in words:
word_length = text_width(word)
word_length = len(word)
if word_length > wrap_width: # Break long words
if line_buffer:
wrapped_lines.append(line_buffer.strip())
wrapped_lines.append(line_buffer)
line_buffer = ""
line_length = 0
for i in range(0, word_length, wrap_width):
@@ -325,7 +305,7 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
continue
if line_length + word_length > wrap_width and word.strip():
wrapped_lines.append(line_buffer.strip())
wrapped_lines.append(line_buffer)
line_buffer = ""
line_length = 0
@@ -333,94 +313,6 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
line_length += word_length
if line_buffer:
wrapped_lines.append(line_buffer.strip())
wrapped_lines.append(line_buffer)
return wrapped_lines
def move_main_highlight(
old_idx: int, new_idx, options: List[str], menu_win: curses.window, menu_pad: curses.window, ui_state: object
) -> None:
if old_idx == new_idx: # No-op
return
max_index = len(options) - 1
visible_height = menu_win.getmaxyx()[0] - 2
if new_idx < ui_state.start_index[ui_state.current_window]: # Moving above the visible area
ui_state.start_index[ui_state.current_window] = new_idx
elif new_idx >= ui_state.start_index[ui_state.current_window] + visible_height: # Moving below the visible area
ui_state.start_index[ui_state.current_window] = new_idx - visible_height + 1
# Ensure start_index is within bounds
ui_state.start_index[ui_state.current_window] = max(
0, min(ui_state.start_index[ui_state.current_window], max_index - visible_height + 1)
)
highlight_line(menu_win, menu_pad, old_idx, new_idx, visible_height)
if ui_state.current_window == 0: # hack to fix max_index
max_index += 1
draw_main_arrows(menu_win, max_index, window=ui_state.current_window)
menu_win.refresh()
def highlight_line(
menu_win: curses.window, menu_pad: curses.window, old_idx: int, new_idx: int, visible_height: int
) -> None:
if ui_state.current_window == 0:
color_old = (
get_color("channel_selected") if old_idx == ui_state.selected_channel else get_color("channel_list")
)
color_new = get_color("channel_list", reverse=True) if True else get_color("channel_list", reverse=True)
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, color_old)
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, color_new)
elif ui_state.current_window == 2:
menu_pad.chgat(old_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(old_idx))
menu_pad.chgat(new_idx, 1, menu_pad.getmaxyx()[1] - 4, get_node_color(new_idx, reverse=True))
menu_win.refresh()
# Refresh pad only if scrolling is needed
menu_pad.refresh(
ui_state.start_index[ui_state.current_window],
0,
menu_win.getbegyx()[0] + 1,
menu_win.getbegyx()[1] + 1,
menu_win.getbegyx()[0] + visible_height,
menu_win.getbegyx()[1] + menu_win.getmaxyx()[1] - 3,
)
def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None:
height, width = win.getmaxyx()
usable_height = height - 2
usable_width = width - 2
if window == 1 and ui_state.display_log:
if log_height := kwargs.get("log_height"):
usable_height -= log_height - 1
if usable_height < max_index:
if ui_state.start_index[window] > 0:
win.addstr(1, usable_width, "", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
if max_index - ui_state.start_index[window] - 1 >= usable_height:
win.addstr(usable_height, usable_width, "", get_color("settings_default"))
else:
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
else:
win.addstr(1, usable_width, " ", get_color("settings_default"))
win.addstr(usable_height, usable_width, " ", get_color("settings_default"))
def get_msg_window_lines(messages_win, packetlog_win) -> None:
packetlog_height = packetlog_win.getmaxyx()[0] - 1 if ui_state.display_log else 0
return messages_win.getmaxyx()[0] - 2 - packetlog_height

View File

@@ -1,44 +1,11 @@
from typing import Any, Union, List, Dict
from dataclasses import dataclass, field
@dataclass
class MenuState:
menu_index: List[int] = field(default_factory=list)
start_index: List[int] = field(default_factory=lambda: [0])
selected_index: int = 0
current_menu: Union[Dict[str, Any], List[Any], str, int] = field(default_factory=dict)
menu_path: List[str] = field(default_factory=list)
show_save_option: bool = False
need_redraw: bool = False
@dataclass
class ChatUIState:
display_log: bool = False
channel_list: List[str] = field(default_factory=list)
all_messages: Dict[str, List[str]] = field(default_factory=dict)
notifications: List[str] = field(default_factory=list)
packet_buffer: List[str] = field(default_factory=list)
node_list: List[str] = field(default_factory=list)
selected_channel: int = 0
selected_message: int = 0
selected_node: int = 0
current_window: int = 0
last_sent_time: float = 0.0
selected_index: int = 0
start_index: List[int] = field(default_factory=lambda: [0, 0, 0])
show_save_option: bool = False
menu_path: List[str] = field(default_factory=list)
@dataclass
class InterfaceState:
interface: Any = None
myNodeNum: int = 0
@dataclass
class AppState:
lock: Any = None
def __init__(self):
self.menu_index: List[int] = [] # Row we left the previous menus
self.start_index: List[int] = [0] # Row to start the menu if it doesn't all fit
self.selected_index: int = 0 # Selected Row
self.current_menu: Union[Dict[str, Any], List[Any], str, int] = {} # Contents of the current menu
self.menu_path: List[str] = [] # Menu Path
self.show_save_option: bool = False # Display 'Save'

View File

@@ -4,10 +4,9 @@ import curses
from typing import Any, List, Dict
from contact.ui.colors import get_color, setup_colors, COLOR_MAP
import contact.ui.default_config as config
from contact.ui.default_config import format_json_single_line_arrays, loaded_config
from contact.ui.nav_utils import move_highlight, draw_arrows
from contact.utilities.input_handlers import get_list_input
from contact.utilities.singleton import menu_state
width = 80
@@ -54,19 +53,12 @@ def edit_value(key: str, current_value: str) -> str:
# Handle theme selection dynamically
if key == "theme":
# Load theme names dynamically from the JSON
theme_options = [
k.split("_", 2)[2].lower() for k in config.loaded_config.keys() if k.startswith("COLOR_CONFIG")
]
theme_options = [k.split("_", 2)[2].lower() for k in loaded_config.keys() if k.startswith("COLOR_CONFIG")]
return get_list_input("Select Theme", current_value, theme_options)
elif key == "node_sort":
sort_options = ["lastHeard", "name", "hops"]
return get_list_input("Sort By", current_value, sort_options)
elif key == "notification_sound":
sound_options = ["True", "False"]
return get_list_input("Notification Sound", current_value, sound_options)
# Standard Input Mode (Scrollable)
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
curses.curs_set(1)
@@ -75,64 +67,41 @@ def edit_value(key: str, current_value: str) -> str:
user_input = ""
input_position = (7, 13) # Tuple for row and column
row, col = input_position # Unpack tuple
while True:
if menu_state.need_redraw:
curses.update_lines_cols()
menu_state.need_redraw = False
# Re-create the window to fully reset state
edit_win = curses.newwin(height, width, start_y, start_x)
edit_win.timeout(200)
edit_win.bkgd(get_color("background"))
edit_win.attrset(get_color("window_frame"))
edit_win.border()
# Redraw static content
edit_win.addstr(1, 2, f"Editing {key}", get_color("settings_default", bold=True))
edit_win.addstr(3, 2, "Current Value:", get_color("settings_default"))
for i, line in enumerate(wrapped_lines[:4]):
edit_win.addstr(4 + i, 2, line, get_color("settings_default"))
edit_win.addstr(7, 2, "New Value: ", get_color("settings_default"))
visible_text = user_input[scroll_offset : scroll_offset + input_width]
edit_win.addstr(row, col, " " * input_width, get_color("settings_default"))
edit_win.addstr(row, col, visible_text, get_color("settings_default"))
visible_text = user_input[scroll_offset : scroll_offset + input_width] # Only show what fits
edit_win.addstr(row, col, " " * input_width, get_color("settings_default")) # Clear previous text
edit_win.addstr(row, col, visible_text, get_color("settings_default")) # Display text
edit_win.refresh()
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width))
edit_win.move(row, col + min(len(user_input) - scroll_offset, input_width)) # Adjust cursor position
key = edit_win.get_wch()
try:
key = edit_win.get_wch()
except curses.error:
continue # window not ready — skip this loop
if key in (chr(27), curses.KEY_LEFT):
if key in (chr(27), curses.KEY_LEFT): # ESC or Left Arrow
curses.curs_set(0)
return current_value
return current_value # Exit without returning a value
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
break
elif key in (curses.KEY_BACKSPACE, chr(127)):
if user_input:
elif key in (curses.KEY_BACKSPACE, chr(127)): # Backspace
if user_input: # Only process if there's something to delete
user_input = user_input[:-1]
if scroll_offset > 0 and len(user_input) < scroll_offset + input_width:
scroll_offset -= 1
scroll_offset -= 1 # Move back if text is shorter than scrolled area
else:
if isinstance(key, str):
user_input += key
else:
user_input += chr(key)
if len(user_input) > input_width:
if len(user_input) > input_width: # Scroll if input exceeds visible area
scroll_offset += 1
curses.curs_set(0)
return user_input if user_input else current_value
def display_menu() -> tuple[Any, Any, List[str]]:
def display_menu(menu_state: Any) -> tuple[Any, Any, List[str]]:
"""
Render the configuration menu with a Save button directly added to the window.
"""
@@ -215,7 +184,6 @@ def display_menu() -> tuple[Any, Any, List[str]]:
def json_editor(stdscr: curses.window, menu_state: Any) -> None:
menu_state.selected_index = 0 # Track the selected option
made_changes = False # Track if any changes were made
script_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.abspath(os.path.join(script_dir, os.pardir))
@@ -238,18 +206,16 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
menu_state.current_menu = data # Track the current level of the menu
# Render the menu
menu_win, menu_pad, options = display_menu()
menu_state.need_redraw = True
menu_win, menu_pad, options = display_menu(menu_state)
need_redraw = True
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
menu_win, menu_pad, options = display_menu()
if need_redraw:
menu_win, menu_pad, options = display_menu(menu_state)
menu_win.refresh()
need_redraw = False
max_index = len(options) + (1 if menu_state.show_save_option else 0) - 1
menu_win.timeout(200)
key = menu_win.getch()
if key == curses.KEY_UP:
@@ -277,7 +243,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
elif key in (curses.KEY_RIGHT, 10, 13): # 10 = \n, 13 = carriage return
menu_state.need_redraw = True
need_redraw = True
menu_win.erase()
menu_win.refresh()
@@ -298,14 +264,11 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
if isinstance(selected_data, list) and len(selected_data) == 2:
# Edit color pair
old = selected_data
new_value = edit_color_pair(selected_key, selected_data)
menu_state.menu_path.pop()
menu_state.start_index.pop()
menu_state.menu_index.pop()
menu_state.current_menu[selected_key] = new_value
if new_value != old:
made_changes = True
elif isinstance(selected_data, (dict, list)):
# Navigate into nested data
@@ -314,26 +277,22 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
else:
# General value editing
old = selected_data
new_value = edit_value(selected_key, selected_data)
menu_state.menu_path.pop()
menu_state.menu_index.pop()
menu_state.start_index.pop()
menu_state.current_menu[selected_key] = new_value
menu_state.need_redraw = True
if new_value != old:
made_changes = True
need_redraw = True
else:
# Save button selected
save_json(file_path, data)
stdscr.refresh()
# config.reload() # This isn't refreshing the file paths as expected
continue
elif key in (27, curses.KEY_LEFT): # Escape or Left Arrow
menu_state.need_redraw = True
need_redraw = True
menu_win.erase()
menu_win.refresh()
@@ -354,19 +313,6 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
else:
# Exit the editor
if made_changes:
save_prompt = get_list_input(
"You have unsaved changes. Save before exiting?",
None,
["Yes", "No", "Cancel"],
mandatory=True,
)
if save_prompt == "Cancel":
continue # Stay in the menu without doing anything
elif save_prompt == "Yes":
save_json(file_path, data)
made_changes = False
menu_win.clear()
menu_win.refresh()
@@ -374,7 +320,7 @@ def json_editor(stdscr: curses.window, menu_state: Any) -> None:
def save_json(file_path: str, data: Dict[str, Any]) -> None:
formatted_json = config.format_json_single_line_arrays(data)
formatted_json = format_json_single_line_arrays(data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(formatted_json)
setup_colors(reinit=True)
@@ -383,6 +329,7 @@ def save_json(file_path: str, data: Dict[str, Any]) -> None:
def main(stdscr: curses.window) -> None:
from contact.ui.ui_state import MenuState
menu_state = MenuState()
if len(menu_state.menu_path) == 0:
menu_state.menu_path = ["App Settings"] # Initialize if not set

View File

@@ -1,6 +1,5 @@
import yaml
import logging
import time
from typing import List
from google.protobuf.json_format import MessageToDict
from meshtastic import mt_config
@@ -134,29 +133,24 @@ def config_import(interface, filename):
logging.info(f"Setting device owner to {configuration['owner']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(configuration["owner"])
time.sleep(0.5)
if "owner_short" in configuration:
logging.info(f"Setting device owner short to {configuration['owner_short']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["owner_short"])
time.sleep(0.5)
if "ownerShort" in configuration:
logging.info(f"Setting device owner short to {configuration['ownerShort']}")
waitForAckNak = True
interface.getNode("^local", False).setOwner(long_name=None, short_name=configuration["ownerShort"])
time.sleep(0.5)
if "channel_url" in configuration:
logging.info(f"Setting channel url to {configuration['channel_url']}")
interface.getNode("^local").setURL(configuration["channel_url"])
time.sleep(0.5)
if "channelUrl" in configuration:
logging.info(f"Setting channel url to {configuration['channelUrl']}")
interface.getNode("^local").setURL(configuration["channelUrl"])
time.sleep(0.5)
if "location" in configuration:
alt = 0
@@ -175,14 +169,12 @@ def config_import(interface, filename):
logging.info(f"Fixing longitude at {lon} degrees")
logging.info("Setting device position")
interface.localNode.setFixedPosition(lat, lon, alt)
time.sleep(0.5)
if "config" in configuration:
localConfig = interface.getNode("^local").localConfig
for section in configuration["config"]:
traverseConfig(section, configuration["config"][section], localConfig)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
if "module_config" in configuration:
moduleConfig = interface.getNode("^local").moduleConfig
@@ -193,7 +185,6 @@ def config_import(interface, filename):
moduleConfig,
)
interface.getNode("^local").writeConfig(camel_to_snake(section))
time.sleep(0.5)
interface.getNode("^local", False).commitSettingsTransaction()
logging.info("Writing modified configuration to device")

View File

@@ -6,14 +6,12 @@ from typing import Optional, Union, Dict
from contact.utilities.utils import decimal_to_hex
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
import contact.globals as globals
def get_table_name(channel: str) -> str:
# Construct the table name
table_name = f"{str(interface_state.myNodeNum)}_{channel}_messages"
table_name = f"{str(globals.myNodeNum)}_{channel}_messages"
quoted_table_name = f'"{table_name}"' # Quote the table name becuase we begin with numerics and contain spaces
return quoted_table_name
@@ -63,7 +61,7 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
message_text = ?
"""
db_cursor.execute(update_query, (ack, str(interface_state.myNodeNum), timestamp, message))
db_cursor.execute(update_query, (ack, str(globals.myNodeNum), timestamp, message))
db_connection.commit()
except sqlite3.Error as e:
@@ -74,13 +72,13 @@ def update_ack_nak(channel: str, timestamp: int, message: str, ack: str) -> None
def load_messages_from_db() -> None:
"""Load messages from the database for all channels and update ui_state.all_messages and ui_state.channel_list."""
"""Load messages from the database for all channels and update globals.all_messages and globals.channel_list."""
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
query = "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?"
db_cursor.execute(query, (f"{str(interface_state.myNodeNum)}_%_messages",))
db_cursor.execute(query, (f"{str(globals.myNodeNum)}_%_messages",))
tables = [row[0] for row in db_cursor.fetchall()]
# Iterate through each table and fetch its messages
@@ -106,24 +104,17 @@ def load_messages_from_db() -> None:
# Convert the channel to an integer if it's numeric, otherwise keep it as a string (nodenum vs channel name)
channel = int(channel) if channel.isdigit() else channel
# Add the channel to ui_state.channel_list if not already present
if channel not in ui_state.channel_list and not is_chat_archived(channel):
ui_state.channel_list.append(channel)
# Add the channel to globals.channel_list if not already present
if channel not in globals.channel_list and not is_chat_archived(channel):
globals.channel_list.append(channel)
# Ensure the channel exists in ui_state.all_messages
if channel not in ui_state.all_messages:
ui_state.all_messages[channel] = []
# Ensure the channel exists in globals.all_messages
if channel not in globals.all_messages:
globals.all_messages[channel] = []
# Add messages to ui_state.all_messages grouped by hourly timestamp
# Add messages to globals.all_messages grouped by hourly timestamp
hourly_messages = {}
for row in db_messages:
user_id, message, timestamp, ack_type = row
# Only ack_type is allowed to be None
if user_id is None or message is None or timestamp is None:
logging.warning(f"Skipping row with NULL required field(s): {row}")
continue
for user_id, message, timestamp, ack_type in db_messages:
hour = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:00")
if hour not in hourly_messages:
hourly_messages[hour] = []
@@ -136,22 +127,20 @@ def load_messages_from_db() -> None:
elif ack_type == "Nak":
ack_str = config.nak_str
if user_id == str(interface_state.myNodeNum):
sanitized_message = message.replace("\x00", "")
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", sanitized_message)
if user_id == str(globals.myNodeNum):
formatted_message = (f"{config.sent_message_prefix}{ack_str}: ", message)
else:
sanitized_message = message.replace("\x00", "")
formatted_message = (
f"{config.message_prefix} {get_name_from_database(int(user_id), 'short')}: ",
sanitized_message,
message,
)
hourly_messages[hour].append(formatted_message)
# Flatten the hourly messages into ui_state.all_messages[channel]
# Flatten the hourly messages into globals.all_messages[channel]
for hour, messages in sorted(hourly_messages.items()):
ui_state.all_messages[channel].append((f"-- {hour} --", ""))
ui_state.all_messages[channel].extend(messages)
globals.all_messages[channel].append((f"-- {hour} --", ""))
globals.all_messages[channel].extend(messages)
except sqlite3.Error as e:
logging.error(f"SQLite error while loading messages from table '{table_name}': {e}")
@@ -164,11 +153,11 @@ def init_nodedb() -> None:
"""Initialize the node database and update it with nodes from the interface."""
try:
if not interface_state.interface.nodes:
if not globals.interface.nodes:
return # No nodes to initialize
ensure_node_table_exists() # Ensure the table exists before insertion
nodes_snapshot = list(interface_state.interface.nodes.values())
nodes_snapshot = list(globals.interface.nodes.values())
# Insert or update all nodes
for node in nodes_snapshot:
@@ -225,7 +214,7 @@ def update_node_info_in_db(
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote in case of numeric names
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote in case of numeric names
table_columns = [i[1] for i in db_cursor.execute(f"PRAGMA table_info({table_name})")]
if "chat_archived" not in table_columns:
@@ -289,7 +278,7 @@ def update_node_info_in_db(
def ensure_node_table_exists() -> None:
"""Ensure the node database table exists."""
table_name = f'"{interface_state.myNodeNum}_nodedb"' # Quote for safety
table_name = f'"{globals.myNodeNum}_nodedb"' # Quote for safety
schema = """
user_id TEXT PRIMARY KEY,
long_name TEXT,
@@ -330,7 +319,7 @@ def get_name_from_database(user_id: int, type: str = "long") -> str:
db_cursor = db_connection.cursor()
# Construct table name
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"' # Quote table name for safety
# Determine the correct column to fetch
@@ -356,7 +345,7 @@ def is_chat_archived(user_id: int) -> int:
try:
with sqlite3.connect(config.db_file_path) as db_connection:
db_cursor = db_connection.cursor()
table_name = f"{str(interface_state.myNodeNum)}_nodedb"
table_name = f"{str(globals.myNodeNum)}_nodedb"
nodeinfo_table = f'"{table_name}"'
query = f"SELECT chat_archived FROM {nodeinfo_table} WHERE user_id = ?"
db_cursor.execute(query, (user_id,))

View File

@@ -6,44 +6,10 @@ from typing import Any, Optional, List
from contact.ui.colors import get_color
from contact.ui.nav_utils import move_highlight, draw_arrows, wrap_text
from contact.ui.dialog import dialog
from contact.utilities.validation_rules import get_validation_for
from contact.utilities.singleton import menu_state
def invalid_input(window: curses.window, message: str, redraw_func: Optional[callable] = None) -> None:
"""Displays an invalid input message in the given window and redraws if needed."""
cursor_y, cursor_x = window.getyx()
curses.curs_set(0)
dialog("Invalid Input", message)
if redraw_func:
redraw_func() # Redraw the original window content that got obscured
else:
window.refresh()
window.move(cursor_y, cursor_x)
curses.curs_set(1)
def get_text_input(prompt: str, selected_config: str, input_type: str) -> Optional[str]:
def get_text_input(prompt: str) -> Optional[str]:
"""Handles user input with wrapped text for long prompts."""
def redraw_input_win():
"""Redraw the input window with the current prompt and user input."""
input_win.erase()
input_win.border()
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
if row >= height - 3:
break
input_win.addstr(row + 1, margin, prompt_text, get_color("settings_default"))
input_win.addstr(row + 1, col_start, user_input[:first_line_width], get_color("settings_default"))
for i, line in enumerate(wrap_text(user_input[first_line_width:], wrap_width=input_width)):
if row + 2 + i < height - 1:
input_win.addstr(row + 2 + i, margin, line[:input_width], get_color("settings_default"))
input_win.refresh()
height = 8
width = 80
margin = 2 # Left and right margin
@@ -54,7 +20,6 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
start_x = (curses.COLS - width) // 2
input_win = curses.newwin(height, width, start_y, start_x)
input_win.timeout(200)
input_win.bkgd(get_color("background"))
input_win.attrset(get_color("window_frame"))
input_win.border()
@@ -62,7 +27,6 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
# Wrap the prompt text
wrapped_prompt = wrap_text(prompt, wrap_width=input_width)
row = 1
for line in wrapped_prompt:
input_win.addstr(row, margin, line[:input_width], get_color("settings_default", bold=True))
row += 1
@@ -75,125 +39,34 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
input_win.refresh()
curses.curs_set(1)
min_value = 0
max_value = 4294967295
min_length = 0
max_length = None
if selected_config is not None:
validation = get_validation_for(selected_config) or {}
min_value = validation.get("min_value", 0)
max_value = validation.get("max_value", 4294967295)
min_length = validation.get("min_length", 0)
max_length = validation.get("max_length")
max_length = 4 if "shortName" in prompt else None
user_input = ""
# Start user input after the prompt text
col_start = margin + len(prompt_text)
first_line_width = input_width - len(prompt_text)
first_line_width = input_width - len(prompt_text) # Available space for first line
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw_input_win()
key = input_win.get_wch() # Waits for user input
try:
key = input_win.get_wch()
except curses.error:
continue
if key == chr(27) or key == curses.KEY_LEFT:
if key == chr(27) or key == curses.KEY_LEFT: # ESC or Left Arrow
input_win.erase()
input_win.refresh()
curses.curs_set(0)
menu_state.need_redraw = True
return None
return None # Exit without saving
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)):
menu_state.need_redraw = True
if not user_input.strip():
invalid_input(input_win, "Value cannot be empty.", redraw_func=redraw_input_win)
continue
length = len(user_input)
if min_length == max_length and max_length is not None:
if length != min_length:
invalid_input(
input_win, f"Value must be exactly {min_length} characters long.", redraw_func=redraw_input_win
)
continue
else:
if length < min_length:
invalid_input(
input_win,
f"Value must be at least {min_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if max_length is not None and length > max_length:
invalid_input(
input_win,
f"Value must be no more than {max_length} characters long.",
redraw_func=redraw_input_win,
)
continue
if input_type is int:
if not user_input.isdigit():
invalid_input(input_win, "Only numeric digits (09) allowed.", redraw_func=redraw_input_win)
continue
int_val = int(user_input)
if not (min_value <= int_val <= max_value):
invalid_input(
input_win, f"Enter a number between {min_value} and {max_value}.", redraw_func=redraw_input_win
)
continue
curses.curs_set(0)
return int_val
elif input_type is float:
try:
float_val = float(user_input)
if not (min_value <= float_val <= max_value):
invalid_input(
input_win,
f"Enter a number between {min_value} and {max_value}.",
redraw_func=redraw_input_win,
)
continue
except ValueError:
invalid_input(input_win, "Must be a valid floating point number.", redraw_func=redraw_input_win)
continue
else:
curses.curs_set(0)
return float_val
else:
break
elif key in (chr(curses.KEY_ENTER), chr(10), chr(13)): # Enter key
break
elif key in (curses.KEY_BACKSPACE, chr(127)): # Handle Backspace
if user_input:
user_input = user_input[:-1] # Remove last character
elif max_length is None or len(user_input) < max_length:
try:
char = chr(key) if not isinstance(key, str) else key
if input_type is int:
if char.isdigit() or (char == "-" and len(user_input) == 0):
user_input += char
elif input_type is float:
if (
char.isdigit()
or (char == "." and "." not in user_input)
or (char == "-" and len(user_input) == 0)
):
user_input += char
else:
user_input += char
except ValueError:
pass # Ignore invalid input
elif max_length is None or len(user_input) < max_length: # Enforce max length
if isinstance(key, str):
user_input += key
else:
user_input += chr(key)
# First line must be manually handled before using wrap_text()
first_line = user_input[:first_line_width] # Cut to max first line width
@@ -222,24 +95,20 @@ def get_text_input(prompt: str, selected_config: str, input_type: str) -> Option
curses.curs_set(0)
input_win.erase()
input_win.refresh()
return user_input.strip()
return user_input
def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
"""Handles user input for editing up to 3 Admin Keys in Base64 format."""
def to_base64(byte_strings):
"""Convert byte values to Base64-encoded strings."""
return [base64.b64encode(b).decode() for b in byte_strings]
def is_valid_base64(s):
"""Check if a string is valid Base64 or blank."""
if s == "":
return True
"""Check if a string is valid Base64."""
try:
decoded = base64.b64decode(s, validate=True)
return len(decoded) == 32 # Ensure it's exactly 32 bytes
except (binascii.Error, ValueError):
except binascii.Error:
return False
cvalue = to_base64(current_value) # Convert current values to Base64
@@ -248,11 +117,10 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
start_y = (curses.LINES - height) // 2
start_x = (curses.COLS - width) // 2
admin_key_win = curses.newwin(height, width, start_y, start_x)
admin_key_win.timeout(200)
admin_key_win.bkgd(get_color("background"))
admin_key_win.attrset(get_color("window_frame"))
admin_key_win.keypad(True) # Enable keypad for special keys
repeated_win = curses.newwin(height, width, start_y, start_x)
repeated_win.bkgd(get_color("background"))
repeated_win.attrset(get_color("window_frame"))
repeated_win.keypad(True) # Enable keypad for special keys
curses.echo()
curses.curs_set(1)
@@ -260,48 +128,46 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
# Editable list of values (max 3 values)
user_values = cvalue[:3] + [""] * (3 - len(cvalue)) # Ensure always 3 fields
cursor_pos = 0 # Track which value is being edited
invalid_input = ""
error_message = ""
while True:
admin_key_win.erase()
admin_key_win.border()
admin_key_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(1, 2, "Edit up to 3 Admin Keys:", get_color("settings_default", bold=True))
# Display current values, allowing editing
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " " # Highlight the current line
admin_key_win.addstr(
repeated_win.addstr(
3 + i, 2, f"{prefix}Admin Key {i + 1}: ", get_color("settings_default", bold=(i == cursor_pos))
)
admin_key_win.addstr(3 + i, 18, line) # Align text for easier editing
repeated_win.addstr(3 + i, 18, line) # Align text for easier editing
# Move cursor to the correct position inside the field
curses.curs_set(1)
admin_key_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if invalid_input:
admin_key_win.addstr(7, 2, invalid_input, get_color("settings_default", bold=True))
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
admin_key_win.refresh()
key = admin_key_win.getch()
repeated_win.refresh()
key = repeated_win.getch()
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
admin_key_win.erase()
admin_key_win.refresh()
repeated_win.erase()
repeated_win.refresh()
curses.noecho()
curses.curs_set(0)
menu_state.need_redraw = True
return None
elif key == ord("\n"): # Enter key to save and return
menu_state.need_redraw = True
if all(is_valid_base64(val) for val in user_values): # Ensure all values are valid Base64 and 32 bytes
curses.noecho()
curses.curs_set(0)
return user_values # Return the edited Base64 values
else:
invalid_input = "Error: Each key must be valid Base64 and 32 bytes long!"
error_message = "Error: Each key must be valid Base64 and 32 bytes long!"
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
@@ -312,14 +178,11 @@ def get_admin_key_input(current_value: List[bytes]) -> Optional[List[str]]:
else:
try:
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
invalid_input = "" # Clear error if user starts fixing input
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
from contact.utilities.singleton import menu_state # Required if not already imported
def get_repeated_input(current_value: List[str]) -> Optional[str]:
height = 9
width = 80
@@ -327,82 +190,71 @@ def get_repeated_input(current_value: List[str]) -> Optional[str]:
start_x = (curses.COLS - width) // 2
repeated_win = curses.newwin(height, width, start_y, start_x)
repeated_win.timeout(200)
repeated_win.bkgd(get_color("background"))
repeated_win.attrset(get_color("window_frame"))
repeated_win.keypad(True)
repeated_win.keypad(True) # Enable keypad for special keys
curses.echo()
curses.curs_set(1)
curses.curs_set(1) # Show the cursor
user_values = current_value[:3] + [""] * (3 - len(current_value)) # Always 3 fields
cursor_pos = 0
invalid_input = ""
# Editable list of values (max 3 values)
user_values = current_value[:3]
cursor_pos = 0 # Track which value is being edited
error_message = ""
def redraw():
while True:
repeated_win.erase()
repeated_win.border()
repeated_win.addstr(1, 2, "Edit up to 3 Values:", get_color("settings_default", bold=True))
# Display current values, allowing editing
for i, line in enumerate(user_values):
prefix = "" if i == cursor_pos else " "
prefix = "" if i == cursor_pos else " " # Highlight the current line
repeated_win.addstr(
3 + i, 2, f"{prefix}Value{i + 1}: ", get_color("settings_default", bold=(i == cursor_pos))
)
repeated_win.addstr(3 + i, 18, line[: width - 20]) # Prevent overflow
repeated_win.addstr(3 + i, 18, line)
if invalid_input:
repeated_win.addstr(7, 2, invalid_input[: width - 4], get_color("settings_default", bold=True))
# Move cursor to the correct position inside the field
curses.curs_set(1)
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos])) # Position cursor at end of text
# Show error message if needed
if error_message:
repeated_win.addstr(7, 2, error_message, get_color("settings_default", bold=True))
repeated_win.move(3 + cursor_pos, 18 + len(user_values[cursor_pos]))
repeated_win.refresh()
key = repeated_win.getch()
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw()
redraw()
try:
key = repeated_win.get_wch()
except curses.error:
continue # ignore timeout or input issues
if key in (27, curses.KEY_LEFT): # ESC or Left Arrow
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow -> Cancel and return original
repeated_win.erase()
repeated_win.refresh()
curses.noecho()
curses.curs_set(0)
menu_state.need_redraw = True
return None
elif key in ("\n", curses.KEY_ENTER):
elif key == ord("\n"): # Enter key to save and return
curses.noecho()
curses.curs_set(0)
menu_state.need_redraw = True
return ", ".join(user_values).strip()
elif key == curses.KEY_UP:
cursor_pos = (cursor_pos - 1) % 3
elif key == curses.KEY_DOWN:
cursor_pos = (cursor_pos + 1) % 3
elif key in (curses.KEY_BACKSPACE, 127):
user_values[cursor_pos] = user_values[cursor_pos][:-1]
return ", ".join(user_values)
elif key == curses.KEY_UP: # Move cursor up
cursor_pos = (cursor_pos - 1) % len(user_values)
elif key == curses.KEY_DOWN: # Move cursor down
cursor_pos = (cursor_pos + 1) % len(user_values)
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
if len(user_values[cursor_pos]) > 0:
user_values[cursor_pos] = user_values[cursor_pos][:-1] # Remove last character
else:
try:
ch = chr(key) if isinstance(key, int) else key
if ch.isprintable():
user_values[cursor_pos] += ch
invalid_input = ""
except Exception:
pass
from contact.utilities.singleton import menu_state # Ensure this is imported
user_values[cursor_pos] += chr(key) # Append valid character input to the selected field
error_message = "" # Clear error if user starts fixing input
except ValueError:
pass # Ignore invalid character inputs
def get_fixed32_input(current_value: int) -> int:
original_value = current_value
ip_string = str(ipaddress.IPv4Address(current_value))
cvalue = current_value
current_value = str(ipaddress.IPv4Address(current_value))
height = 10
width = 80
start_y = (curses.LINES - height) // 2
@@ -412,73 +264,54 @@ def get_fixed32_input(current_value: int) -> int:
fixed32_win.bkgd(get_color("background"))
fixed32_win.attrset(get_color("window_frame"))
fixed32_win.keypad(True)
fixed32_win.timeout(200)
curses.echo()
curses.curs_set(1)
user_input = ""
def redraw():
while True:
fixed32_win.erase()
fixed32_win.border()
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", get_color("settings_default", bold=True))
fixed32_win.addstr(3, 2, f"Current: {ip_string}", get_color("settings_default"))
fixed32_win.addstr(5, 2, f"New value: {user_input}", get_color("settings_default"))
fixed32_win.addstr(1, 2, "Enter an IP address (xxx.xxx.xxx.xxx):", curses.A_BOLD)
fixed32_win.addstr(3, 2, f"Current: {current_value}")
fixed32_win.addstr(5, 2, f"New value: {user_input}")
fixed32_win.refresh()
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw()
key = fixed32_win.getch()
redraw()
try:
key = fixed32_win.get_wch()
except curses.error:
continue # ignore timeout
if key in (27, curses.KEY_LEFT): # ESC or Left Arrow to cancel
if key == 27 or key == curses.KEY_LEFT: # Escape or Left Arrow to cancel
fixed32_win.erase()
fixed32_win.refresh()
curses.noecho()
curses.curs_set(0)
menu_state.need_redraw = True
return original_value
elif key in ("\n", curses.KEY_ENTER):
return cvalue # Return the current value unchanged
elif key == ord("\n"): # Enter key to validate and save
# Validate IP address
octets = user_input.split(".")
menu_state.need_redraw = True
if len(octets) == 4 and all(octet.isdigit() and 0 <= int(octet) <= 255 for octet in octets):
curses.noecho()
curses.curs_set(0)
return int(ipaddress.ip_address(user_input))
fixed32_address = ipaddress.ip_address(user_input)
return int(fixed32_address) # Return the valid IP address
else:
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", get_color("settings_default", bold=True))
fixed32_win.addstr(7, 2, "Invalid IP address. Try again.", curses.A_BOLD | curses.color_pair(5))
fixed32_win.refresh()
curses.napms(1500)
user_input = ""
elif key in (curses.KEY_BACKSPACE, 127):
curses.napms(1500) # Wait for 1.5 seconds before refreshing
user_input = "" # Clear invalid input
elif key == curses.KEY_BACKSPACE or key == 127: # Backspace key
user_input = user_input[:-1]
else:
try:
ch = chr(key) if isinstance(key, int) else key
if ch.isdigit() or ch == ".":
user_input += ch
except Exception:
pass # Ignore unprintable inputs
char = chr(key)
if char.isdigit() or char == ".":
user_input += char # Append only valid characters (digits or dots)
except ValueError:
pass # Ignore invalid inputs
from typing import List, Optional # ensure Optional is imported
def get_list_input(
prompt: str, current_option: Optional[str], list_options: List[str], mandatory: bool = False
) -> Optional[str]:
def get_list_input(prompt: str, current_option: Optional[str], list_options: List[str]) -> Optional[str]:
"""
List selector.
Displays a scrollable list of list_options for the user to choose from.
"""
selected_index = list_options.index(current_option) if current_option in list_options else 0
@@ -488,7 +321,6 @@ def get_list_input(
start_x = (curses.COLS - width) // 2
list_win = curses.newwin(height, width, start_y, start_x)
list_win.timeout(200)
list_win.bkgd(get_color("background"))
list_win.attrset(get_color("window_frame"))
list_win.keypad(True)
@@ -496,62 +328,50 @@ def get_list_input(
list_pad = curses.newpad(len(list_options) + 1, width - 8)
list_pad.bkgd(get_color("background"))
# Render header
list_win.erase()
list_win.border()
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
# Render options on the pad
for idx, color in enumerate(list_options):
if idx == selected_index:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default", reverse=True))
else:
list_pad.addstr(idx, 0, color.ljust(width - 8), get_color("settings_default"))
# Initial refresh
list_win.refresh()
list_pad.refresh(
0,
0,
list_win.getbegyx()[0] + 3,
list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
)
max_index = len(list_options) - 1
visible_height = list_win.getmaxyx()[0] - 5
def redraw_list_ui():
list_win.erase()
list_win.border()
list_win.addstr(1, 2, prompt, get_color("settings_default", bold=True))
for idx, item in enumerate(list_options):
color = get_color("settings_default", reverse=(idx == selected_index))
list_pad.addstr(idx, 0, item.ljust(width - 8), color)
list_win.refresh()
list_pad.refresh(
0,
0,
list_win.getbegyx()[0] + 3,
list_win.getbegyx()[1] + 4,
list_win.getbegyx()[0] + list_win.getmaxyx()[0] - 2,
list_win.getbegyx()[1] + list_win.getmaxyx()[1] - 4,
)
draw_arrows(list_win, visible_height, max_index, [0], show_save_option=False)
# Initial draw
redraw_list_ui()
draw_arrows(list_win, visible_height, max_index, [0], show_save_option=False) # Initial call to draw arrows
while True:
if menu_state.need_redraw:
menu_state.need_redraw = False
redraw_list_ui()
try:
key = list_win.getch()
except curses.error:
continue
key = list_win.getch()
if key == curses.KEY_UP:
old_selected_index = selected_index
selected_index = max(0, selected_index - 1)
move_highlight(old_selected_index, list_options, list_win, list_pad, selected_index=selected_index)
elif key == curses.KEY_DOWN:
old_selected_index = selected_index
selected_index = min(len(list_options) - 1, selected_index + 1)
move_highlight(old_selected_index, list_options, list_win, list_pad, selected_index=selected_index)
elif key == ord("\n"): # Enter
elif key == ord("\n"): # Enter key
list_win.clear()
list_win.refresh()
menu_state.need_redraw = True
return list_options[selected_index]
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left
if mandatory:
continue
elif key == 27 or key == curses.KEY_LEFT: # ESC or Left Arrow
list_win.clear()
list_win.refresh()
menu_state.need_redraw = True
return current_option

View File

@@ -1,6 +0,0 @@
from contact.ui.ui_state import ChatUIState, InterfaceState, AppState, MenuState
ui_state = ChatUIState()
interface_state = InterfaceState()
app_state = AppState()
menu_state = MenuState()

View File

@@ -1,18 +1,16 @@
import contact.globals as globals
import datetime
import time
from meshtastic.protobuf import config_pb2
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state
def get_channels():
"""Retrieve channels from the node and update ui_state.channel_list and ui_state.all_messages."""
node = interface_state.interface.getNode("^local")
"""Retrieve channels from the node and update globals.channel_list and globals.all_messages."""
node = globals.interface.getNode("^local")
device_channels = node.channels
# Clear and rebuild channel list
# ui_state.channel_list = []
# globals.channel_list = []
for device_channel in device_channels:
if device_channel.role:
@@ -28,20 +26,20 @@ def get_channels():
].name
channel_name = convert_to_camel_case(modem_preset_string)
# Add channel to ui_state.channel_list if not already present
if channel_name not in ui_state.channel_list:
ui_state.channel_list.append(channel_name)
# Add channel to globals.channel_list if not already present
if channel_name not in globals.channel_list:
globals.channel_list.append(channel_name)
# Initialize ui_state.all_messages[channel_name] if it doesn't exist
if channel_name not in ui_state.all_messages:
ui_state.all_messages[channel_name] = []
# Initialize globals.all_messages[channel_name] if it doesn't exist
if channel_name not in globals.all_messages:
globals.all_messages[channel_name] = []
return ui_state.channel_list
return globals.channel_list
def get_node_list():
if interface_state.interface.nodes:
my_node_num = interface_state.myNodeNum
if globals.interface.nodes:
my_node_num = globals.myNodeNum
def node_sort(node):
if config.node_sort == "lastHeard":
@@ -53,7 +51,7 @@ def get_node_list():
else:
return node
sorted_nodes = sorted(interface_state.interface.nodes.values(), key=node_sort)
sorted_nodes = sorted(globals.interface.nodes.values(), key=node_sort)
# Move favorite nodes to the beginning
sorted_nodes = sorted(
@@ -70,14 +68,14 @@ def get_node_list():
def refresh_node_list():
new_node_list = get_node_list()
if new_node_list != ui_state.node_list:
ui_state.node_list = new_node_list
if new_node_list != globals.node_list:
globals.node_list = new_node_list
return True
return False
def get_nodeNum():
myinfo = interface_state.interface.getMyNodeInfo()
myinfo = globals.interface.getMyNodeInfo()
myNodeNum = myinfo["num"]
return myNodeNum
@@ -135,31 +133,3 @@ def get_time_ago(timestamp):
if unit != "s":
return f"{value} {unit} ago"
return "now"
def add_new_message(channel_id, prefix, message):
if channel_id not in ui_state.all_messages:
ui_state.all_messages[channel_id] = []
# Timestamp handling
current_timestamp = time.time()
current_hour = datetime.datetime.fromtimestamp(current_timestamp).strftime("%Y-%m-%d %H:00")
# Retrieve the last timestamp if available
channel_messages = ui_state.all_messages[channel_id]
if channel_messages:
# Check the last entry for a timestamp
for entry in reversed(channel_messages):
if entry[0].startswith("--"):
last_hour = entry[0].strip("- ").strip()
break
else:
last_hour = None
else:
last_hour = None
# Add a new timestamp if it's a new hour
if last_hour != current_hour:
ui_state.all_messages[channel_id].append((f"-- {current_hour} --", ""))
# Add the message
ui_state.all_messages[channel_id].append((prefix,message))

View File

@@ -1,23 +0,0 @@
validation_rules = {
"shortName": {"max_length": 4},
"longName": {"max_length": 32},
"fixed_pin": {"min_length": 6, "max_length": 6},
"position_flags": {"max_length": 3},
"enabled_protocols": {"max_value": 2},
"hop_limit": {"max_value": 7},
"latitude": {"min_value": -90, "max_value": 90},
"longitude": {"min_value": -180, "max_value": 180},
"altitude": {"min_value": -4294967295, "max_value": 4294967295},
"red": {"max_value": 255},
"green": {"max_value": 255},
"blue": {"max_value": 255},
"current": {"max_value": 255},
"position_precision": {"max_value": 32},
}
def get_validation_for(key: str) -> dict:
for rule_key, config in validation_rules.items():
if rule_key in key:
return config
return {}

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.3.16"
version = "1.3.8"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}