Compare commits

..

1 Commits
tests ... queue

Author SHA1 Message Date
pdxlocations
d2e559ed04 Implement receive queue and UI shutdown handling in app state 2026-02-20 21:47:19 -07:00
31 changed files with 285 additions and 1606 deletions

View File

@@ -15,6 +15,7 @@ import curses
import io import io
import logging import logging
import os import os
import queue
import subprocess import subprocess
import sys import sys
import threading import threading
@@ -32,7 +33,6 @@ from contact.ui.contact_ui import main_ui
from contact.ui.splash import draw_splash from contact.ui.splash import draw_splash
from contact.utilities.arg_parser import setup_parser from contact.utilities.arg_parser import setup_parser
from contact.utilities.db_handler import init_nodedb, load_messages_from_db from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.demo_data import build_demo_interface, configure_demo_database, seed_demo_messages
from contact.utilities.input_handlers import get_list_input from contact.utilities.input_handlers import get_list_input
from contact.utilities.i18n import t from contact.utilities.i18n import t
from contact.ui.dialog import dialog from contact.ui.dialog import dialog
@@ -55,6 +55,8 @@ logging.basicConfig(
) )
app_state.lock = threading.Lock() app_state.lock = threading.Lock()
app_state.rx_queue = queue.SimpleQueue()
app_state.ui_shutdown = False
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@@ -69,18 +71,9 @@ def prompt_region_if_unset(args: object) -> None:
interface_state.interface = initialize_interface(args) interface_state.interface = initialize_interface(args)
def initialize_globals(seed_demo: bool = False) -> None: def initialize_globals() -> None:
"""Initializes interface and shared globals.""" """Initializes interface and shared globals."""
ui_state.channel_list = []
ui_state.all_messages = {}
ui_state.notifications = []
ui_state.packet_buffer = []
ui_state.node_list = []
ui_state.selected_channel = 0
ui_state.selected_message = 0
ui_state.selected_node = 0
ui_state.start_index = [0, 0, 0]
interface_state.myNodeNum = get_nodeNum() interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels() ui_state.channel_list = get_channels()
ui_state.node_list = get_node_list() ui_state.node_list = get_node_list()
@@ -88,18 +81,9 @@ def initialize_globals(seed_demo: bool = False) -> None:
pub.subscribe(on_receive, "meshtastic.receive") pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb() init_nodedb()
if seed_demo:
seed_demo_messages()
load_messages_from_db() load_messages_from_db()
def initialize_runtime_interface(args: object):
if getattr(args, "demo_screenshot", False):
configure_demo_database()
return build_demo_interface()
return initialize_interface(args)
def main(stdscr: curses.window) -> None: def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI.""" """Main entry point for the curses UI."""
@@ -117,12 +101,12 @@ def main(stdscr: curses.window) -> None:
logging.info("Initializing interface...") logging.info("Initializing interface...")
with app_state.lock: with app_state.lock:
interface_state.interface = initialize_runtime_interface(args) interface_state.interface = initialize_interface(args)
if not getattr(args, "demo_screenshot", False) and interface_state.interface.localNode.localConfig.lora.region == 0: if interface_state.interface.localNode.localConfig.lora.region == 0:
prompt_region_if_unset(args) prompt_region_if_unset(args)
initialize_globals(seed_demo=getattr(args, "demo_screenshot", False)) initialize_globals()
logging.info("Starting main UI") logging.info("Starting main UI")
stdscr.clear() stdscr.clear()
@@ -168,11 +152,10 @@ def start() -> None:
sys.exit(0) sys.exit(0)
try: try:
app_state.ui_shutdown = False
curses.wrapper(main) curses.wrapper(main)
interface_state.interface.close()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("User exited with Ctrl+C") logging.info("User exited with Ctrl+C")
interface_state.interface.close()
sys.exit(0) sys.exit(0)
except Exception as e: except Exception as e:
logging.critical("Fatal error", exc_info=True) logging.critical("Fatal error", exc_info=True)
@@ -183,6 +166,13 @@ def start() -> None:
print("Fatal error:", e) print("Fatal error:", e)
traceback.print_exc() traceback.print_exc()
sys.exit(1) sys.exit(1)
finally:
app_state.ui_shutdown = True
try:
if interface_state.interface is not None:
interface_state.interface.close()
except Exception:
logging.exception("Error while closing interface")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -77,7 +77,7 @@ help.node_info, "F5 = Full node info", ""
help.archive_chat, "Ctrl+D = Archive chat / remove node", "" help.archive_chat, "Ctrl+D = Archive chat / remove node", ""
help.favorite, "Ctrl+F = Favorite", "" help.favorite, "Ctrl+F = Favorite", ""
help.ignore, "Ctrl+G = Ignore", "" help.ignore, "Ctrl+G = Ignore", ""
help.search, "Ctrl+/ or / = Search", "" help.search, "Ctrl+/ = Search", ""
help.help, "Ctrl+K = Help", "" help.help, "Ctrl+K = Help", ""
help.no_help, "No help available.", "" help.no_help, "No help available.", ""
confirm.remove_from_nodedb, "Remove {name} from nodedb?", "" confirm.remove_from_nodedb, "Remove {name} from nodedb?", ""

View File

@@ -77,7 +77,7 @@ help.node_info, "F5 = Полная информация об узле", ""
help.archive_chat, "Ctrl+D = Архив чата / удалить узел", "" help.archive_chat, "Ctrl+D = Архив чата / удалить узел", ""
help.favorite, "Ctrl+F = Избранное", "" help.favorite, "Ctrl+F = Избранное", ""
help.ignore, "Ctrl+G = Игнорировать", "" help.ignore, "Ctrl+G = Игнорировать", ""
help.search, "Ctrl+/ или / = Поиск", "" help.search, "Ctrl+/ = Поиск", ""
help.help, "Ctrl+K = Справка", "" help.help, "Ctrl+K = Справка", ""
help.no_help, "Нет справки.", "" help.no_help, "Нет справки.", ""
confirm.remove_from_nodedb, "Удалить {name} из базы узлов?", "" confirm.remove_from_nodedb, "Удалить {name} из базы узлов?", ""

View File

@@ -2,30 +2,36 @@ import logging
import os import os
import platform import platform
import shutil import shutil
import time
import subprocess import subprocess
import threading import threading
from typing import Any, Dict, Optional import time
# Debounce notification sounds so a burst of queued messages only plays once. from typing import Any, Dict
import contact.ui.default_config as config
from contact.utilities.db_handler import (
get_name_from_database,
maybe_store_nodeinfo_in_db,
save_message_to_db,
update_node_info_in_db,
)
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
from contact.utilities.utils import add_new_message, refresh_node_list
# Debounce notification sounds so a burst of queued messages only plays once.
_SOUND_DEBOUNCE_SECONDS = 0.8 _SOUND_DEBOUNCE_SECONDS = 0.8
_sound_timer: Optional[threading.Timer] = None _sound_timer: threading.Timer | None = None
_sound_timer_lock = threading.Lock() _sound_timer_lock = threading.Lock()
_last_sound_request = 0.0 _last_sound_request = 0.0
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None: def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
"""Schedule a notification sound after a short quiet period. """Schedule a notification sound after a short quiet period."""
If more messages arrive before the delay elapses, the timer is reset.
This prevents playing a sound for each message when a backlog flushes.
"""
global _sound_timer, _last_sound_request global _sound_timer, _last_sound_request
now = time.monotonic() now = time.monotonic()
with _sound_timer_lock: with _sound_timer_lock:
_last_sound_request = now _last_sound_request = now
# Cancel any previously scheduled sound.
if _sound_timer is not None: if _sound_timer is not None:
try: try:
_sound_timer.cancel() _sound_timer.cancel()
@@ -34,7 +40,6 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = None _sound_timer = None
def _fire(expected_request_time: float) -> None: def _fire(expected_request_time: float) -> None:
# Only play if nothing newer has been scheduled.
with _sound_timer_lock: with _sound_timer_lock:
if expected_request_time != _last_sound_request: if expected_request_time != _last_sound_request:
return return
@@ -43,42 +48,20 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = threading.Timer(delay, _fire, args=(now,)) _sound_timer = threading.Timer(delay, _fire, args=(now,))
_sound_timer.daemon = True _sound_timer.daemon = True
_sound_timer.start() _sound_timer.start()
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
draw_messages_window,
draw_channel_list,
add_notification,
)
from contact.utilities.db_handler import (
save_message_to_db,
maybe_store_nodeinfo_in_db,
get_name_from_database,
update_node_info_in_db,
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state
def play_sound(): def play_sound() -> None:
try: try:
system = platform.system() system = platform.system()
sound_path = None sound_path = None
executable = None executable = None
if system == "Darwin": # macOS if system == "Darwin":
sound_path = "/System/Library/Sounds/Ping.aiff" sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay" executable = "afplay"
elif system == "Linux": elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga" ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback wav_path = "/usr/share/sounds/alsa/Front_Center.wav"
if shutil.which("paplay") and os.path.exists(ogg_path): if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay" executable = "paplay"
sound_path = ogg_path sound_path = ogg_path
@@ -95,102 +78,127 @@ def play_sound():
cmd = [executable, sound_path] cmd = [executable, sound_path]
if executable == "ffplay": if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path] cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return except subprocess.CalledProcessError as exc:
logging.error("Sound playback failed: %s", exc)
except Exception as exc:
logging.error("Unexpected error while playing sound: %s", exc)
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}") def _decode_message_payload(payload: Any) -> str:
except Exception as e: if isinstance(payload, bytes):
logging.error(f"Unexpected error: {e}") return payload.decode("utf-8", errors="replace")
if isinstance(payload, str):
return payload
return str(payload)
def process_receive_event(packet: Dict[str, Any]) -> None:
"""Process a queued packet on the UI thread and perform all UI updates."""
# Local import prevents module-level circular import.
from contact.ui.contact_ui import (
add_notification,
draw_channel_list,
draw_messages_window,
draw_node_list,
draw_packetlog_win,
)
# Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
decoded = packet.get("decoded")
if not isinstance(decoded, dict):
return
changed = refresh_node_list()
if changed:
draw_node_list()
portnum = decoded.get("portnum")
if portnum == "NODEINFO_APP":
user = decoded.get("user")
if isinstance(user, dict) and "longName" in user:
maybe_store_nodeinfo_in_db(packet)
return
if portnum != "TEXT_MESSAGE_APP":
return
hop_start = packet.get("hopStart", 0)
hop_limit = packet.get("hopLimit", 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_string = _decode_message_payload(decoded.get("payload"))
if not ui_state.channel_list:
return
refresh_channels = False
refresh_messages = False
channel_number = packet.get("channel", 0)
if not isinstance(channel_number, int):
channel_number = 0
if channel_number < 0:
channel_number = 0
packet_from = packet.get("from")
if packet.get("to") == interface_state.myNodeNum and packet_from is not None:
if packet_from not in ui_state.channel_list:
ui_state.channel_list.append(packet_from)
if packet_from not in ui_state.all_messages:
ui_state.all_messages[packet_from] = []
update_node_info_in_db(packet_from, chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet_from)
if channel_number >= len(ui_state.channel_list):
channel_number = 0
channel_id = ui_state.channel_list[channel_number]
if ui_state.selected_channel >= len(ui_state.channel_list):
ui_state.selected_channel = 0
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
if packet_from is None:
logging.debug("Skipping TEXT_MESSAGE_APP packet with missing 'from' field")
return
message_from_string = get_name_from_database(packet_from, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, packet_from, message_string)
def on_receive(packet: Dict[str, Any], interface: Any) -> None: def on_receive(packet: Dict[str, Any], interface: Any) -> None:
""" """Enqueue packet to be processed on the main curses thread."""
Handles an incoming packet from a Meshtastic interface. if app_state.ui_shutdown:
return
Args: if not isinstance(packet, dict):
packet: The received Meshtastic packet as a dictionary. return
interface: The Meshtastic interface instance that received the packet. try:
""" app_state.rx_queue.put(packet)
with app_state.lock: except Exception:
# Update packet log logging.exception("Failed to enqueue packet for UI processing")
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
try:
if "decoded" not in packet:
return
# Assume any incoming packet could update the last seen time for a node
changed = refresh_node_list()
if changed:
draw_node_list()
if packet["decoded"]["portnum"] == "NODEINFO_APP":
if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
hop_start = packet.get('hopStart', 0)
hop_limit = packet.get('hopLimit', 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
refresh_channels = False
refresh_messages = False
if packet.get("channel"):
channel_number = packet["channel"]
else:
channel_number = 0
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
# Add received message to the messages list
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -1,9 +1,11 @@
import curses import curses
import logging import logging
from queue import Empty
import time import time
import traceback import traceback
from typing import Union from typing import Union
from contact.message_handlers.rx_handler import process_receive_event
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu from contact.settings import settings_menu
from contact.message_handlers.tx_handler import send_message, send_traceroute from contact.message_handlers.tx_handler import send_message, send_traceroute
@@ -12,17 +14,14 @@ from contact.ui.colors import get_color
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
from contact.utilities.input_handlers import get_list_input from contact.utilities.input_handlers import get_list_input
from contact.utilities.i18n import t from contact.utilities.i18n import t
from contact.utilities.emoji_utils import normalize_message_text
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.ui.dialog import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state, menu_state from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state
MIN_COL = 1 # "effectively zero" without breaking curses MIN_COL = 1 # "effectively zero" without breaking curses
RESIZE_DEBOUNCE_MS = 250
root_win = None root_win = None
nodes_pad = None
# Draw arrows for a specific window id (0=channel,1=messages,2=nodes). # Draw arrows for a specific window id (0=channel,1=messages,2=nodes).
@@ -65,72 +64,6 @@ def paint_frame(win, selected: bool) -> None:
win.refresh() win.refresh()
def get_channel_row_color(index: int) -> int:
if index == ui_state.selected_channel:
if ui_state.current_window == 0:
return get_color("channel_list", reverse=True)
return get_color("channel_selected")
return get_color("channel_list")
def get_node_row_color(index: int, highlight: bool = False) -> int:
node_num = ui_state.node_list[index]
node = interface_state.interface.nodesByNum.get(node_num, {})
color = "node_list"
if node.get("isFavorite"):
color = "node_favorite"
if node.get("isIgnored"):
color = "node_ignored"
reverse = index == ui_state.selected_node and (ui_state.current_window == 2 or highlight)
return get_color(color, reverse=reverse)
def refresh_node_selection(old_index: int = -1, highlight: bool = False) -> None:
if nodes_pad is None or not ui_state.node_list:
return
width = max(0, nodes_pad.getmaxyx()[1] - 4)
if 0 <= old_index < len(ui_state.node_list):
try:
nodes_pad.chgat(old_index, 1, width, get_node_row_color(old_index, highlight=highlight))
except curses.error:
pass
if 0 <= ui_state.selected_node < len(ui_state.node_list):
try:
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node, highlight=highlight))
except curses.error:
pass
ui_state.start_index[2] = max(0, ui_state.selected_node - (nodes_win.getmaxyx()[0] - 3))
refresh_pad(2)
draw_window_arrows(2)
def refresh_main_window(window_id: int, selected: bool) -> None:
if window_id == 0:
paint_frame(channel_win, selected=selected)
if ui_state.channel_list:
width = max(0, channel_pad.getmaxyx()[1] - 4)
channel_pad.chgat(ui_state.selected_channel, 1, width, get_channel_row_color(ui_state.selected_channel))
refresh_pad(0)
elif window_id == 1:
paint_frame(messages_win, selected=selected)
refresh_pad(1)
elif window_id == 2:
paint_frame(nodes_win, selected=selected)
if ui_state.node_list and nodes_pad is not None:
width = max(0, nodes_pad.getmaxyx()[1] - 4)
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node))
refresh_pad(2)
def get_node_display_name(node_num: int, node: dict) -> str:
user = node.get("user") or {}
return user.get("longName") or get_name_from_database(node_num, "long")
def handle_resize(stdscr: curses.window, firstrun: bool) -> None: def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
"""Handle terminal resize events and redraw the UI accordingly.""" """Handle terminal resize events and redraw the UI accordingly."""
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, packetlog_win, entry_win global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, packetlog_win, entry_win
@@ -230,22 +163,22 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
pass pass
def drain_resize_events(input_win: curses.window) -> Union[str, int, None]: def drain_receive_queue(max_events: int = 200) -> None:
"""Wait for resize events to settle and preserve one queued non-resize key.""" processed = 0
input_win.timeout(RESIZE_DEBOUNCE_MS) while processed < max_events:
try: try:
while True: packet = app_state.rx_queue.get(block=False)
try: except Empty:
next_char = input_win.get_wch() return
except curses.error: except Exception:
return None logging.exception("Error while draining receive queue")
return
if next_char == curses.KEY_RESIZE: try:
continue process_receive_event(packet)
except Exception:
return next_char logging.exception("Error while processing receive event")
finally: processed += 1
input_win.timeout(-1)
def main_ui(stdscr: curses.window) -> None: def main_ui(stdscr: curses.window) -> None:
@@ -255,20 +188,20 @@ def main_ui(stdscr: curses.window) -> None:
root_win = stdscr root_win = stdscr
input_text = "" input_text = ""
queued_char = None
stdscr.keypad(True) stdscr.keypad(True)
get_channels() get_channels()
handle_resize(stdscr, True) handle_resize(stdscr, True)
entry_win.timeout(75)
while True: while True:
drain_receive_queue()
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input")) draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window # Get user input from entry window
if queued_char is None: try:
char = entry_win.get_wch() char = entry_win.get_wch()
else: except curses.error:
char = queued_char continue
queued_char = None
# draw_debug(f"Keypress: {char}") # draw_debug(f"Keypress: {char}")
@@ -316,16 +249,13 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_RESIZE: elif char == curses.KEY_RESIZE:
input_text = "" input_text = ""
queued_char = drain_resize_events(entry_win)
handle_resize(stdscr, False) handle_resize(stdscr, False)
continue entry_win.timeout(75)
elif char == chr(4): # Ctrl + D to delete current channel or node elif char == chr(4): # Ctrl + D to delete current channel or node
handle_ctrl_d() handle_ctrl_d()
elif char == chr(31) or ( elif char == chr(31): # Ctrl + / to search
char == "/" and not input_text and ui_state.current_window in (0, 2)
): # Ctrl + / or / to search in channel/node lists
handle_ctrl_fslash() handle_ctrl_fslash()
elif char == chr(11): # Ctrl + K for Help elif char == chr(11): # Ctrl + K for Help
@@ -429,16 +359,31 @@ def handle_leftright(char: int) -> None:
delta = -1 if char == curses.KEY_LEFT else 1 delta = -1 if char == curses.KEY_LEFT else 1
old_window = ui_state.current_window old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3 ui_state.current_window = (ui_state.current_window + delta) % 3
if ui_state.single_pane_mode: handle_resize(root_win, False)
handle_resize(root_win, False)
return
refresh_main_window(old_window, selected=False) if old_window == 0:
paint_frame(channel_win, selected=False)
refresh_pad(0)
if old_window == 1:
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
paint_frame(nodes_win, selected=False)
refresh_pad(2)
if not ui_state.single_pane_mode: if not ui_state.single_pane_mode:
draw_window_arrows(old_window) draw_window_arrows(old_window)
refresh_main_window(ui_state.current_window, selected=True) if ui_state.current_window == 0:
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
paint_frame(nodes_win, selected=True)
refresh_pad(2)
draw_window_arrows(ui_state.current_window) draw_window_arrows(ui_state.current_window)
@@ -459,16 +404,31 @@ def handle_function_keys(char: int) -> None:
return return
ui_state.current_window = target ui_state.current_window = target
if ui_state.single_pane_mode: handle_resize(root_win, False)
handle_resize(root_win, False)
return
refresh_main_window(old_window, selected=False) if old_window == 0:
paint_frame(channel_win, selected=False)
refresh_pad(0)
elif old_window == 1:
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
paint_frame(nodes_win, selected=False)
refresh_pad(2)
if not ui_state.single_pane_mode: if not ui_state.single_pane_mode:
draw_window_arrows(old_window) draw_window_arrows(old_window)
refresh_main_window(ui_state.current_window, selected=True) if ui_state.current_window == 0:
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
paint_frame(nodes_win, selected=True)
refresh_pad(2)
draw_window_arrows(ui_state.current_window) draw_window_arrows(ui_state.current_window)
@@ -520,34 +480,34 @@ def handle_enter(input_text: str) -> str:
def handle_f5_key(stdscr: curses.window) -> None: def handle_f5_key(stdscr: curses.window) -> None:
if not ui_state.node_list: node = None
return try:
def build_node_details() -> tuple[str, list[str]]:
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]] node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
message_parts = [] message_parts = []
message_parts.append("**📋 Basic Information:**") message_parts.append("**📋 Basic Information:**")
message_parts.append(f"• Device: {node.get('user', {}).get('longName', 'Unknown')}") message_parts.append(f"• Device: {node.get('user', {}).get('longName', 'Unknown')}")
message_parts.append(f"• Short name: {node.get('user', {}).get('shortName', 'Unknown')}") message_parts.append(f"• Short name: {node.get('user', {}).get('shortName', 'Unknown')}")
message_parts.append(f"• Hardware: {node.get('user', {}).get('hwModel', 'Unknown')}") message_parts.append(f"• Hardware: {node.get('user', {}).get('hwModel', 'Unknown')}")
message_parts.append(f"• Role: {node.get('user', {}).get('role', 'Unknown')}")
message_parts.append(f"Public key: {node.get('user', {}).get('publicKey')}")
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
role = f"{node.get('user', {}).get('role', 'Unknown')}"
message_parts.append(f"• Role: {role}")
pk = f"{node.get('user', {}).get('publicKey')}"
message_parts.append(f"Public key: {pk}")
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
if "position" in node: if "position" in node:
pos = node["position"] pos = node["position"]
has_coords = pos.get("latitude") and pos.get("longitude") if pos.get("latitude") and pos.get("longitude"):
if has_coords:
message_parts.append(f"• Position: {pos['latitude']:.4f}, {pos['longitude']:.4f}") message_parts.append(f"• Position: {pos['latitude']:.4f}, {pos['longitude']:.4f}")
if pos.get("altitude"): if pos.get("altitude"):
message_parts.append(f"• Altitude: {pos['altitude']}m") message_parts.append(f"• Altitude: {pos['altitude']}m")
if has_coords: message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
if any(key in node for key in ["snr", "hopsAway", "lastHeard"]): if any(key in node for key in ["snr", "hopsAway", "lastHeard"]):
message_parts.append("") message_parts.append("\n**🌐 Network Metrics:**")
message_parts.append("**🌐 Network Metrics:**")
if "snr" in node: if "snr" in node:
snr = node["snr"] snr = node["snr"]
@@ -567,13 +527,12 @@ def handle_f5_key(stdscr: curses.window) -> None:
hop_emoji = "📡" if hops == 0 else "🔄" if hops == 1 else "" hop_emoji = "📡" if hops == 0 else "🔄" if hops == 1 else ""
message_parts.append(f"• Hops away: {hop_emoji} {hops}") message_parts.append(f"• Hops away: {hop_emoji} {hops}")
if node.get("lastHeard"): if "lastHeard" in node and node["lastHeard"]:
message_parts.append(f"• Last heard: 🕐 {get_time_ago(node['lastHeard'])}") message_parts.append(f"• Last heard: 🕐 {get_time_ago(node['lastHeard'])}")
if node.get("deviceMetrics"): if node.get("deviceMetrics"):
metrics = node["deviceMetrics"] metrics = node["deviceMetrics"]
message_parts.append("") message_parts.append("\n**📊 Device Metrics:**")
message_parts.append("**📊 Device Metrics:**")
if "batteryLevel" in metrics: if "batteryLevel" in metrics:
battery = metrics["batteryLevel"] battery = metrics["batteryLevel"]
@@ -594,128 +553,21 @@ def handle_f5_key(stdscr: curses.window) -> None:
air_emoji = "🔴" if air_util > 80 else "🟡" if air_util > 50 else "🟢" air_emoji = "🔴" if air_util > 80 else "🟡" if air_util > 50 else "🟢"
message_parts.append(f"• Air utilization TX: {air_emoji} {air_util:.2f}%") message_parts.append(f"• Air utilization TX: {air_emoji} {air_util:.2f}%")
title = t( message = "\n".join(message_parts)
"ui.dialog.node_details_title",
default="📡 Node Details: {name}", contact.ui.dialog.dialog(
name=node.get("user", {}).get("shortName", "Unknown"), t(
"ui.dialog.node_details_title",
default="📡 Node Details: {name}",
name=node.get("user", {}).get("shortName", "Unknown"),
),
message,
) )
return title, message_parts curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
previous_window = ui_state.current_window
ui_state.current_window = 4
scroll_offset = 0
dialog_win = None
curses.curs_set(0)
refresh_node_selection(highlight=True)
try:
while True:
curses.update_lines_cols()
height, width = curses.LINES, curses.COLS
title, message_lines = build_node_details()
max_line_length = max(len(title), *(len(line) for line in message_lines))
dialog_width = min(max(max_line_length + 4, 20), max(10, width - 2))
dialog_height = min(max(len(message_lines) + 4, 6), max(6, height - 2))
x = max(0, (width - dialog_width) // 2)
y = max(0, (height - dialog_height) // 2)
viewport_h = max(1, dialog_height - 4)
max_scroll = max(0, len(message_lines) - viewport_h)
scroll_offset = max(0, min(scroll_offset, max_scroll))
if dialog_win is None:
dialog_win = curses.newwin(dialog_height, dialog_width, y, x)
else:
dialog_win.erase()
dialog_win.refresh()
dialog_win.resize(dialog_height, dialog_width)
dialog_win.mvwin(y, x)
dialog_win.keypad(True)
dialog_win.bkgd(get_color("background"))
dialog_win.attrset(get_color("window_frame"))
dialog_win.border(0)
try:
dialog_win.addstr(0, 2, title[: max(0, dialog_width - 4)], get_color("settings_default"))
hint = f" {ui_state.selected_node + 1}/{len(ui_state.node_list)} "
dialog_win.addstr(0, max(2, dialog_width - len(hint) - 2), hint, get_color("commands"))
except curses.error:
pass
msg_win = dialog_win.derwin(viewport_h + 2, dialog_width - 2, 1, 1)
msg_win.erase()
for row, line in enumerate(message_lines[scroll_offset : scroll_offset + viewport_h], start=1):
trimmed = line[: max(0, dialog_width - 6)]
try:
msg_win.addstr(row, 1, trimmed, get_color("settings_default"))
except curses.error:
pass
if len(message_lines) > viewport_h:
old_index = ui_state.start_index[4] if len(ui_state.start_index) > 4 else 0
while len(ui_state.start_index) <= 4:
ui_state.start_index.append(0)
ui_state.start_index[4] = scroll_offset
draw_main_arrows(msg_win, len(message_lines) - 1, window=4)
ui_state.start_index[4] = old_index
try:
ok_text = " Up/Down: Nodes PgUp/PgDn: Scroll Esc: Close "
dialog_win.addstr(
dialog_height - 2,
max(1, (dialog_width - len(ok_text)) // 2),
ok_text[: max(0, dialog_width - 2)],
get_color("settings_default", reverse=True),
)
except curses.error:
pass
dialog_win.refresh()
msg_win.noutrefresh()
curses.doupdate()
dialog_win.timeout(200)
char = dialog_win.getch()
if menu_state.need_redraw:
menu_state.need_redraw = False
continue
if char in (27, curses.KEY_LEFT, curses.KEY_ENTER, 10, 13, 32):
break
if char == curses.KEY_UP:
old_selected_node = ui_state.selected_node
ui_state.selected_node = (ui_state.selected_node - 1) % len(ui_state.node_list)
scroll_offset = 0
refresh_node_selection(old_selected_node, highlight=True)
elif char == curses.KEY_DOWN:
old_selected_node = ui_state.selected_node
ui_state.selected_node = (ui_state.selected_node + 1) % len(ui_state.node_list)
scroll_offset = 0
refresh_node_selection(old_selected_node, highlight=True)
elif char == curses.KEY_PPAGE:
scroll_offset = max(0, scroll_offset - viewport_h)
elif char == curses.KEY_NPAGE:
scroll_offset = min(max_scroll, scroll_offset + viewport_h)
elif char == curses.KEY_HOME:
scroll_offset = 0
elif char == curses.KEY_END:
scroll_offset = max_scroll
elif char == curses.KEY_RESIZE:
continue
except KeyError: except KeyError:
return return
finally:
if dialog_win is not None:
dialog_win.erase()
dialog_win.refresh()
ui_state.current_window = previous_window
curses.curs_set(1)
handle_resize(stdscr, False)
def handle_ctrl_t(stdscr: curses.window) -> None: def handle_ctrl_t(stdscr: curses.window) -> None:
@@ -772,7 +624,6 @@ def handle_backtick(stdscr: curses.window) -> None:
ui_state.current_window = 4 ui_state.current_window = 4
settings_menu(stdscr, interface_state.interface) settings_menu(stdscr, interface_state.interface)
ui_state.current_window = previous_window ui_state.current_window = previous_window
ui_state.single_pane_mode = config.single_pane_mode.lower() == "true"
curses.curs_set(1) curses.curs_set(1)
refresh_node_list() refresh_node_list()
handle_resize(stdscr, False) handle_resize(stdscr, False)
@@ -1014,7 +865,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
row = 0 row = 0
for prefix, message in messages: for prefix, message in messages:
full_message = normalize_message_text(f"{prefix}{message}") full_message = f"{prefix}{message}"
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2) wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
msg_line_count += len(wrapped_lines) msg_line_count += len(wrapped_lines)
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1]) messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])
@@ -1056,8 +907,10 @@ def draw_node_list() -> None:
if ui_state.current_window != 2 and ui_state.single_pane_mode: if ui_state.current_window != 2 and ui_state.single_pane_mode:
return return
if nodes_pad is None: # This didn't work, for some reason an error is thown on startup, so we just create the pad every time
nodes_pad = curses.newpad(1, 1) # if nodes_pad is None:
# nodes_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1, 1)
try: try:
nodes_pad.erase() nodes_pad.erase()
@@ -1071,12 +924,28 @@ def draw_node_list() -> None:
node = interface_state.interface.nodesByNum[node_num] node = interface_state.interface.nodesByNum[node_num]
secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"] secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"]
status_icon = "🔐" if secure else "🔓" status_icon = "🔐" if secure else "🔓"
node_name = get_node_display_name(node_num, node) node_name = get_name_from_database(node_num, "long")
user_name = node["user"]["shortName"]
uptime_str = ""
if "deviceMetrics" in node and "uptimeSeconds" in node["deviceMetrics"]:
uptime_str = f" / Up: {get_readable_duration(node['deviceMetrics']['uptimeSeconds'])}"
last_heard_str = f"{get_time_ago(node['lastHeard'])}" if node.get("lastHeard") else ""
hops_str = f" ■ Hops: {node['hopsAway']}" if "hopsAway" in node else ""
snr_str = f" ■ SNR: {node['snr']}dB" if node.get("hopsAway") == 0 and "snr" in node else ""
# Future node name custom formatting possible # Future node name custom formatting possible
node_str = f"{status_icon} {node_name}" node_str = f"{status_icon} {node_name}"
node_str = node_str.ljust(box_width - 4)[: box_width - 2] node_str = node_str.ljust(box_width - 4)[: box_width - 2]
nodes_pad.addstr(i, 1, node_str, get_node_row_color(i)) color = "node_list"
if "isFavorite" in node and node["isFavorite"]:
color = "node_favorite"
if "isIgnored" in node and node["isIgnored"]:
color = "node_ignored"
nodes_pad.addstr(
i, 1, node_str, get_color(color, reverse=ui_state.selected_node == i and ui_state.current_window == 2)
)
paint_frame(nodes_win, selected=(ui_state.current_window == 2)) paint_frame(nodes_win, selected=(ui_state.current_window == 2))
nodes_win.refresh() nodes_win.refresh()

View File

@@ -56,13 +56,6 @@ config_folder = os.path.abspath(config.node_configs_file_path)
# Load translations # Load translations
field_mapping, help_text = parse_ini_file(translation_file) field_mapping, help_text = parse_ini_file(translation_file)
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0 and later use an is_repeated property, while older versions compare against the label field.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def reload_translations() -> None: def reload_translations() -> None:
global translation_file, field_mapping, help_text global translation_file, field_mapping, help_text
@@ -571,7 +564,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
new_value = new_value == "True" or new_value is True new_value = new_value == "True" or new_value is True
menu_state.start_index.pop() menu_state.start_index.pop()
elif _is_repeated_field(field): # Handle repeated field - Not currently used elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
new_value = get_repeated_input(current_value) new_value = get_repeated_input(current_value)
new_value = current_value if new_value is None else new_value.split(", ") new_value = current_value if new_value is None else new_value.split(", ")
menu_state.start_index.pop() menu_state.start_index.pop()

View File

@@ -1,3 +1,4 @@
from queue import SimpleQueue
from typing import Any, Union, List, Dict from typing import Any, Union, List, Dict
from dataclasses import dataclass, field from dataclasses import dataclass, field
@@ -44,3 +45,5 @@ class InterfaceState:
@dataclass @dataclass
class AppState: class AppState:
lock: Any = None lock: Any = None
rx_queue: SimpleQueue = field(default_factory=SimpleQueue)
ui_shutdown: bool = False

View File

@@ -566,7 +566,7 @@ def save_json(file_path: str, data: Dict[str, Any]) -> None:
formatted_json = config.format_json_single_line_arrays(data) formatted_json = config.format_json_single_line_arrays(data)
with open(file_path, "w", encoding="utf-8") as f: with open(file_path, "w", encoding="utf-8") as f:
f.write(formatted_json) f.write(formatted_json)
config.reload_config() setup_colors(reinit=True)
reload_translations(data.get("language")) reload_translations(data.get("language"))

View File

@@ -35,10 +35,5 @@ def setup_parser() -> ArgumentParser:
parser.add_argument( parser.add_argument(
"--settings", "--set", "--control", "-c", help="Launch directly into the settings", action="store_true" "--settings", "--set", "--control", "-c", help="Launch directly into the settings", action="store_true"
) )
parser.add_argument(
"--demo-screenshot",
help="Launch with a fake interface and seeded demo data for screenshots/testing.",
action="store_true",
)
return parser return parser

View File

@@ -9,17 +9,6 @@ from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main # defs are from meshtastic/python/main
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0+ exposes `is_repeated`, while older versions require
checking `label == LABEL_REPEATED`.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def traverseConfig(config_root, config, interface_config) -> bool: def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference""" """Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = camel_to_snake(config_root) snake_name = camel_to_snake(config_root)
@@ -100,7 +89,7 @@ def setPref(config, comp_name, raw_val) -> bool:
return False return False
# repeating fields need to be handled with append, not setattr # repeating fields need to be handled with append, not setattr
if not _is_repeated_field(pref): if pref.label != pref.LABEL_REPEATED:
try: try:
if config_type.message_type is not None: if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name) config_values = getattr(config_part, config_type.name)

View File

@@ -1,226 +0,0 @@
import os
import sqlite3
import tempfile
from dataclasses import dataclass
from typing import Dict, List, Tuple, Union
import contact.ui.default_config as config
from contact.utilities.db_handler import get_table_name
from contact.utilities.singleton import interface_state
DEMO_DB_FILENAME = "contact_demo_client.db"
DEMO_LOCAL_NODE_NUM = 0xC0DEC0DE
DEMO_BASE_TIMESTAMP = 1738717200 # 2025-02-04 17:00:00 UTC
DEMO_CHANNELS = ["MediumFast", "Another Channel"]
@dataclass
class DemoChannelSettings:
name: str
@dataclass
class DemoChannel:
role: bool
settings: DemoChannelSettings
@dataclass
class DemoLoRaConfig:
region: int = 1
modem_preset: int = 0
@dataclass
class DemoLocalConfig:
lora: DemoLoRaConfig
class DemoLocalNode:
def __init__(self, interface: "DemoInterface", channels: List[DemoChannel]) -> None:
self._interface = interface
self.channels = channels
self.localConfig = DemoLocalConfig(lora=DemoLoRaConfig())
def setFavorite(self, node_num: int) -> None:
self._interface.nodesByNum[node_num]["isFavorite"] = True
def removeFavorite(self, node_num: int) -> None:
self._interface.nodesByNum[node_num]["isFavorite"] = False
def setIgnored(self, node_num: int) -> None:
self._interface.nodesByNum[node_num]["isIgnored"] = True
def removeIgnored(self, node_num: int) -> None:
self._interface.nodesByNum[node_num]["isIgnored"] = False
def removeNode(self, node_num: int) -> None:
self._interface.nodesByNum.pop(node_num, None)
class DemoInterface:
def __init__(self, nodes: Dict[int, Dict[str, object]], channels: List[DemoChannel]) -> None:
self.nodesByNum = nodes
self.nodes = self.nodesByNum
self.localNode = DemoLocalNode(self, channels)
def getMyNodeInfo(self) -> Dict[str, int]:
return {"num": DEMO_LOCAL_NODE_NUM}
def getNode(self, selector: str) -> DemoLocalNode:
if selector != "^local":
raise KeyError(selector)
return self.localNode
def close(self) -> None:
return
def build_demo_interface() -> DemoInterface:
channels = [DemoChannel(role=True, settings=DemoChannelSettings(name=name)) for name in DEMO_CHANNELS]
nodes = {
DEMO_LOCAL_NODE_NUM: _build_node(
DEMO_LOCAL_NODE_NUM,
"Meshtastic fb3c",
"fb3c",
hops=0,
snr=13.7,
last_heard_offset=5,
battery=88,
voltage=4.1,
favorite=True,
),
0xA1000001: _build_node(0xA1000001, "KG7NDX-N2", "N2", hops=1, last_heard_offset=18, battery=79, voltage=4.0),
0xA1000002: _build_node(0xA1000002, "Satellite II Repeater", "SAT2", hops=2, last_heard_offset=31),
0xA1000003: _build_node(0xA1000003, "Search for Discord/Meshtastic", "DISC", hops=1, last_heard_offset=46),
0xA1000004: _build_node(0xA1000004, "K7EOK Mobile", "MOBL", hops=1, last_heard_offset=63, battery=52),
0xA1000005: _build_node(0xA1000005, "Turtle", "TRTL", hops=3, last_heard_offset=87),
0xA1000006: _build_node(0xA1000006, "CARS Trewvilliger Plaza", "CARS", hops=2, last_heard_offset=121),
0xA1000007: _build_node(0xA1000007, "No Hands!", "NHDS", hops=1, last_heard_offset=155),
0xA1000008: _build_node(0xA1000008, "McCutie", "MCCU", hops=2, last_heard_offset=211, ignored=True),
0xA1000009: _build_node(0xA1000009, "K1PDX", "K1PX", hops=2, last_heard_offset=267),
0xA100000A: _build_node(0xA100000A, "Arnold Creek", "ARND", hops=1, last_heard_offset=301),
0xA100000B: _build_node(0xA100000B, "Nansen", "NANS", hops=1, last_heard_offset=355),
0xA100000C: _build_node(0xA100000C, "Kodin 1", "KOD1", hops=2, last_heard_offset=402),
0xA100000D: _build_node(0xA100000D, "PH1", "PH1", hops=3, last_heard_offset=470),
0xA100000E: _build_node(0xA100000E, "Luna", "LUNA", hops=1, last_heard_offset=501),
0xA100000F: _build_node(0xA100000F, "sputnik1", "SPUT", hops=1, last_heard_offset=550),
0xA1000010: _build_node(0xA1000010, "K7EOK Maplewood West", "MAPL", hops=2, last_heard_offset=602),
0xA1000011: _build_node(0xA1000011, "KE7YVU 2", "YVU2", hops=2, last_heard_offset=655),
0xA1000012: _build_node(0xA1000012, "DNET", "DNET", hops=1, last_heard_offset=702),
0xA1000013: _build_node(0xA1000013, "Green Bluff", "GBLF", hops=1, last_heard_offset=780),
0xA1000014: _build_node(0xA1000014, "Council Crest Solar", "CCST", hops=2, last_heard_offset=830),
0xA1000015: _build_node(0xA1000015, "Meshtastic 61c7", "61c7", hops=1, last_heard_offset=901),
0xA1000016: _build_node(0xA1000016, "Bella", "BELA", hops=2, last_heard_offset=950),
0xA1000017: _build_node(0xA1000017, "Mojo Solar Base 4f12", "MOJO", hops=1, last_heard_offset=1010, favorite=True),
}
return DemoInterface(nodes=nodes, channels=channels)
def configure_demo_database(base_dir: str = "") -> str:
if not base_dir:
base_dir = tempfile.mkdtemp(prefix="contact_demo_")
os.makedirs(base_dir, exist_ok=True)
db_path = os.path.join(base_dir, DEMO_DB_FILENAME)
if os.path.exists(db_path):
os.remove(db_path)
config.db_file_path = db_path
return db_path
def seed_demo_messages() -> None:
schema = """
user_id TEXT,
message_text TEXT,
timestamp INTEGER,
ack_type TEXT
"""
with sqlite3.connect(config.db_file_path) as db_connection:
cursor = db_connection.cursor()
for channel_name, rows in _demo_messages().items():
table_name = get_table_name(channel_name)
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({schema})")
cursor.executemany(
f"""
INSERT INTO {table_name} (user_id, message_text, timestamp, ack_type)
VALUES (?, ?, ?, ?)
""",
rows,
)
db_connection.commit()
def _build_node(
node_num: int,
long_name: str,
short_name: str,
*,
hops: int,
last_heard_offset: int,
snr: float = 0.0,
battery: int = 0,
voltage: float = 0.0,
favorite: bool = False,
ignored: bool = False,
) -> Dict[str, object]:
node = {
"num": node_num,
"user": {
"longName": long_name,
"shortName": short_name,
"hwModel": "TBEAM",
"role": "CLIENT",
"publicKey": f"pk-{node_num:08x}",
"isLicensed": True,
},
"lastHeard": DEMO_BASE_TIMESTAMP + 3600 - last_heard_offset,
"hopsAway": hops,
"isFavorite": favorite,
"isIgnored": ignored,
}
if snr:
node["snr"] = snr
if battery:
node["deviceMetrics"] = {
"batteryLevel": battery,
"voltage": voltage or 4.0,
"uptimeSeconds": 86400 + node_num % 10000,
"channelUtilization": 12.5 + (node_num % 7),
"airUtilTx": 4.5 + (node_num % 5),
}
if node_num % 3 == 0:
node["position"] = {
"latitude": 45.5231 + ((node_num % 50) * 0.0001),
"longitude": -122.6765 - ((node_num % 50) * 0.0001),
"altitude": 85 + (node_num % 20),
}
return node
def _demo_messages() -> Dict[Union[str, int], List[Tuple[str, str, int, Union[str, None]]]]:
return {
"MediumFast": [
(str(DEMO_LOCAL_NODE_NUM), "Help, I'm stuck in a ditch!", DEMO_BASE_TIMESTAMP + 45, "Ack"),
("2701131778", "Do you require a alpinist?", DEMO_BASE_TIMESTAMP + 80, None),
(str(DEMO_LOCAL_NODE_NUM), "I don't know what that is.", DEMO_BASE_TIMESTAMP + 104, "Implicit"),
],
"Another Channel": [
("2701131788", "Weather is holding for the summit push.", DEMO_BASE_TIMESTAMP + 220, None),
(str(DEMO_LOCAL_NODE_NUM), "Copy that. Keep me posted.", DEMO_BASE_TIMESTAMP + 260, "Ack"),
],
2701131788: [
("2701131788", "Ping me when you are back at the trailhead.", DEMO_BASE_TIMESTAMP + 330, None),
(str(DEMO_LOCAL_NODE_NUM), "Will do.", DEMO_BASE_TIMESTAMP + 350, "Ack"),
],
}

View File

@@ -1,54 +0,0 @@
"""Helpers for normalizing emoji sequences in width-sensitive message rendering."""
# Strip zero-width and presentation modifiers that make terminal cell width inconsistent.
EMOJI_MODIFIER_REPLACEMENTS = {
"\u200d": "",
"\u20e3": "",
"\ufe0e": "",
"\ufe0f": "",
"\U0001F3FB": "",
"\U0001F3FC": "",
"\U0001F3FD": "",
"\U0001F3FE": "",
"\U0001F3FF": "",
}
_EMOJI_MODIFIER_TRANSLATION = str.maketrans(EMOJI_MODIFIER_REPLACEMENTS)
_REGIONAL_INDICATOR_START = ord("\U0001F1E6")
_REGIONAL_INDICATOR_END = ord("\U0001F1FF")
def _regional_indicator_to_letter(char: str) -> str:
return chr(ord("A") + ord(char) - _REGIONAL_INDICATOR_START)
def _normalize_flag_emoji(text: str) -> str:
"""Convert flag emoji built from regional indicators into ASCII country codes."""
normalized = []
index = 0
while index < len(text):
current = text[index]
current_ord = ord(current)
if _REGIONAL_INDICATOR_START <= current_ord <= _REGIONAL_INDICATOR_END and index + 1 < len(text):
next_char = text[index + 1]
next_ord = ord(next_char)
if _REGIONAL_INDICATOR_START <= next_ord <= _REGIONAL_INDICATOR_END:
normalized.append(_regional_indicator_to_letter(current))
normalized.append(_regional_indicator_to_letter(next_char))
index += 2
continue
normalized.append(current)
index += 1
return "".join(normalized)
def normalize_message_text(text: str) -> str:
"""Strip modifiers and rewrite flag emoji into stable terminal-friendly text."""
if not text:
return text
return _normalize_flag_emoji(text.translate(_EMOJI_MODIFIER_TRANSLATION))

View File

@@ -68,19 +68,19 @@ def get_chunks(data):
# Leave it string as last resort # Leave it string as last resort
value = value value = value
# Python 3.9-compatible alternative to match/case. match key:
if key == "uptime_seconds":
# convert seconds to hours, for our sanity # convert seconds to hours, for our sanity
value = round(value / 60 / 60, 1) case "uptime_seconds":
elif key in ("longitude_i", "latitude_i"): value = round(value / 60 / 60, 1)
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry # Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
# truncate to 6th digit after floating point, which would be still accurate # truncate to 6th digit after floating point, which would be still accurate
value = round(value * 1e-7, 6) case "longitude_i" | "latitude_i":
elif key == "wind_direction": value = round(value * 1e-7, 6)
# Convert wind direction from degrees to abbreviation # Convert wind direction from degrees to abbreviation
value = humanize_wind_direction(value) case "wind_direction":
elif key == "time": value = humanize_wind_direction(value)
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m") case "time":
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
if key in sensors: if key in sensors:
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} " parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "contact" name = "contact"
version = "1.5.0" version = "1.4.14"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores." description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [ authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"} {name = "Ben Lipsey",email = "ben@pdxlocations.com"}

View File

@@ -1 +0,0 @@

View File

@@ -1,13 +0,0 @@
import unittest
from contact.utilities.arg_parser import setup_parser
class ArgParserTests(unittest.TestCase):
def test_demo_screenshot_flag_is_supported(self) -> None:
args = setup_parser().parse_args(["--demo-screenshot"])
self.assertTrue(args.demo_screenshot)
def test_demo_screenshot_defaults_to_false(self) -> None:
args = setup_parser().parse_args([])
self.assertFalse(args.demo_screenshot)

View File

@@ -1,21 +0,0 @@
import unittest
from contact.utilities.config_io import _is_repeated_field, splitCompoundName
class ConfigIoTests(unittest.TestCase):
def test_split_compound_name_preserves_multi_part_values(self) -> None:
self.assertEqual(splitCompoundName("config.device.role"), ["config", "device", "role"])
def test_split_compound_name_duplicates_single_part_values(self) -> None:
self.assertEqual(splitCompoundName("owner"), ["owner", "owner"])
def test_is_repeated_field_prefers_new_style_attribute(self) -> None:
field = type("Field", (), {"is_repeated": True})()
self.assertTrue(_is_repeated_field(field))
def test_is_repeated_field_falls_back_to_label_comparison(self) -> None:
field_type = type("Field", (), {"label": 3, "LABEL_REPEATED": 3})
self.assertTrue(_is_repeated_field(field_type()))

View File

@@ -1,15 +0,0 @@
import unittest
from contact.utilities.control_utils import transform_menu_path
class ControlUtilsTests(unittest.TestCase):
def test_transform_menu_path_applies_replacements_and_normalization(self) -> None:
transformed = transform_menu_path(["Main Menu", "Radio Settings", "Channel 2", "Detail"])
self.assertEqual(transformed, ["config", "channel", "Detail"])
def test_transform_menu_path_preserves_unmatched_entries(self) -> None:
transformed = transform_menu_path(["Main Menu", "Module Settings", "WiFi"])
self.assertEqual(transformed, ["module", "WiFi"])

View File

@@ -1,121 +0,0 @@
import os
import sqlite3
import tempfile
import unittest
import contact.ui.default_config as config
from contact.utilities import db_handler
from contact.utilities.demo_data import DEMO_LOCAL_NODE_NUM, build_demo_interface
from contact.utilities.singleton import interface_state, ui_state
from contact.utilities.utils import decimal_to_hex
from tests.test_support import reset_singletons, restore_config, snapshot_config
class DbHandlerTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config(
"db_file_path",
"message_prefix",
"sent_message_prefix",
"ack_str",
"ack_implicit_str",
"ack_unknown_str",
"nak_str",
)
self.tempdir = tempfile.TemporaryDirectory()
config.db_file_path = os.path.join(self.tempdir.name, "client.db")
interface_state.myNodeNum = 123
def tearDown(self) -> None:
self.tempdir.cleanup()
restore_config(self.saved_config)
reset_singletons()
def test_save_message_to_db_and_update_ack_roundtrip(self) -> None:
timestamp = db_handler.save_message_to_db("Primary", "123", "hello")
self.assertIsInstance(timestamp, int)
db_handler.update_ack_nak("Primary", timestamp, "hello", "Ack")
with sqlite3.connect(config.db_file_path) as conn:
row = conn.execute('SELECT user_id, message_text, ack_type FROM "123_Primary_messages"').fetchone()
self.assertEqual(row, ("123", "hello", "Ack"))
def test_update_node_info_in_db_fills_defaults_and_preserves_existing_values(self) -> None:
db_handler.update_node_info_in_db(999, short_name="ABCD")
original_long_name = db_handler.get_name_from_database(999, "long")
self.assertTrue(original_long_name.startswith("Meshtastic "))
self.assertEqual(db_handler.get_name_from_database(999, "short"), "ABCD")
self.assertEqual(db_handler.is_chat_archived(999), 0)
db_handler.update_node_info_in_db(999, chat_archived=1)
self.assertEqual(db_handler.get_name_from_database(999, "long"), original_long_name)
self.assertEqual(db_handler.get_name_from_database(999, "short"), "ABCD")
self.assertEqual(db_handler.is_chat_archived(999), 1)
def test_get_name_from_database_returns_hex_when_user_is_missing(self) -> None:
user_id = 0x1234ABCD
db_handler.ensure_node_table_exists()
self.assertEqual(db_handler.get_name_from_database(user_id, "short"), decimal_to_hex(user_id))
self.assertEqual(db_handler.is_chat_archived(user_id), 0)
def test_load_messages_from_db_populates_channels_and_messages(self) -> None:
db_handler.update_node_info_in_db(123, long_name="Local Node", short_name="ME")
db_handler.update_node_info_in_db(456, long_name="Remote Node", short_name="RM")
db_handler.update_node_info_in_db(789, long_name="Archived", short_name="AR", chat_archived=1)
db_handler.ensure_table_exists(
'"123_Primary_messages"',
"""
user_id TEXT,
message_text TEXT,
timestamp INTEGER,
ack_type TEXT
""",
)
db_handler.ensure_table_exists(
'"123_789_messages"',
"""
user_id TEXT,
message_text TEXT,
timestamp INTEGER,
ack_type TEXT
""",
)
with sqlite3.connect(config.db_file_path) as conn:
conn.execute('INSERT INTO "123_Primary_messages" VALUES (?, ?, ?, ?)', ("123", "sent", 1700000000, "Ack"))
conn.execute('INSERT INTO "123_Primary_messages" VALUES (?, ?, ?, ?)', ("456", "reply", 1700000001, None))
conn.execute('INSERT INTO "123_789_messages" VALUES (?, ?, ?, ?)', ("789", "hidden", 1700000002, None))
conn.commit()
ui_state.channel_list = []
ui_state.all_messages = {}
db_handler.load_messages_from_db()
self.assertIn("Primary", ui_state.channel_list)
self.assertNotIn(789, ui_state.channel_list)
self.assertIn("Primary", ui_state.all_messages)
self.assertIn(789, ui_state.all_messages)
messages = ui_state.all_messages["Primary"]
self.assertTrue(messages[0][0].startswith("-- "))
self.assertTrue(any(config.sent_message_prefix in prefix and config.ack_str in prefix for prefix, _ in messages))
self.assertTrue(any("RM:" in prefix for prefix, _ in messages))
self.assertEqual(ui_state.all_messages[789][-1][1], "hidden")
def test_init_nodedb_inserts_nodes_from_interface(self) -> None:
interface_state.interface = build_demo_interface()
interface_state.myNodeNum = DEMO_LOCAL_NODE_NUM
db_handler.init_nodedb()
self.assertEqual(db_handler.get_name_from_database(2701131778, "short"), "SAT2")

View File

@@ -1,38 +0,0 @@
import tempfile
import unittest
from contact.ui import default_config
class DefaultConfigTests(unittest.TestCase):
def test_get_localisation_options_filters_hidden_and_non_ini_files(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
for filename in ("en.ini", "ru.ini", ".hidden.ini", "notes.txt"):
with open(f"{tmpdir}/{filename}", "w", encoding="utf-8") as handle:
handle.write("")
self.assertEqual(default_config.get_localisation_options(tmpdir), ["en", "ru"])
def test_get_localisation_file_normalizes_extensions_and_falls_back_to_english(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
for filename in ("en.ini", "ru.ini"):
with open(f"{tmpdir}/{filename}", "w", encoding="utf-8") as handle:
handle.write("")
self.assertTrue(default_config.get_localisation_file("RU.ini", tmpdir).endswith("/ru.ini"))
self.assertTrue(default_config.get_localisation_file("missing", tmpdir).endswith("/en.ini"))
def test_update_dict_only_adds_missing_values(self) -> None:
default = {"theme": "dark", "nested": {"language": "en", "sound": True}}
actual = {"nested": {"language": "ru"}}
updated = default_config.update_dict(default, actual)
self.assertTrue(updated)
self.assertEqual(actual, {"theme": "dark", "nested": {"language": "ru", "sound": True}})
def test_format_json_single_line_arrays_keeps_arrays_inline(self) -> None:
rendered = default_config.format_json_single_line_arrays({"items": [1, 2], "nested": {"flags": ["a", "b"]}})
self.assertIn('"items": [1, 2]', rendered)
self.assertIn('"flags": ["a", "b"]', rendered)

View File

@@ -1,51 +0,0 @@
import tempfile
import unittest
from unittest import mock
import contact.__main__ as entrypoint
import contact.ui.default_config as config
from contact.utilities.db_handler import get_name_from_database
from contact.utilities.demo_data import DEMO_CHANNELS, DEMO_LOCAL_NODE_NUM, build_demo_interface, configure_demo_database
from contact.utilities.singleton import interface_state, ui_state
from tests.test_support import reset_singletons, restore_config, snapshot_config
class DemoDataTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config("db_file_path", "node_sort", "single_pane_mode")
def tearDown(self) -> None:
restore_config(self.saved_config)
reset_singletons()
def test_build_demo_interface_exposes_expected_shape(self) -> None:
interface = build_demo_interface()
self.assertEqual(interface.getMyNodeInfo()["num"], DEMO_LOCAL_NODE_NUM)
self.assertEqual([channel.settings.name for channel in interface.getNode("^local").channels], DEMO_CHANNELS)
self.assertIn(DEMO_LOCAL_NODE_NUM, interface.nodesByNum)
def test_initialize_globals_seed_demo_populates_ui_state_and_db(self) -> None:
interface_state.interface = build_demo_interface()
with tempfile.TemporaryDirectory() as tmpdir:
demo_db_path = configure_demo_database(tmpdir)
with mock.patch.object(entrypoint.pub, "subscribe"):
entrypoint.initialize_globals(seed_demo=True)
self.assertEqual(config.db_file_path, demo_db_path)
self.assertIn("MediumFast", ui_state.channel_list)
self.assertIn("Another Channel", ui_state.channel_list)
self.assertIn(2701131788, ui_state.channel_list)
self.assertEqual(ui_state.node_list[0], DEMO_LOCAL_NODE_NUM)
self.assertEqual(get_name_from_database(2701131778, "short"), "SAT2")
medium_fast = ui_state.all_messages["MediumFast"]
self.assertTrue(medium_fast[0][0].startswith("-- "))
self.assertTrue(any(config.sent_message_prefix in prefix and config.ack_str in prefix for prefix, _ in medium_fast))
self.assertTrue(any("SAT2:" in prefix for prefix, _ in medium_fast))
direct_messages = ui_state.all_messages[2701131788]
self.assertEqual(len(direct_messages), 3)

View File

@@ -1,11 +0,0 @@
import unittest
from contact.utilities.emoji_utils import normalize_message_text
class EmojiUtilsTests(unittest.TestCase):
def test_strips_modifiers_from_keycaps_and_skin_tones(self) -> None:
self.assertEqual(normalize_message_text("👍🏽 7"), "👍 7")
def test_rewrites_flag_emoji_to_country_codes(self) -> None:
self.assertEqual(normalize_message_text("🇺🇸 hello 🇩🇪"), "US hello DE")

View File

@@ -1,57 +0,0 @@
import os
import tempfile
import unittest
from unittest import mock
import contact.ui.default_config as config
from contact.utilities import i18n
from tests.test_support import restore_config, snapshot_config
class I18nTests(unittest.TestCase):
def setUp(self) -> None:
self.saved_config = snapshot_config("language")
i18n._translations = {}
i18n._language = None
def tearDown(self) -> None:
restore_config(self.saved_config)
i18n._translations = {}
i18n._language = None
def test_t_loads_translation_file_and_formats_placeholders(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
translation_file = os.path.join(tmpdir, "xx.ini")
with open(translation_file, "w", encoding="utf-8") as handle:
handle.write('[ui]\n')
handle.write('greeting,"Hello {name}"\n')
config.language = "xx"
with mock.patch.object(config, "get_localisation_file", return_value=translation_file):
self.assertEqual(i18n.t("ui.greeting", name="Ben"), "Hello Ben")
def test_t_falls_back_to_default_and_returns_unformatted_text_on_error(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
translation_file = os.path.join(tmpdir, "xx.ini")
with open(translation_file, "w", encoding="utf-8") as handle:
handle.write('[ui]\n')
handle.write('greeting,"Hello {name}"\n')
config.language = "xx"
with mock.patch.object(config, "get_localisation_file", return_value=translation_file):
self.assertEqual(i18n.t("ui.greeting"), "Hello {name}")
self.assertEqual(i18n.t("ui.missing", default="Fallback"), "Fallback")
self.assertEqual(i18n.t_text("Literal {value}", value=7), "Literal 7")
def test_loader_cache_is_reused_until_language_changes(self) -> None:
config.language = "en"
with mock.patch.object(i18n, "parse_ini_file", return_value=({"key": "value"}, {})) as parse_ini_file:
self.assertEqual(i18n.t("key"), "value")
self.assertEqual(i18n.t("key"), "value")
self.assertEqual(parse_ini_file.call_count, 1)
config.language = "ru"
self.assertEqual(i18n.t("missing", default="fallback"), "fallback")
self.assertEqual(parse_ini_file.call_count, 2)

View File

@@ -1,40 +0,0 @@
import os
import tempfile
import unittest
from unittest import mock
from contact.utilities.ini_utils import parse_ini_file
class IniUtilsTests(unittest.TestCase):
def test_parse_ini_file_reads_sections_fields_and_help_text(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
ini_path = os.path.join(tmpdir, "settings.ini")
with open(ini_path, "w", encoding="utf-8") as handle:
handle.write('; comment\n')
handle.write('[config.device]\n')
handle.write('title,"Device","Device help"\n')
handle.write('name,"Node Name","Node help"\n')
handle.write('empty_help,"Fallback",""\n')
with mock.patch("contact.utilities.ini_utils.i18n.t", return_value="No help available."):
mapping, help_text = parse_ini_file(ini_path)
self.assertEqual(mapping["config.device"], "Device")
self.assertEqual(help_text["config.device"], "Device help")
self.assertEqual(mapping["config.device.name"], "Node Name")
self.assertEqual(help_text["config.device.name"], "Node help")
self.assertEqual(help_text["config.device.empty_help"], "No help available.")
def test_parse_ini_file_uses_builtin_help_fallback_when_i18n_fails(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
ini_path = os.path.join(tmpdir, "settings.ini")
with open(ini_path, "w", encoding="utf-8") as handle:
handle.write('[section]\n')
handle.write('name,"Name"\n')
with mock.patch("contact.utilities.ini_utils.i18n.t", side_effect=RuntimeError("boom")):
mapping, help_text = parse_ini_file(ini_path)
self.assertEqual(mapping["section.name"], "Name")
self.assertEqual(help_text["section.name"], "No help available.")

View File

@@ -1,178 +0,0 @@
from argparse import Namespace
import unittest
from unittest import mock
import contact.__main__ as entrypoint
import contact.ui.default_config as config
from contact.utilities.singleton import interface_state, ui_state
from tests.test_support import reset_singletons, restore_config, snapshot_config
class MainRuntimeTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config("single_pane_mode")
def tearDown(self) -> None:
restore_config(self.saved_config)
reset_singletons()
def test_initialize_runtime_interface_uses_demo_branch(self) -> None:
args = Namespace(demo_screenshot=True)
with mock.patch.object(entrypoint, "configure_demo_database") as configure_demo_database:
with mock.patch.object(entrypoint, "build_demo_interface", return_value="demo-interface") as build_demo:
with mock.patch.object(entrypoint, "initialize_interface") as initialize_interface:
result = entrypoint.initialize_runtime_interface(args)
self.assertEqual(result, "demo-interface")
configure_demo_database.assert_called_once_with()
build_demo.assert_called_once_with()
initialize_interface.assert_not_called()
def test_initialize_runtime_interface_uses_live_branch_without_demo_flag(self) -> None:
args = Namespace(demo_screenshot=False)
with mock.patch.object(entrypoint, "initialize_interface", return_value="live-interface") as initialize_interface:
result = entrypoint.initialize_runtime_interface(args)
self.assertEqual(result, "live-interface")
initialize_interface.assert_called_once_with(args)
def test_prompt_region_if_unset_reinitializes_interface_after_confirmation(self) -> None:
args = Namespace()
old_interface = mock.Mock()
new_interface = mock.Mock()
interface_state.interface = old_interface
with mock.patch.object(entrypoint, "get_list_input", return_value="Yes"):
with mock.patch.object(entrypoint, "set_region") as set_region:
with mock.patch.object(entrypoint, "initialize_interface", return_value=new_interface) as initialize:
entrypoint.prompt_region_if_unset(args)
set_region.assert_called_once_with(old_interface)
old_interface.close.assert_called_once_with()
initialize.assert_called_once_with(args)
self.assertIs(interface_state.interface, new_interface)
def test_prompt_region_if_unset_leaves_interface_unchanged_when_declined(self) -> None:
args = Namespace()
interface = mock.Mock()
interface_state.interface = interface
with mock.patch.object(entrypoint, "get_list_input", return_value="No"):
with mock.patch.object(entrypoint, "set_region") as set_region:
with mock.patch.object(entrypoint, "initialize_interface") as initialize:
entrypoint.prompt_region_if_unset(args)
set_region.assert_not_called()
initialize.assert_not_called()
interface.close.assert_not_called()
self.assertIs(interface_state.interface, interface)
def test_initialize_globals_resets_and_populates_runtime_state(self) -> None:
ui_state.channel_list = ["stale"]
ui_state.all_messages = {"stale": [("old", "message")]}
ui_state.notifications = [1]
ui_state.packet_buffer = ["packet"]
ui_state.node_list = [99]
ui_state.selected_channel = 3
ui_state.selected_message = 4
ui_state.selected_node = 5
ui_state.start_index = [9, 9, 9]
config.single_pane_mode = "True"
with mock.patch.object(entrypoint, "get_nodeNum", return_value=123):
with mock.patch.object(entrypoint, "get_channels", return_value=["Primary"]) as get_channels:
with mock.patch.object(entrypoint, "get_node_list", return_value=[123, 456]) as get_node_list:
with mock.patch.object(entrypoint.pub, "subscribe") as subscribe:
with mock.patch.object(entrypoint, "init_nodedb") as init_nodedb:
with mock.patch.object(entrypoint, "seed_demo_messages") as seed_demo_messages:
with mock.patch.object(entrypoint, "load_messages_from_db") as load_messages:
entrypoint.initialize_globals(seed_demo=True)
self.assertEqual(ui_state.channel_list, ["Primary"])
self.assertEqual(ui_state.all_messages, {})
self.assertEqual(ui_state.notifications, [])
self.assertEqual(ui_state.packet_buffer, [])
self.assertEqual(ui_state.node_list, [123, 456])
self.assertEqual(ui_state.selected_channel, 0)
self.assertEqual(ui_state.selected_message, 0)
self.assertEqual(ui_state.selected_node, 0)
self.assertEqual(ui_state.start_index, [0, 0, 0])
self.assertTrue(ui_state.single_pane_mode)
self.assertEqual(interface_state.myNodeNum, 123)
get_channels.assert_called_once_with()
get_node_list.assert_called_once_with()
subscribe.assert_called_once_with(entrypoint.on_receive, "meshtastic.receive")
init_nodedb.assert_called_once_with()
seed_demo_messages.assert_called_once_with()
load_messages.assert_called_once_with()
def test_ensure_min_rows_retries_until_terminal_is_large_enough(self) -> None:
stdscr = mock.Mock()
stdscr.getmaxyx.side_effect = [(10, 80), (11, 80)]
with mock.patch.object(entrypoint, "dialog") as dialog:
with mock.patch.object(entrypoint.curses, "update_lines_cols") as update_lines_cols:
entrypoint.ensure_min_rows(stdscr, min_rows=11)
dialog.assert_called_once()
update_lines_cols.assert_called_once_with()
stdscr.clear.assert_called_once_with()
stdscr.refresh.assert_called_once_with()
def test_start_prints_help_and_exits_zero(self) -> None:
parser = mock.Mock()
with mock.patch.object(entrypoint.sys, "argv", ["contact", "--help"]):
with mock.patch.object(entrypoint, "setup_parser", return_value=parser):
with mock.patch.object(entrypoint.sys, "exit", side_effect=SystemExit(0)) as exit_mock:
with self.assertRaises(SystemExit) as raised:
entrypoint.start()
self.assertEqual(raised.exception.code, 0)
parser.print_help.assert_called_once_with()
exit_mock.assert_called_once_with(0)
def test_start_runs_curses_wrapper_and_closes_interface(self) -> None:
interface = mock.Mock()
interface_state.interface = interface
with mock.patch.object(entrypoint.sys, "argv", ["contact"]):
with mock.patch.object(entrypoint.curses, "wrapper") as wrapper:
entrypoint.start()
wrapper.assert_called_once_with(entrypoint.main)
interface.close.assert_called_once_with()
def test_start_handles_keyboard_interrupt(self) -> None:
interface = mock.Mock()
interface_state.interface = interface
with mock.patch.object(entrypoint.sys, "argv", ["contact"]):
with mock.patch.object(entrypoint.curses, "wrapper", side_effect=KeyboardInterrupt):
with mock.patch.object(entrypoint.sys, "exit", side_effect=SystemExit(0)) as exit_mock:
with self.assertRaises(SystemExit) as raised:
entrypoint.start()
self.assertEqual(raised.exception.code, 0)
interface.close.assert_called_once_with()
exit_mock.assert_called_once_with(0)
def test_start_handles_fatal_exception_and_exits_one(self) -> None:
with mock.patch.object(entrypoint.sys, "argv", ["contact"]):
with mock.patch.object(entrypoint.curses, "wrapper", side_effect=RuntimeError("boom")):
with mock.patch.object(entrypoint.curses, "endwin") as endwin:
with mock.patch.object(entrypoint.traceback, "print_exc") as print_exc:
with mock.patch("builtins.print") as print_mock:
with mock.patch.object(entrypoint.sys, "exit", side_effect=SystemExit(1)) as exit_mock:
with self.assertRaises(SystemExit) as raised:
entrypoint.start()
self.assertEqual(raised.exception.code, 1)
endwin.assert_called_once_with()
print_exc.assert_called_once_with()
print_mock.assert_any_call("Fatal error:", mock.ANY)
exit_mock.assert_called_once_with(1)

View File

@@ -1,96 +0,0 @@
import unittest
from unittest import mock
import contact.ui.default_config as config
from contact.message_handlers import rx_handler
from contact.utilities.singleton import interface_state, menu_state, ui_state
from tests.test_support import reset_singletons, restore_config, snapshot_config
class RxHandlerTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config("notification_sound", "message_prefix")
config.notification_sound = "False"
def tearDown(self) -> None:
restore_config(self.saved_config)
reset_singletons()
def test_on_receive_text_message_refreshes_selected_channel(self) -> None:
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.all_messages = {"Primary": []}
ui_state.selected_channel = 0
packet = {
"from": 222,
"to": 999,
"channel": 0,
"hopStart": 3,
"hopLimit": 1,
"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"hello"},
}
with mock.patch.object(rx_handler, "refresh_node_list", return_value=True):
with mock.patch.object(rx_handler, "draw_node_list") as draw_node_list:
with mock.patch.object(rx_handler, "draw_messages_window") as draw_messages_window:
with mock.patch.object(rx_handler, "draw_channel_list") as draw_channel_list:
with mock.patch.object(rx_handler, "add_notification") as add_notification:
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
rx_handler.on_receive(packet, interface=None)
draw_node_list.assert_called_once_with()
draw_messages_window.assert_called_once_with(True)
draw_channel_list.assert_not_called()
add_notification.assert_not_called()
save_message_to_db.assert_called_once_with("Primary", 222, "hello")
self.assertEqual(ui_state.all_messages["Primary"][-1][1], "hello")
self.assertIn("SAT2:", ui_state.all_messages["Primary"][-1][0])
self.assertIn("[2]", ui_state.all_messages["Primary"][-1][0])
def test_on_receive_direct_message_adds_channel_and_notification(self) -> None:
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.all_messages = {"Primary": []}
ui_state.selected_channel = 0
packet = {
"from": 222,
"to": 111,
"hopStart": 1,
"hopLimit": 1,
"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"dm"},
}
with mock.patch.object(rx_handler, "refresh_node_list", return_value=False):
with mock.patch.object(rx_handler, "draw_messages_window") as draw_messages_window:
with mock.patch.object(rx_handler, "draw_channel_list") as draw_channel_list:
with mock.patch.object(rx_handler, "add_notification") as add_notification:
with mock.patch.object(rx_handler, "update_node_info_in_db") as update_node_info_in_db:
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
rx_handler.on_receive(packet, interface=None)
self.assertIn(222, ui_state.channel_list)
self.assertIn(222, ui_state.all_messages)
draw_messages_window.assert_not_called()
draw_channel_list.assert_called_once_with()
add_notification.assert_called_once_with(1)
update_node_info_in_db.assert_called_once_with(222, chat_archived=False)
save_message_to_db.assert_called_once_with(222, 222, "dm")
def test_on_receive_trims_packet_buffer_even_when_packet_is_undecoded(self) -> None:
ui_state.packet_buffer = list(range(25))
ui_state.display_log = True
ui_state.current_window = 4
with mock.patch.object(rx_handler, "draw_packetlog_win") as draw_packetlog_win:
rx_handler.on_receive({"id": "new"}, interface=None)
draw_packetlog_win.assert_called_once_with()
self.assertEqual(len(ui_state.packet_buffer), 20)
self.assertEqual(ui_state.packet_buffer[-1], {"id": "new"})
self.assertTrue(menu_state.need_redraw)

View File

@@ -1,27 +0,0 @@
import threading
import contact.ui.default_config as config
from contact.ui.ui_state import AppState, ChatUIState, InterfaceState, MenuState
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
def reset_singletons() -> None:
_reset_instance(ui_state, ChatUIState())
_reset_instance(interface_state, InterfaceState())
_reset_instance(menu_state, MenuState())
_reset_instance(app_state, AppState())
app_state.lock = threading.Lock()
def restore_config(saved: dict) -> None:
for key, value in saved.items():
setattr(config, key, value)
def snapshot_config(*keys: str) -> dict:
return {key: getattr(config, key) for key in keys}
def _reset_instance(target: object, replacement: object) -> None:
target.__dict__.clear()
target.__dict__.update(replacement.__dict__)

View File

@@ -1,27 +0,0 @@
import unittest
from unittest import mock
from contact.utilities.telemetry_beautifier import get_chunks, humanize_wind_direction
class TelemetryBeautifierTests(unittest.TestCase):
def test_humanize_wind_direction_handles_boundaries(self) -> None:
self.assertEqual(humanize_wind_direction(0), "N")
self.assertEqual(humanize_wind_direction(90), "E")
self.assertEqual(humanize_wind_direction(225), "SW")
self.assertIsNone(humanize_wind_direction(-1))
def test_get_chunks_formats_known_and_unknown_values(self) -> None:
rendered = get_chunks("uptime_seconds:7200\nwind_direction:90\nlatitude_i:123456789\nunknown:abc\n")
self.assertIn("🆙 2.0h", rendered)
self.assertIn("⮆ E", rendered)
self.assertIn("🌍 12.345679", rendered)
self.assertIn("unknown:abc", rendered)
def test_get_chunks_formats_time_values(self) -> None:
with mock.patch("contact.utilities.telemetry_beautifier.datetime.datetime") as mocked_datetime:
mocked_datetime.fromtimestamp.return_value.strftime.return_value = "01.01.1970 00:00"
rendered = get_chunks("time:0\n")
self.assertIn("🕔 01.01.1970 00:00", rendered)

View File

@@ -1,107 +0,0 @@
from types import SimpleNamespace
import unittest
from unittest import mock
from meshtastic import BROADCAST_NUM
import contact.ui.default_config as config
from contact.message_handlers import tx_handler
from contact.utilities.singleton import interface_state, ui_state
from tests.test_support import reset_singletons, restore_config, snapshot_config
class TxHandlerTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
tx_handler.ack_naks.clear()
self.saved_config = snapshot_config("sent_message_prefix", "ack_str", "ack_implicit_str", "nak_str", "ack_unknown_str")
def tearDown(self) -> None:
tx_handler.ack_naks.clear()
restore_config(self.saved_config)
reset_singletons()
def test_send_message_on_named_channel_tracks_ack_request(self) -> None:
interface = mock.Mock()
interface.sendText.return_value = SimpleNamespace(id="req-1")
interface_state.interface = interface
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.all_messages = {"Primary": []}
with mock.patch.object(tx_handler, "save_message_to_db", return_value=999) as save_message_to_db:
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[00:00:00] "):
tx_handler.send_message("hello", channel=0)
interface.sendText.assert_called_once_with(
text="hello",
destinationId=BROADCAST_NUM,
wantAck=True,
wantResponse=False,
onResponse=tx_handler.onAckNak,
channelIndex=0,
)
save_message_to_db.assert_called_once_with("Primary", 111, "hello")
self.assertEqual(tx_handler.ack_naks["req-1"]["channel"], "Primary")
self.assertEqual(tx_handler.ack_naks["req-1"]["messageIndex"], 1)
self.assertEqual(tx_handler.ack_naks["req-1"]["timestamp"], 999)
self.assertEqual(ui_state.all_messages["Primary"][-1][1], "hello")
def test_send_message_to_direct_node_uses_node_as_destination(self) -> None:
interface = mock.Mock()
interface.sendText.return_value = SimpleNamespace(id="req-2")
interface_state.interface = interface
interface_state.myNodeNum = 111
ui_state.channel_list = [222]
ui_state.all_messages = {222: []}
with mock.patch.object(tx_handler, "save_message_to_db", return_value=123):
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[00:00:00] "):
tx_handler.send_message("dm", channel=0)
interface.sendText.assert_called_once_with(
text="dm",
destinationId=222,
wantAck=True,
wantResponse=False,
onResponse=tx_handler.onAckNak,
channelIndex=0,
)
self.assertEqual(tx_handler.ack_naks["req-2"]["channel"], 222)
def test_on_ack_nak_updates_message_for_explicit_ack(self) -> None:
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.selected_channel = 0
ui_state.all_messages = {"Primary": [("pending", "hello")]}
tx_handler.ack_naks["req"] = {"channel": "Primary", "messageIndex": 0, "timestamp": 55}
packet = {"from": 222, "decoded": {"requestId": "req", "routing": {"errorReason": "NONE"}}}
with mock.patch.object(tx_handler, "update_ack_nak") as update_ack_nak:
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[01:02:03] "):
with mock.patch("contact.ui.contact_ui.draw_messages_window") as draw_messages_window:
tx_handler.onAckNak(packet)
update_ack_nak.assert_called_once_with("Primary", 55, "hello", "Ack")
draw_messages_window.assert_called_once_with()
self.assertIn(config.sent_message_prefix, ui_state.all_messages["Primary"][0][0])
self.assertIn(config.ack_str, ui_state.all_messages["Primary"][0][0])
def test_on_ack_nak_uses_implicit_marker_for_self_ack(self) -> None:
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.selected_channel = 0
ui_state.all_messages = {"Primary": [("pending", "hello")]}
tx_handler.ack_naks["req"] = {"channel": "Primary", "messageIndex": 0, "timestamp": 55}
packet = {"from": 111, "decoded": {"requestId": "req", "routing": {"errorReason": "NONE"}}}
with mock.patch.object(tx_handler, "update_ack_nak") as update_ack_nak:
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[01:02:03] "):
with mock.patch("contact.ui.contact_ui.draw_messages_window"):
tx_handler.onAckNak(packet)
update_ack_nak.assert_called_once_with("Primary", 55, "hello", "Implicit")
self.assertIn(config.ack_implicit_str, ui_state.all_messages["Primary"][0][0])

View File

@@ -1,71 +0,0 @@
import unittest
from unittest import mock
import contact.ui.default_config as config
from contact.utilities.demo_data import DEMO_LOCAL_NODE_NUM, build_demo_interface
from contact.utilities.singleton import interface_state, ui_state
from contact.utilities.utils import add_new_message, get_channels, get_node_list, parse_protobuf
from tests.test_support import reset_singletons, restore_config, snapshot_config
class UtilsTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config("node_sort")
def tearDown(self) -> None:
restore_config(self.saved_config)
reset_singletons()
def test_get_node_list_keeps_local_first_and_ignored_last(self) -> None:
config.node_sort = "lastHeard"
interface = build_demo_interface()
interface_state.interface = interface
interface_state.myNodeNum = DEMO_LOCAL_NODE_NUM
node_list = get_node_list()
self.assertEqual(node_list[0], DEMO_LOCAL_NODE_NUM)
self.assertEqual(node_list[-1], 0xA1000008)
def test_add_new_message_groups_messages_by_hour(self) -> None:
ui_state.all_messages = {"MediumFast": []}
with mock.patch("contact.utilities.utils.time.time", side_effect=[1000, 1000]):
with mock.patch("contact.utilities.utils.time.strftime", return_value="[00:16:40] "):
with mock.patch("contact.utilities.utils.datetime.datetime") as mocked_datetime:
mocked_datetime.fromtimestamp.return_value.strftime.return_value = "2025-02-04 17:00"
add_new_message("MediumFast", ">> Test: ", "First")
add_new_message("MediumFast", ">> Test: ", "Second")
self.assertEqual(
ui_state.all_messages["MediumFast"],
[
("-- 2025-02-04 17:00 --", ""),
("[00:16:40] >> Test: ", "First"),
("[00:16:40] >> Test: ", "Second"),
],
)
def test_get_channels_populates_message_buckets_for_device_channels(self) -> None:
interface_state.interface = build_demo_interface()
ui_state.channel_list = []
ui_state.all_messages = {}
channels = get_channels()
self.assertIn("MediumFast", channels)
self.assertIn("Another Channel", channels)
self.assertIn("MediumFast", ui_state.all_messages)
self.assertIn("Another Channel", ui_state.all_messages)
def test_parse_protobuf_returns_string_payload_unchanged(self) -> None:
packet = {"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": "hello"}}
self.assertEqual(parse_protobuf(packet), "hello")
def test_parse_protobuf_returns_placeholder_for_text_messages(self) -> None:
packet = {"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"hello"}}
self.assertEqual(parse_protobuf(packet), "✉️")

View File

@@ -1,14 +0,0 @@
import unittest
from contact.utilities.validation_rules import get_validation_for
class ValidationRulesTests(unittest.TestCase):
def test_get_validation_for_matches_exact_keys(self) -> None:
self.assertEqual(get_validation_for("shortName"), {"max_length": 4})
def test_get_validation_for_matches_substrings(self) -> None:
self.assertEqual(get_validation_for("config.position.latitude"), {"min_value": -90, "max_value": 90})
def test_get_validation_for_returns_empty_dict_for_unknown_key(self) -> None:
self.assertEqual(get_validation_for("totally_unknown"), {})