Compare commits

..

1 Commits
tests ... queue

Author SHA1 Message Date
pdxlocations
d2e559ed04 Implement receive queue and UI shutdown handling in app state 2026-02-20 21:47:19 -07:00
31 changed files with 285 additions and 1606 deletions

View File

@@ -15,6 +15,7 @@ import curses
import io
import logging
import os
import queue
import subprocess
import sys
import threading
@@ -32,7 +33,6 @@ from contact.ui.contact_ui import main_ui
from contact.ui.splash import draw_splash
from contact.utilities.arg_parser import setup_parser
from contact.utilities.db_handler import init_nodedb, load_messages_from_db
from contact.utilities.demo_data import build_demo_interface, configure_demo_database, seed_demo_messages
from contact.utilities.input_handlers import get_list_input
from contact.utilities.i18n import t
from contact.ui.dialog import dialog
@@ -55,6 +55,8 @@ logging.basicConfig(
)
app_state.lock = threading.Lock()
app_state.rx_queue = queue.SimpleQueue()
app_state.ui_shutdown = False
# ------------------------------------------------------------------------------
@@ -69,18 +71,9 @@ def prompt_region_if_unset(args: object) -> None:
interface_state.interface = initialize_interface(args)
def initialize_globals(seed_demo: bool = False) -> None:
def initialize_globals() -> None:
"""Initializes interface and shared globals."""
ui_state.channel_list = []
ui_state.all_messages = {}
ui_state.notifications = []
ui_state.packet_buffer = []
ui_state.node_list = []
ui_state.selected_channel = 0
ui_state.selected_message = 0
ui_state.selected_node = 0
ui_state.start_index = [0, 0, 0]
interface_state.myNodeNum = get_nodeNum()
ui_state.channel_list = get_channels()
ui_state.node_list = get_node_list()
@@ -88,18 +81,9 @@ def initialize_globals(seed_demo: bool = False) -> None:
pub.subscribe(on_receive, "meshtastic.receive")
init_nodedb()
if seed_demo:
seed_demo_messages()
load_messages_from_db()
def initialize_runtime_interface(args: object):
if getattr(args, "demo_screenshot", False):
configure_demo_database()
return build_demo_interface()
return initialize_interface(args)
def main(stdscr: curses.window) -> None:
"""Main entry point for the curses UI."""
@@ -117,12 +101,12 @@ def main(stdscr: curses.window) -> None:
logging.info("Initializing interface...")
with app_state.lock:
interface_state.interface = initialize_runtime_interface(args)
interface_state.interface = initialize_interface(args)
if not getattr(args, "demo_screenshot", False) and interface_state.interface.localNode.localConfig.lora.region == 0:
if interface_state.interface.localNode.localConfig.lora.region == 0:
prompt_region_if_unset(args)
initialize_globals(seed_demo=getattr(args, "demo_screenshot", False))
initialize_globals()
logging.info("Starting main UI")
stdscr.clear()
@@ -168,11 +152,10 @@ def start() -> None:
sys.exit(0)
try:
app_state.ui_shutdown = False
curses.wrapper(main)
interface_state.interface.close()
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
interface_state.interface.close()
sys.exit(0)
except Exception as e:
logging.critical("Fatal error", exc_info=True)
@@ -183,6 +166,13 @@ def start() -> None:
print("Fatal error:", e)
traceback.print_exc()
sys.exit(1)
finally:
app_state.ui_shutdown = True
try:
if interface_state.interface is not None:
interface_state.interface.close()
except Exception:
logging.exception("Error while closing interface")
if __name__ == "__main__":

View File

@@ -77,7 +77,7 @@ help.node_info, "F5 = Full node info", ""
help.archive_chat, "Ctrl+D = Archive chat / remove node", ""
help.favorite, "Ctrl+F = Favorite", ""
help.ignore, "Ctrl+G = Ignore", ""
help.search, "Ctrl+/ or / = Search", ""
help.search, "Ctrl+/ = Search", ""
help.help, "Ctrl+K = Help", ""
help.no_help, "No help available.", ""
confirm.remove_from_nodedb, "Remove {name} from nodedb?", ""

View File

@@ -77,7 +77,7 @@ help.node_info, "F5 = Полная информация об узле", ""
help.archive_chat, "Ctrl+D = Архив чата / удалить узел", ""
help.favorite, "Ctrl+F = Избранное", ""
help.ignore, "Ctrl+G = Игнорировать", ""
help.search, "Ctrl+/ или / = Поиск", ""
help.search, "Ctrl+/ = Поиск", ""
help.help, "Ctrl+K = Справка", ""
help.no_help, "Нет справки.", ""
confirm.remove_from_nodedb, "Удалить {name} из базы узлов?", ""

View File

@@ -2,30 +2,36 @@ import logging
import os
import platform
import shutil
import time
import subprocess
import threading
from typing import Any, Dict, Optional
# Debounce notification sounds so a burst of queued messages only plays once.
import time
from typing import Any, Dict
import contact.ui.default_config as config
from contact.utilities.db_handler import (
get_name_from_database,
maybe_store_nodeinfo_in_db,
save_message_to_db,
update_node_info_in_db,
)
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
from contact.utilities.utils import add_new_message, refresh_node_list
# Debounce notification sounds so a burst of queued messages only plays once.
_SOUND_DEBOUNCE_SECONDS = 0.8
_sound_timer: Optional[threading.Timer] = None
_sound_timer: threading.Timer | None = None
_sound_timer_lock = threading.Lock()
_last_sound_request = 0.0
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
"""Schedule a notification sound after a short quiet period.
If more messages arrive before the delay elapses, the timer is reset.
This prevents playing a sound for each message when a backlog flushes.
"""
"""Schedule a notification sound after a short quiet period."""
global _sound_timer, _last_sound_request
now = time.monotonic()
with _sound_timer_lock:
_last_sound_request = now
# Cancel any previously scheduled sound.
if _sound_timer is not None:
try:
_sound_timer.cancel()
@@ -34,7 +40,6 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = None
def _fire(expected_request_time: float) -> None:
# Only play if nothing newer has been scheduled.
with _sound_timer_lock:
if expected_request_time != _last_sound_request:
return
@@ -43,42 +48,20 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = threading.Timer(delay, _fire, args=(now,))
_sound_timer.daemon = True
_sound_timer.start()
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
draw_messages_window,
draw_channel_list,
add_notification,
)
from contact.utilities.db_handler import (
save_message_to_db,
maybe_store_nodeinfo_in_db,
get_name_from_database,
update_node_info_in_db,
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state
def play_sound():
def play_sound() -> None:
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
if system == "Darwin":
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
wav_path = "/usr/share/sounds/alsa/Front_Center.wav"
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
@@ -95,102 +78,127 @@ def play_sound():
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as exc:
logging.error("Sound playback failed: %s", exc)
except Exception as exc:
logging.error("Unexpected error while playing sound: %s", exc)
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
def _decode_message_payload(payload: Any) -> str:
if isinstance(payload, bytes):
return payload.decode("utf-8", errors="replace")
if isinstance(payload, str):
return payload
return str(payload)
def process_receive_event(packet: Dict[str, Any]) -> None:
"""Process a queued packet on the UI thread and perform all UI updates."""
# Local import prevents module-level circular import.
from contact.ui.contact_ui import (
add_notification,
draw_channel_list,
draw_messages_window,
draw_node_list,
draw_packetlog_win,
)
# Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
decoded = packet.get("decoded")
if not isinstance(decoded, dict):
return
changed = refresh_node_list()
if changed:
draw_node_list()
portnum = decoded.get("portnum")
if portnum == "NODEINFO_APP":
user = decoded.get("user")
if isinstance(user, dict) and "longName" in user:
maybe_store_nodeinfo_in_db(packet)
return
if portnum != "TEXT_MESSAGE_APP":
return
hop_start = packet.get("hopStart", 0)
hop_limit = packet.get("hopLimit", 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_string = _decode_message_payload(decoded.get("payload"))
if not ui_state.channel_list:
return
refresh_channels = False
refresh_messages = False
channel_number = packet.get("channel", 0)
if not isinstance(channel_number, int):
channel_number = 0
if channel_number < 0:
channel_number = 0
packet_from = packet.get("from")
if packet.get("to") == interface_state.myNodeNum and packet_from is not None:
if packet_from not in ui_state.channel_list:
ui_state.channel_list.append(packet_from)
if packet_from not in ui_state.all_messages:
ui_state.all_messages[packet_from] = []
update_node_info_in_db(packet_from, chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet_from)
if channel_number >= len(ui_state.channel_list):
channel_number = 0
channel_id = ui_state.channel_list[channel_number]
if ui_state.selected_channel >= len(ui_state.channel_list):
ui_state.selected_channel = 0
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
if packet_from is None:
logging.debug("Skipping TEXT_MESSAGE_APP packet with missing 'from' field")
return
message_from_string = get_name_from_database(packet_from, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, packet_from, message_string)
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
"""
Handles an incoming packet from a Meshtastic interface.
Args:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with app_state.lock:
# Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
try:
if "decoded" not in packet:
return
# Assume any incoming packet could update the last seen time for a node
changed = refresh_node_list()
if changed:
draw_node_list()
if packet["decoded"]["portnum"] == "NODEINFO_APP":
if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
hop_start = packet.get('hopStart', 0)
hop_limit = packet.get('hopLimit', 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
refresh_channels = False
refresh_messages = False
if packet.get("channel"):
channel_number = packet["channel"]
else:
channel_number = 0
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
# Add received message to the messages list
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")
"""Enqueue packet to be processed on the main curses thread."""
if app_state.ui_shutdown:
return
if not isinstance(packet, dict):
return
try:
app_state.rx_queue.put(packet)
except Exception:
logging.exception("Failed to enqueue packet for UI processing")

View File

@@ -1,9 +1,11 @@
import curses
import logging
from queue import Empty
import time
import traceback
from typing import Union
from contact.message_handlers.rx_handler import process_receive_event
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu
from contact.message_handlers.tx_handler import send_message, send_traceroute
@@ -12,17 +14,14 @@ from contact.ui.colors import get_color
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
from contact.utilities.input_handlers import get_list_input
from contact.utilities.i18n import t
from contact.utilities.emoji_utils import normalize_message_text
import contact.ui.default_config as config
import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state, menu_state
from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state
MIN_COL = 1 # "effectively zero" without breaking curses
RESIZE_DEBOUNCE_MS = 250
root_win = None
nodes_pad = None
# Draw arrows for a specific window id (0=channel,1=messages,2=nodes).
@@ -65,72 +64,6 @@ def paint_frame(win, selected: bool) -> None:
win.refresh()
def get_channel_row_color(index: int) -> int:
if index == ui_state.selected_channel:
if ui_state.current_window == 0:
return get_color("channel_list", reverse=True)
return get_color("channel_selected")
return get_color("channel_list")
def get_node_row_color(index: int, highlight: bool = False) -> int:
node_num = ui_state.node_list[index]
node = interface_state.interface.nodesByNum.get(node_num, {})
color = "node_list"
if node.get("isFavorite"):
color = "node_favorite"
if node.get("isIgnored"):
color = "node_ignored"
reverse = index == ui_state.selected_node and (ui_state.current_window == 2 or highlight)
return get_color(color, reverse=reverse)
def refresh_node_selection(old_index: int = -1, highlight: bool = False) -> None:
if nodes_pad is None or not ui_state.node_list:
return
width = max(0, nodes_pad.getmaxyx()[1] - 4)
if 0 <= old_index < len(ui_state.node_list):
try:
nodes_pad.chgat(old_index, 1, width, get_node_row_color(old_index, highlight=highlight))
except curses.error:
pass
if 0 <= ui_state.selected_node < len(ui_state.node_list):
try:
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node, highlight=highlight))
except curses.error:
pass
ui_state.start_index[2] = max(0, ui_state.selected_node - (nodes_win.getmaxyx()[0] - 3))
refresh_pad(2)
draw_window_arrows(2)
def refresh_main_window(window_id: int, selected: bool) -> None:
if window_id == 0:
paint_frame(channel_win, selected=selected)
if ui_state.channel_list:
width = max(0, channel_pad.getmaxyx()[1] - 4)
channel_pad.chgat(ui_state.selected_channel, 1, width, get_channel_row_color(ui_state.selected_channel))
refresh_pad(0)
elif window_id == 1:
paint_frame(messages_win, selected=selected)
refresh_pad(1)
elif window_id == 2:
paint_frame(nodes_win, selected=selected)
if ui_state.node_list and nodes_pad is not None:
width = max(0, nodes_pad.getmaxyx()[1] - 4)
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node))
refresh_pad(2)
def get_node_display_name(node_num: int, node: dict) -> str:
user = node.get("user") or {}
return user.get("longName") or get_name_from_database(node_num, "long")
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
"""Handle terminal resize events and redraw the UI accordingly."""
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, packetlog_win, entry_win
@@ -230,22 +163,22 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
pass
def drain_resize_events(input_win: curses.window) -> Union[str, int, None]:
"""Wait for resize events to settle and preserve one queued non-resize key."""
input_win.timeout(RESIZE_DEBOUNCE_MS)
try:
while True:
try:
next_char = input_win.get_wch()
except curses.error:
return None
def drain_receive_queue(max_events: int = 200) -> None:
processed = 0
while processed < max_events:
try:
packet = app_state.rx_queue.get(block=False)
except Empty:
return
except Exception:
logging.exception("Error while draining receive queue")
return
if next_char == curses.KEY_RESIZE:
continue
return next_char
finally:
input_win.timeout(-1)
try:
process_receive_event(packet)
except Exception:
logging.exception("Error while processing receive event")
processed += 1
def main_ui(stdscr: curses.window) -> None:
@@ -255,20 +188,20 @@ def main_ui(stdscr: curses.window) -> None:
root_win = stdscr
input_text = ""
queued_char = None
stdscr.keypad(True)
get_channels()
handle_resize(stdscr, True)
entry_win.timeout(75)
while True:
drain_receive_queue()
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
if queued_char is None:
try:
char = entry_win.get_wch()
else:
char = queued_char
queued_char = None
except curses.error:
continue
# draw_debug(f"Keypress: {char}")
@@ -316,16 +249,13 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_RESIZE:
input_text = ""
queued_char = drain_resize_events(entry_win)
handle_resize(stdscr, False)
continue
entry_win.timeout(75)
elif char == chr(4): # Ctrl + D to delete current channel or node
handle_ctrl_d()
elif char == chr(31) or (
char == "/" and not input_text and ui_state.current_window in (0, 2)
): # Ctrl + / or / to search in channel/node lists
elif char == chr(31): # Ctrl + / to search
handle_ctrl_fslash()
elif char == chr(11): # Ctrl + K for Help
@@ -429,16 +359,31 @@ def handle_leftright(char: int) -> None:
delta = -1 if char == curses.KEY_LEFT else 1
old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3
if ui_state.single_pane_mode:
handle_resize(root_win, False)
return
handle_resize(root_win, False)
refresh_main_window(old_window, selected=False)
if old_window == 0:
paint_frame(channel_win, selected=False)
refresh_pad(0)
if old_window == 1:
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
paint_frame(nodes_win, selected=False)
refresh_pad(2)
if not ui_state.single_pane_mode:
draw_window_arrows(old_window)
refresh_main_window(ui_state.current_window, selected=True)
if ui_state.current_window == 0:
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
paint_frame(nodes_win, selected=True)
refresh_pad(2)
draw_window_arrows(ui_state.current_window)
@@ -459,16 +404,31 @@ def handle_function_keys(char: int) -> None:
return
ui_state.current_window = target
if ui_state.single_pane_mode:
handle_resize(root_win, False)
return
handle_resize(root_win, False)
refresh_main_window(old_window, selected=False)
if old_window == 0:
paint_frame(channel_win, selected=False)
refresh_pad(0)
elif old_window == 1:
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
paint_frame(nodes_win, selected=False)
refresh_pad(2)
if not ui_state.single_pane_mode:
draw_window_arrows(old_window)
refresh_main_window(ui_state.current_window, selected=True)
if ui_state.current_window == 0:
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
paint_frame(nodes_win, selected=True)
refresh_pad(2)
draw_window_arrows(ui_state.current_window)
@@ -520,34 +480,34 @@ def handle_enter(input_text: str) -> str:
def handle_f5_key(stdscr: curses.window) -> None:
if not ui_state.node_list:
return
def build_node_details() -> tuple[str, list[str]]:
node = None
try:
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
message_parts = []
message_parts.append("**📋 Basic Information:**")
message_parts.append(f"• Device: {node.get('user', {}).get('longName', 'Unknown')}")
message_parts.append(f"• Short name: {node.get('user', {}).get('shortName', 'Unknown')}")
message_parts.append(f"• Hardware: {node.get('user', {}).get('hwModel', 'Unknown')}")
message_parts.append(f"• Role: {node.get('user', {}).get('role', 'Unknown')}")
message_parts.append(f"Public key: {node.get('user', {}).get('publicKey')}")
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
role = f"{node.get('user', {}).get('role', 'Unknown')}"
message_parts.append(f"• Role: {role}")
pk = f"{node.get('user', {}).get('publicKey')}"
message_parts.append(f"Public key: {pk}")
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
if "position" in node:
pos = node["position"]
has_coords = pos.get("latitude") and pos.get("longitude")
if has_coords:
if pos.get("latitude") and pos.get("longitude"):
message_parts.append(f"• Position: {pos['latitude']:.4f}, {pos['longitude']:.4f}")
if pos.get("altitude"):
message_parts.append(f"• Altitude: {pos['altitude']}m")
if has_coords:
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
if any(key in node for key in ["snr", "hopsAway", "lastHeard"]):
message_parts.append("")
message_parts.append("**🌐 Network Metrics:**")
message_parts.append("\n**🌐 Network Metrics:**")
if "snr" in node:
snr = node["snr"]
@@ -567,13 +527,12 @@ def handle_f5_key(stdscr: curses.window) -> None:
hop_emoji = "📡" if hops == 0 else "🔄" if hops == 1 else ""
message_parts.append(f"• Hops away: {hop_emoji} {hops}")
if node.get("lastHeard"):
if "lastHeard" in node and node["lastHeard"]:
message_parts.append(f"• Last heard: 🕐 {get_time_ago(node['lastHeard'])}")
if node.get("deviceMetrics"):
metrics = node["deviceMetrics"]
message_parts.append("")
message_parts.append("**📊 Device Metrics:**")
message_parts.append("\n**📊 Device Metrics:**")
if "batteryLevel" in metrics:
battery = metrics["batteryLevel"]
@@ -594,128 +553,21 @@ def handle_f5_key(stdscr: curses.window) -> None:
air_emoji = "🔴" if air_util > 80 else "🟡" if air_util > 50 else "🟢"
message_parts.append(f"• Air utilization TX: {air_emoji} {air_util:.2f}%")
title = t(
"ui.dialog.node_details_title",
default="📡 Node Details: {name}",
name=node.get("user", {}).get("shortName", "Unknown"),
message = "\n".join(message_parts)
contact.ui.dialog.dialog(
t(
"ui.dialog.node_details_title",
default="📡 Node Details: {name}",
name=node.get("user", {}).get("shortName", "Unknown"),
),
message,
)
return title, message_parts
previous_window = ui_state.current_window
ui_state.current_window = 4
scroll_offset = 0
dialog_win = None
curses.curs_set(0)
refresh_node_selection(highlight=True)
try:
while True:
curses.update_lines_cols()
height, width = curses.LINES, curses.COLS
title, message_lines = build_node_details()
max_line_length = max(len(title), *(len(line) for line in message_lines))
dialog_width = min(max(max_line_length + 4, 20), max(10, width - 2))
dialog_height = min(max(len(message_lines) + 4, 6), max(6, height - 2))
x = max(0, (width - dialog_width) // 2)
y = max(0, (height - dialog_height) // 2)
viewport_h = max(1, dialog_height - 4)
max_scroll = max(0, len(message_lines) - viewport_h)
scroll_offset = max(0, min(scroll_offset, max_scroll))
if dialog_win is None:
dialog_win = curses.newwin(dialog_height, dialog_width, y, x)
else:
dialog_win.erase()
dialog_win.refresh()
dialog_win.resize(dialog_height, dialog_width)
dialog_win.mvwin(y, x)
dialog_win.keypad(True)
dialog_win.bkgd(get_color("background"))
dialog_win.attrset(get_color("window_frame"))
dialog_win.border(0)
try:
dialog_win.addstr(0, 2, title[: max(0, dialog_width - 4)], get_color("settings_default"))
hint = f" {ui_state.selected_node + 1}/{len(ui_state.node_list)} "
dialog_win.addstr(0, max(2, dialog_width - len(hint) - 2), hint, get_color("commands"))
except curses.error:
pass
msg_win = dialog_win.derwin(viewport_h + 2, dialog_width - 2, 1, 1)
msg_win.erase()
for row, line in enumerate(message_lines[scroll_offset : scroll_offset + viewport_h], start=1):
trimmed = line[: max(0, dialog_width - 6)]
try:
msg_win.addstr(row, 1, trimmed, get_color("settings_default"))
except curses.error:
pass
if len(message_lines) > viewport_h:
old_index = ui_state.start_index[4] if len(ui_state.start_index) > 4 else 0
while len(ui_state.start_index) <= 4:
ui_state.start_index.append(0)
ui_state.start_index[4] = scroll_offset
draw_main_arrows(msg_win, len(message_lines) - 1, window=4)
ui_state.start_index[4] = old_index
try:
ok_text = " Up/Down: Nodes PgUp/PgDn: Scroll Esc: Close "
dialog_win.addstr(
dialog_height - 2,
max(1, (dialog_width - len(ok_text)) // 2),
ok_text[: max(0, dialog_width - 2)],
get_color("settings_default", reverse=True),
)
except curses.error:
pass
dialog_win.refresh()
msg_win.noutrefresh()
curses.doupdate()
dialog_win.timeout(200)
char = dialog_win.getch()
if menu_state.need_redraw:
menu_state.need_redraw = False
continue
if char in (27, curses.KEY_LEFT, curses.KEY_ENTER, 10, 13, 32):
break
if char == curses.KEY_UP:
old_selected_node = ui_state.selected_node
ui_state.selected_node = (ui_state.selected_node - 1) % len(ui_state.node_list)
scroll_offset = 0
refresh_node_selection(old_selected_node, highlight=True)
elif char == curses.KEY_DOWN:
old_selected_node = ui_state.selected_node
ui_state.selected_node = (ui_state.selected_node + 1) % len(ui_state.node_list)
scroll_offset = 0
refresh_node_selection(old_selected_node, highlight=True)
elif char == curses.KEY_PPAGE:
scroll_offset = max(0, scroll_offset - viewport_h)
elif char == curses.KEY_NPAGE:
scroll_offset = min(max_scroll, scroll_offset + viewport_h)
elif char == curses.KEY_HOME:
scroll_offset = 0
elif char == curses.KEY_END:
scroll_offset = max_scroll
elif char == curses.KEY_RESIZE:
continue
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
except KeyError:
return
finally:
if dialog_win is not None:
dialog_win.erase()
dialog_win.refresh()
ui_state.current_window = previous_window
curses.curs_set(1)
handle_resize(stdscr, False)
def handle_ctrl_t(stdscr: curses.window) -> None:
@@ -772,7 +624,6 @@ def handle_backtick(stdscr: curses.window) -> None:
ui_state.current_window = 4
settings_menu(stdscr, interface_state.interface)
ui_state.current_window = previous_window
ui_state.single_pane_mode = config.single_pane_mode.lower() == "true"
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
@@ -1014,7 +865,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
row = 0
for prefix, message in messages:
full_message = normalize_message_text(f"{prefix}{message}")
full_message = f"{prefix}{message}"
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
msg_line_count += len(wrapped_lines)
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])
@@ -1056,8 +907,10 @@ def draw_node_list() -> None:
if ui_state.current_window != 2 and ui_state.single_pane_mode:
return
if nodes_pad is None:
nodes_pad = curses.newpad(1, 1)
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
# if nodes_pad is None:
# nodes_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1, 1)
try:
nodes_pad.erase()
@@ -1071,12 +924,28 @@ def draw_node_list() -> None:
node = interface_state.interface.nodesByNum[node_num]
secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"]
status_icon = "🔐" if secure else "🔓"
node_name = get_node_display_name(node_num, node)
node_name = get_name_from_database(node_num, "long")
user_name = node["user"]["shortName"]
uptime_str = ""
if "deviceMetrics" in node and "uptimeSeconds" in node["deviceMetrics"]:
uptime_str = f" / Up: {get_readable_duration(node['deviceMetrics']['uptimeSeconds'])}"
last_heard_str = f"{get_time_ago(node['lastHeard'])}" if node.get("lastHeard") else ""
hops_str = f" ■ Hops: {node['hopsAway']}" if "hopsAway" in node else ""
snr_str = f" ■ SNR: {node['snr']}dB" if node.get("hopsAway") == 0 and "snr" in node else ""
# Future node name custom formatting possible
node_str = f"{status_icon} {node_name}"
node_str = node_str.ljust(box_width - 4)[: box_width - 2]
nodes_pad.addstr(i, 1, node_str, get_node_row_color(i))
color = "node_list"
if "isFavorite" in node and node["isFavorite"]:
color = "node_favorite"
if "isIgnored" in node and node["isIgnored"]:
color = "node_ignored"
nodes_pad.addstr(
i, 1, node_str, get_color(color, reverse=ui_state.selected_node == i and ui_state.current_window == 2)
)
paint_frame(nodes_win, selected=(ui_state.current_window == 2))
nodes_win.refresh()

View File

@@ -56,13 +56,6 @@ config_folder = os.path.abspath(config.node_configs_file_path)
# Load translations
field_mapping, help_text = parse_ini_file(translation_file)
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0 and later use an is_repeated property, while older versions compare against the label field.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def reload_translations() -> None:
global translation_file, field_mapping, help_text
@@ -571,7 +564,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
new_value = new_value == "True" or new_value is True
menu_state.start_index.pop()
elif _is_repeated_field(field): # Handle repeated field - Not currently used
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
new_value = get_repeated_input(current_value)
new_value = current_value if new_value is None else new_value.split(", ")
menu_state.start_index.pop()

View File

@@ -1,3 +1,4 @@
from queue import SimpleQueue
from typing import Any, Union, List, Dict
from dataclasses import dataclass, field
@@ -44,3 +45,5 @@ class InterfaceState:
@dataclass
class AppState:
lock: Any = None
rx_queue: SimpleQueue = field(default_factory=SimpleQueue)
ui_shutdown: bool = False

View File

@@ -566,7 +566,7 @@ def save_json(file_path: str, data: Dict[str, Any]) -> None:
formatted_json = config.format_json_single_line_arrays(data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(formatted_json)
config.reload_config()
setup_colors(reinit=True)
reload_translations(data.get("language"))

View File

@@ -35,10 +35,5 @@ def setup_parser() -> ArgumentParser:
parser.add_argument(
"--settings", "--set", "--control", "-c", help="Launch directly into the settings", action="store_true"
)
parser.add_argument(
"--demo-screenshot",
help="Launch with a fake interface and seeded demo data for screenshots/testing.",
action="store_true",
)
return parser

View File

@@ -9,17 +9,6 @@ from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0+ exposes `is_repeated`, while older versions require
checking `label == LABEL_REPEATED`.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = camel_to_snake(config_root)
@@ -100,7 +89,7 @@ def setPref(config, comp_name, raw_val) -> bool:
return False
# repeating fields need to be handled with append, not setattr
if not _is_repeated_field(pref):
if pref.label != pref.LABEL_REPEATED:
try:
if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name)

View File

@@ -1,226 +0,0 @@
import os
import sqlite3
import tempfile
from dataclasses import dataclass
from typing import Dict, List, Tuple, Union
import contact.ui.default_config as config
from contact.utilities.db_handler import get_table_name
from contact.utilities.singleton import interface_state
DEMO_DB_FILENAME = "contact_demo_client.db"
DEMO_LOCAL_NODE_NUM = 0xC0DEC0DE
DEMO_BASE_TIMESTAMP = 1738717200 # 2025-02-04 17:00:00 UTC
DEMO_CHANNELS = ["MediumFast", "Another Channel"]
@dataclass
class DemoChannelSettings:
name: str
@dataclass
class DemoChannel:
role: bool
settings: DemoChannelSettings
@dataclass
class DemoLoRaConfig:
region: int = 1
modem_preset: int = 0
@dataclass
class DemoLocalConfig:
lora: DemoLoRaConfig
class DemoLocalNode:
def __init__(self, interface: "DemoInterface", channels: List[DemoChannel]) -> None:
self._interface = interface
self.channels = channels
self.localConfig = DemoLocalConfig(lora=DemoLoRaConfig())
def setFavorite(self, node_num: int) -> None:
self._interface.nodesByNum[node_num]["isFavorite"] = True
def removeFavorite(self, node_num: int) -> None:
self._interface.nodesByNum[node_num]["isFavorite"] = False
def setIgnored(self, node_num: int) -> None:
self._interface.nodesByNum[node_num]["isIgnored"] = True
def removeIgnored(self, node_num: int) -> None:
self._interface.nodesByNum[node_num]["isIgnored"] = False
def removeNode(self, node_num: int) -> None:
self._interface.nodesByNum.pop(node_num, None)
class DemoInterface:
def __init__(self, nodes: Dict[int, Dict[str, object]], channels: List[DemoChannel]) -> None:
self.nodesByNum = nodes
self.nodes = self.nodesByNum
self.localNode = DemoLocalNode(self, channels)
def getMyNodeInfo(self) -> Dict[str, int]:
return {"num": DEMO_LOCAL_NODE_NUM}
def getNode(self, selector: str) -> DemoLocalNode:
if selector != "^local":
raise KeyError(selector)
return self.localNode
def close(self) -> None:
return
def build_demo_interface() -> DemoInterface:
channels = [DemoChannel(role=True, settings=DemoChannelSettings(name=name)) for name in DEMO_CHANNELS]
nodes = {
DEMO_LOCAL_NODE_NUM: _build_node(
DEMO_LOCAL_NODE_NUM,
"Meshtastic fb3c",
"fb3c",
hops=0,
snr=13.7,
last_heard_offset=5,
battery=88,
voltage=4.1,
favorite=True,
),
0xA1000001: _build_node(0xA1000001, "KG7NDX-N2", "N2", hops=1, last_heard_offset=18, battery=79, voltage=4.0),
0xA1000002: _build_node(0xA1000002, "Satellite II Repeater", "SAT2", hops=2, last_heard_offset=31),
0xA1000003: _build_node(0xA1000003, "Search for Discord/Meshtastic", "DISC", hops=1, last_heard_offset=46),
0xA1000004: _build_node(0xA1000004, "K7EOK Mobile", "MOBL", hops=1, last_heard_offset=63, battery=52),
0xA1000005: _build_node(0xA1000005, "Turtle", "TRTL", hops=3, last_heard_offset=87),
0xA1000006: _build_node(0xA1000006, "CARS Trewvilliger Plaza", "CARS", hops=2, last_heard_offset=121),
0xA1000007: _build_node(0xA1000007, "No Hands!", "NHDS", hops=1, last_heard_offset=155),
0xA1000008: _build_node(0xA1000008, "McCutie", "MCCU", hops=2, last_heard_offset=211, ignored=True),
0xA1000009: _build_node(0xA1000009, "K1PDX", "K1PX", hops=2, last_heard_offset=267),
0xA100000A: _build_node(0xA100000A, "Arnold Creek", "ARND", hops=1, last_heard_offset=301),
0xA100000B: _build_node(0xA100000B, "Nansen", "NANS", hops=1, last_heard_offset=355),
0xA100000C: _build_node(0xA100000C, "Kodin 1", "KOD1", hops=2, last_heard_offset=402),
0xA100000D: _build_node(0xA100000D, "PH1", "PH1", hops=3, last_heard_offset=470),
0xA100000E: _build_node(0xA100000E, "Luna", "LUNA", hops=1, last_heard_offset=501),
0xA100000F: _build_node(0xA100000F, "sputnik1", "SPUT", hops=1, last_heard_offset=550),
0xA1000010: _build_node(0xA1000010, "K7EOK Maplewood West", "MAPL", hops=2, last_heard_offset=602),
0xA1000011: _build_node(0xA1000011, "KE7YVU 2", "YVU2", hops=2, last_heard_offset=655),
0xA1000012: _build_node(0xA1000012, "DNET", "DNET", hops=1, last_heard_offset=702),
0xA1000013: _build_node(0xA1000013, "Green Bluff", "GBLF", hops=1, last_heard_offset=780),
0xA1000014: _build_node(0xA1000014, "Council Crest Solar", "CCST", hops=2, last_heard_offset=830),
0xA1000015: _build_node(0xA1000015, "Meshtastic 61c7", "61c7", hops=1, last_heard_offset=901),
0xA1000016: _build_node(0xA1000016, "Bella", "BELA", hops=2, last_heard_offset=950),
0xA1000017: _build_node(0xA1000017, "Mojo Solar Base 4f12", "MOJO", hops=1, last_heard_offset=1010, favorite=True),
}
return DemoInterface(nodes=nodes, channels=channels)
def configure_demo_database(base_dir: str = "") -> str:
if not base_dir:
base_dir = tempfile.mkdtemp(prefix="contact_demo_")
os.makedirs(base_dir, exist_ok=True)
db_path = os.path.join(base_dir, DEMO_DB_FILENAME)
if os.path.exists(db_path):
os.remove(db_path)
config.db_file_path = db_path
return db_path
def seed_demo_messages() -> None:
schema = """
user_id TEXT,
message_text TEXT,
timestamp INTEGER,
ack_type TEXT
"""
with sqlite3.connect(config.db_file_path) as db_connection:
cursor = db_connection.cursor()
for channel_name, rows in _demo_messages().items():
table_name = get_table_name(channel_name)
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({schema})")
cursor.executemany(
f"""
INSERT INTO {table_name} (user_id, message_text, timestamp, ack_type)
VALUES (?, ?, ?, ?)
""",
rows,
)
db_connection.commit()
def _build_node(
node_num: int,
long_name: str,
short_name: str,
*,
hops: int,
last_heard_offset: int,
snr: float = 0.0,
battery: int = 0,
voltage: float = 0.0,
favorite: bool = False,
ignored: bool = False,
) -> Dict[str, object]:
node = {
"num": node_num,
"user": {
"longName": long_name,
"shortName": short_name,
"hwModel": "TBEAM",
"role": "CLIENT",
"publicKey": f"pk-{node_num:08x}",
"isLicensed": True,
},
"lastHeard": DEMO_BASE_TIMESTAMP + 3600 - last_heard_offset,
"hopsAway": hops,
"isFavorite": favorite,
"isIgnored": ignored,
}
if snr:
node["snr"] = snr
if battery:
node["deviceMetrics"] = {
"batteryLevel": battery,
"voltage": voltage or 4.0,
"uptimeSeconds": 86400 + node_num % 10000,
"channelUtilization": 12.5 + (node_num % 7),
"airUtilTx": 4.5 + (node_num % 5),
}
if node_num % 3 == 0:
node["position"] = {
"latitude": 45.5231 + ((node_num % 50) * 0.0001),
"longitude": -122.6765 - ((node_num % 50) * 0.0001),
"altitude": 85 + (node_num % 20),
}
return node
def _demo_messages() -> Dict[Union[str, int], List[Tuple[str, str, int, Union[str, None]]]]:
return {
"MediumFast": [
(str(DEMO_LOCAL_NODE_NUM), "Help, I'm stuck in a ditch!", DEMO_BASE_TIMESTAMP + 45, "Ack"),
("2701131778", "Do you require a alpinist?", DEMO_BASE_TIMESTAMP + 80, None),
(str(DEMO_LOCAL_NODE_NUM), "I don't know what that is.", DEMO_BASE_TIMESTAMP + 104, "Implicit"),
],
"Another Channel": [
("2701131788", "Weather is holding for the summit push.", DEMO_BASE_TIMESTAMP + 220, None),
(str(DEMO_LOCAL_NODE_NUM), "Copy that. Keep me posted.", DEMO_BASE_TIMESTAMP + 260, "Ack"),
],
2701131788: [
("2701131788", "Ping me when you are back at the trailhead.", DEMO_BASE_TIMESTAMP + 330, None),
(str(DEMO_LOCAL_NODE_NUM), "Will do.", DEMO_BASE_TIMESTAMP + 350, "Ack"),
],
}

View File

@@ -1,54 +0,0 @@
"""Helpers for normalizing emoji sequences in width-sensitive message rendering."""
# Strip zero-width and presentation modifiers that make terminal cell width inconsistent.
EMOJI_MODIFIER_REPLACEMENTS = {
"\u200d": "",
"\u20e3": "",
"\ufe0e": "",
"\ufe0f": "",
"\U0001F3FB": "",
"\U0001F3FC": "",
"\U0001F3FD": "",
"\U0001F3FE": "",
"\U0001F3FF": "",
}
_EMOJI_MODIFIER_TRANSLATION = str.maketrans(EMOJI_MODIFIER_REPLACEMENTS)
_REGIONAL_INDICATOR_START = ord("\U0001F1E6")
_REGIONAL_INDICATOR_END = ord("\U0001F1FF")
def _regional_indicator_to_letter(char: str) -> str:
return chr(ord("A") + ord(char) - _REGIONAL_INDICATOR_START)
def _normalize_flag_emoji(text: str) -> str:
"""Convert flag emoji built from regional indicators into ASCII country codes."""
normalized = []
index = 0
while index < len(text):
current = text[index]
current_ord = ord(current)
if _REGIONAL_INDICATOR_START <= current_ord <= _REGIONAL_INDICATOR_END and index + 1 < len(text):
next_char = text[index + 1]
next_ord = ord(next_char)
if _REGIONAL_INDICATOR_START <= next_ord <= _REGIONAL_INDICATOR_END:
normalized.append(_regional_indicator_to_letter(current))
normalized.append(_regional_indicator_to_letter(next_char))
index += 2
continue
normalized.append(current)
index += 1
return "".join(normalized)
def normalize_message_text(text: str) -> str:
"""Strip modifiers and rewrite flag emoji into stable terminal-friendly text."""
if not text:
return text
return _normalize_flag_emoji(text.translate(_EMOJI_MODIFIER_TRANSLATION))

View File

@@ -68,19 +68,19 @@ def get_chunks(data):
# Leave it string as last resort
value = value
# Python 3.9-compatible alternative to match/case.
if key == "uptime_seconds":
match key:
# convert seconds to hours, for our sanity
value = round(value / 60 / 60, 1)
elif key in ("longitude_i", "latitude_i"):
case "uptime_seconds":
value = round(value / 60 / 60, 1)
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
# truncate to 6th digit after floating point, which would be still accurate
value = round(value * 1e-7, 6)
elif key == "wind_direction":
case "longitude_i" | "latitude_i":
value = round(value * 1e-7, 6)
# Convert wind direction from degrees to abbreviation
value = humanize_wind_direction(value)
elif key == "time":
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
case "wind_direction":
value = humanize_wind_direction(value)
case "time":
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
if key in sensors:
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.5.0"
version = "1.4.14"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}

View File

@@ -1 +0,0 @@

View File

@@ -1,13 +0,0 @@
import unittest
from contact.utilities.arg_parser import setup_parser
class ArgParserTests(unittest.TestCase):
def test_demo_screenshot_flag_is_supported(self) -> None:
args = setup_parser().parse_args(["--demo-screenshot"])
self.assertTrue(args.demo_screenshot)
def test_demo_screenshot_defaults_to_false(self) -> None:
args = setup_parser().parse_args([])
self.assertFalse(args.demo_screenshot)

View File

@@ -1,21 +0,0 @@
import unittest
from contact.utilities.config_io import _is_repeated_field, splitCompoundName
class ConfigIoTests(unittest.TestCase):
def test_split_compound_name_preserves_multi_part_values(self) -> None:
self.assertEqual(splitCompoundName("config.device.role"), ["config", "device", "role"])
def test_split_compound_name_duplicates_single_part_values(self) -> None:
self.assertEqual(splitCompoundName("owner"), ["owner", "owner"])
def test_is_repeated_field_prefers_new_style_attribute(self) -> None:
field = type("Field", (), {"is_repeated": True})()
self.assertTrue(_is_repeated_field(field))
def test_is_repeated_field_falls_back_to_label_comparison(self) -> None:
field_type = type("Field", (), {"label": 3, "LABEL_REPEATED": 3})
self.assertTrue(_is_repeated_field(field_type()))

View File

@@ -1,15 +0,0 @@
import unittest
from contact.utilities.control_utils import transform_menu_path
class ControlUtilsTests(unittest.TestCase):
def test_transform_menu_path_applies_replacements_and_normalization(self) -> None:
transformed = transform_menu_path(["Main Menu", "Radio Settings", "Channel 2", "Detail"])
self.assertEqual(transformed, ["config", "channel", "Detail"])
def test_transform_menu_path_preserves_unmatched_entries(self) -> None:
transformed = transform_menu_path(["Main Menu", "Module Settings", "WiFi"])
self.assertEqual(transformed, ["module", "WiFi"])

View File

@@ -1,121 +0,0 @@
import os
import sqlite3
import tempfile
import unittest
import contact.ui.default_config as config
from contact.utilities import db_handler
from contact.utilities.demo_data import DEMO_LOCAL_NODE_NUM, build_demo_interface
from contact.utilities.singleton import interface_state, ui_state
from contact.utilities.utils import decimal_to_hex
from tests.test_support import reset_singletons, restore_config, snapshot_config
class DbHandlerTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config(
"db_file_path",
"message_prefix",
"sent_message_prefix",
"ack_str",
"ack_implicit_str",
"ack_unknown_str",
"nak_str",
)
self.tempdir = tempfile.TemporaryDirectory()
config.db_file_path = os.path.join(self.tempdir.name, "client.db")
interface_state.myNodeNum = 123
def tearDown(self) -> None:
self.tempdir.cleanup()
restore_config(self.saved_config)
reset_singletons()
def test_save_message_to_db_and_update_ack_roundtrip(self) -> None:
timestamp = db_handler.save_message_to_db("Primary", "123", "hello")
self.assertIsInstance(timestamp, int)
db_handler.update_ack_nak("Primary", timestamp, "hello", "Ack")
with sqlite3.connect(config.db_file_path) as conn:
row = conn.execute('SELECT user_id, message_text, ack_type FROM "123_Primary_messages"').fetchone()
self.assertEqual(row, ("123", "hello", "Ack"))
def test_update_node_info_in_db_fills_defaults_and_preserves_existing_values(self) -> None:
db_handler.update_node_info_in_db(999, short_name="ABCD")
original_long_name = db_handler.get_name_from_database(999, "long")
self.assertTrue(original_long_name.startswith("Meshtastic "))
self.assertEqual(db_handler.get_name_from_database(999, "short"), "ABCD")
self.assertEqual(db_handler.is_chat_archived(999), 0)
db_handler.update_node_info_in_db(999, chat_archived=1)
self.assertEqual(db_handler.get_name_from_database(999, "long"), original_long_name)
self.assertEqual(db_handler.get_name_from_database(999, "short"), "ABCD")
self.assertEqual(db_handler.is_chat_archived(999), 1)
def test_get_name_from_database_returns_hex_when_user_is_missing(self) -> None:
user_id = 0x1234ABCD
db_handler.ensure_node_table_exists()
self.assertEqual(db_handler.get_name_from_database(user_id, "short"), decimal_to_hex(user_id))
self.assertEqual(db_handler.is_chat_archived(user_id), 0)
def test_load_messages_from_db_populates_channels_and_messages(self) -> None:
db_handler.update_node_info_in_db(123, long_name="Local Node", short_name="ME")
db_handler.update_node_info_in_db(456, long_name="Remote Node", short_name="RM")
db_handler.update_node_info_in_db(789, long_name="Archived", short_name="AR", chat_archived=1)
db_handler.ensure_table_exists(
'"123_Primary_messages"',
"""
user_id TEXT,
message_text TEXT,
timestamp INTEGER,
ack_type TEXT
""",
)
db_handler.ensure_table_exists(
'"123_789_messages"',
"""
user_id TEXT,
message_text TEXT,
timestamp INTEGER,
ack_type TEXT
""",
)
with sqlite3.connect(config.db_file_path) as conn:
conn.execute('INSERT INTO "123_Primary_messages" VALUES (?, ?, ?, ?)', ("123", "sent", 1700000000, "Ack"))
conn.execute('INSERT INTO "123_Primary_messages" VALUES (?, ?, ?, ?)', ("456", "reply", 1700000001, None))
conn.execute('INSERT INTO "123_789_messages" VALUES (?, ?, ?, ?)', ("789", "hidden", 1700000002, None))
conn.commit()
ui_state.channel_list = []
ui_state.all_messages = {}
db_handler.load_messages_from_db()
self.assertIn("Primary", ui_state.channel_list)
self.assertNotIn(789, ui_state.channel_list)
self.assertIn("Primary", ui_state.all_messages)
self.assertIn(789, ui_state.all_messages)
messages = ui_state.all_messages["Primary"]
self.assertTrue(messages[0][0].startswith("-- "))
self.assertTrue(any(config.sent_message_prefix in prefix and config.ack_str in prefix for prefix, _ in messages))
self.assertTrue(any("RM:" in prefix for prefix, _ in messages))
self.assertEqual(ui_state.all_messages[789][-1][1], "hidden")
def test_init_nodedb_inserts_nodes_from_interface(self) -> None:
interface_state.interface = build_demo_interface()
interface_state.myNodeNum = DEMO_LOCAL_NODE_NUM
db_handler.init_nodedb()
self.assertEqual(db_handler.get_name_from_database(2701131778, "short"), "SAT2")

View File

@@ -1,38 +0,0 @@
import tempfile
import unittest
from contact.ui import default_config
class DefaultConfigTests(unittest.TestCase):
def test_get_localisation_options_filters_hidden_and_non_ini_files(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
for filename in ("en.ini", "ru.ini", ".hidden.ini", "notes.txt"):
with open(f"{tmpdir}/{filename}", "w", encoding="utf-8") as handle:
handle.write("")
self.assertEqual(default_config.get_localisation_options(tmpdir), ["en", "ru"])
def test_get_localisation_file_normalizes_extensions_and_falls_back_to_english(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
for filename in ("en.ini", "ru.ini"):
with open(f"{tmpdir}/{filename}", "w", encoding="utf-8") as handle:
handle.write("")
self.assertTrue(default_config.get_localisation_file("RU.ini", tmpdir).endswith("/ru.ini"))
self.assertTrue(default_config.get_localisation_file("missing", tmpdir).endswith("/en.ini"))
def test_update_dict_only_adds_missing_values(self) -> None:
default = {"theme": "dark", "nested": {"language": "en", "sound": True}}
actual = {"nested": {"language": "ru"}}
updated = default_config.update_dict(default, actual)
self.assertTrue(updated)
self.assertEqual(actual, {"theme": "dark", "nested": {"language": "ru", "sound": True}})
def test_format_json_single_line_arrays_keeps_arrays_inline(self) -> None:
rendered = default_config.format_json_single_line_arrays({"items": [1, 2], "nested": {"flags": ["a", "b"]}})
self.assertIn('"items": [1, 2]', rendered)
self.assertIn('"flags": ["a", "b"]', rendered)

View File

@@ -1,51 +0,0 @@
import tempfile
import unittest
from unittest import mock
import contact.__main__ as entrypoint
import contact.ui.default_config as config
from contact.utilities.db_handler import get_name_from_database
from contact.utilities.demo_data import DEMO_CHANNELS, DEMO_LOCAL_NODE_NUM, build_demo_interface, configure_demo_database
from contact.utilities.singleton import interface_state, ui_state
from tests.test_support import reset_singletons, restore_config, snapshot_config
class DemoDataTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config("db_file_path", "node_sort", "single_pane_mode")
def tearDown(self) -> None:
restore_config(self.saved_config)
reset_singletons()
def test_build_demo_interface_exposes_expected_shape(self) -> None:
interface = build_demo_interface()
self.assertEqual(interface.getMyNodeInfo()["num"], DEMO_LOCAL_NODE_NUM)
self.assertEqual([channel.settings.name for channel in interface.getNode("^local").channels], DEMO_CHANNELS)
self.assertIn(DEMO_LOCAL_NODE_NUM, interface.nodesByNum)
def test_initialize_globals_seed_demo_populates_ui_state_and_db(self) -> None:
interface_state.interface = build_demo_interface()
with tempfile.TemporaryDirectory() as tmpdir:
demo_db_path = configure_demo_database(tmpdir)
with mock.patch.object(entrypoint.pub, "subscribe"):
entrypoint.initialize_globals(seed_demo=True)
self.assertEqual(config.db_file_path, demo_db_path)
self.assertIn("MediumFast", ui_state.channel_list)
self.assertIn("Another Channel", ui_state.channel_list)
self.assertIn(2701131788, ui_state.channel_list)
self.assertEqual(ui_state.node_list[0], DEMO_LOCAL_NODE_NUM)
self.assertEqual(get_name_from_database(2701131778, "short"), "SAT2")
medium_fast = ui_state.all_messages["MediumFast"]
self.assertTrue(medium_fast[0][0].startswith("-- "))
self.assertTrue(any(config.sent_message_prefix in prefix and config.ack_str in prefix for prefix, _ in medium_fast))
self.assertTrue(any("SAT2:" in prefix for prefix, _ in medium_fast))
direct_messages = ui_state.all_messages[2701131788]
self.assertEqual(len(direct_messages), 3)

View File

@@ -1,11 +0,0 @@
import unittest
from contact.utilities.emoji_utils import normalize_message_text
class EmojiUtilsTests(unittest.TestCase):
def test_strips_modifiers_from_keycaps_and_skin_tones(self) -> None:
self.assertEqual(normalize_message_text("👍🏽 7"), "👍 7")
def test_rewrites_flag_emoji_to_country_codes(self) -> None:
self.assertEqual(normalize_message_text("🇺🇸 hello 🇩🇪"), "US hello DE")

View File

@@ -1,57 +0,0 @@
import os
import tempfile
import unittest
from unittest import mock
import contact.ui.default_config as config
from contact.utilities import i18n
from tests.test_support import restore_config, snapshot_config
class I18nTests(unittest.TestCase):
def setUp(self) -> None:
self.saved_config = snapshot_config("language")
i18n._translations = {}
i18n._language = None
def tearDown(self) -> None:
restore_config(self.saved_config)
i18n._translations = {}
i18n._language = None
def test_t_loads_translation_file_and_formats_placeholders(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
translation_file = os.path.join(tmpdir, "xx.ini")
with open(translation_file, "w", encoding="utf-8") as handle:
handle.write('[ui]\n')
handle.write('greeting,"Hello {name}"\n')
config.language = "xx"
with mock.patch.object(config, "get_localisation_file", return_value=translation_file):
self.assertEqual(i18n.t("ui.greeting", name="Ben"), "Hello Ben")
def test_t_falls_back_to_default_and_returns_unformatted_text_on_error(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
translation_file = os.path.join(tmpdir, "xx.ini")
with open(translation_file, "w", encoding="utf-8") as handle:
handle.write('[ui]\n')
handle.write('greeting,"Hello {name}"\n')
config.language = "xx"
with mock.patch.object(config, "get_localisation_file", return_value=translation_file):
self.assertEqual(i18n.t("ui.greeting"), "Hello {name}")
self.assertEqual(i18n.t("ui.missing", default="Fallback"), "Fallback")
self.assertEqual(i18n.t_text("Literal {value}", value=7), "Literal 7")
def test_loader_cache_is_reused_until_language_changes(self) -> None:
config.language = "en"
with mock.patch.object(i18n, "parse_ini_file", return_value=({"key": "value"}, {})) as parse_ini_file:
self.assertEqual(i18n.t("key"), "value")
self.assertEqual(i18n.t("key"), "value")
self.assertEqual(parse_ini_file.call_count, 1)
config.language = "ru"
self.assertEqual(i18n.t("missing", default="fallback"), "fallback")
self.assertEqual(parse_ini_file.call_count, 2)

View File

@@ -1,40 +0,0 @@
import os
import tempfile
import unittest
from unittest import mock
from contact.utilities.ini_utils import parse_ini_file
class IniUtilsTests(unittest.TestCase):
def test_parse_ini_file_reads_sections_fields_and_help_text(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
ini_path = os.path.join(tmpdir, "settings.ini")
with open(ini_path, "w", encoding="utf-8") as handle:
handle.write('; comment\n')
handle.write('[config.device]\n')
handle.write('title,"Device","Device help"\n')
handle.write('name,"Node Name","Node help"\n')
handle.write('empty_help,"Fallback",""\n')
with mock.patch("contact.utilities.ini_utils.i18n.t", return_value="No help available."):
mapping, help_text = parse_ini_file(ini_path)
self.assertEqual(mapping["config.device"], "Device")
self.assertEqual(help_text["config.device"], "Device help")
self.assertEqual(mapping["config.device.name"], "Node Name")
self.assertEqual(help_text["config.device.name"], "Node help")
self.assertEqual(help_text["config.device.empty_help"], "No help available.")
def test_parse_ini_file_uses_builtin_help_fallback_when_i18n_fails(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
ini_path = os.path.join(tmpdir, "settings.ini")
with open(ini_path, "w", encoding="utf-8") as handle:
handle.write('[section]\n')
handle.write('name,"Name"\n')
with mock.patch("contact.utilities.ini_utils.i18n.t", side_effect=RuntimeError("boom")):
mapping, help_text = parse_ini_file(ini_path)
self.assertEqual(mapping["section.name"], "Name")
self.assertEqual(help_text["section.name"], "No help available.")

View File

@@ -1,178 +0,0 @@
from argparse import Namespace
import unittest
from unittest import mock
import contact.__main__ as entrypoint
import contact.ui.default_config as config
from contact.utilities.singleton import interface_state, ui_state
from tests.test_support import reset_singletons, restore_config, snapshot_config
class MainRuntimeTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config("single_pane_mode")
def tearDown(self) -> None:
restore_config(self.saved_config)
reset_singletons()
def test_initialize_runtime_interface_uses_demo_branch(self) -> None:
args = Namespace(demo_screenshot=True)
with mock.patch.object(entrypoint, "configure_demo_database") as configure_demo_database:
with mock.patch.object(entrypoint, "build_demo_interface", return_value="demo-interface") as build_demo:
with mock.patch.object(entrypoint, "initialize_interface") as initialize_interface:
result = entrypoint.initialize_runtime_interface(args)
self.assertEqual(result, "demo-interface")
configure_demo_database.assert_called_once_with()
build_demo.assert_called_once_with()
initialize_interface.assert_not_called()
def test_initialize_runtime_interface_uses_live_branch_without_demo_flag(self) -> None:
args = Namespace(demo_screenshot=False)
with mock.patch.object(entrypoint, "initialize_interface", return_value="live-interface") as initialize_interface:
result = entrypoint.initialize_runtime_interface(args)
self.assertEqual(result, "live-interface")
initialize_interface.assert_called_once_with(args)
def test_prompt_region_if_unset_reinitializes_interface_after_confirmation(self) -> None:
args = Namespace()
old_interface = mock.Mock()
new_interface = mock.Mock()
interface_state.interface = old_interface
with mock.patch.object(entrypoint, "get_list_input", return_value="Yes"):
with mock.patch.object(entrypoint, "set_region") as set_region:
with mock.patch.object(entrypoint, "initialize_interface", return_value=new_interface) as initialize:
entrypoint.prompt_region_if_unset(args)
set_region.assert_called_once_with(old_interface)
old_interface.close.assert_called_once_with()
initialize.assert_called_once_with(args)
self.assertIs(interface_state.interface, new_interface)
def test_prompt_region_if_unset_leaves_interface_unchanged_when_declined(self) -> None:
args = Namespace()
interface = mock.Mock()
interface_state.interface = interface
with mock.patch.object(entrypoint, "get_list_input", return_value="No"):
with mock.patch.object(entrypoint, "set_region") as set_region:
with mock.patch.object(entrypoint, "initialize_interface") as initialize:
entrypoint.prompt_region_if_unset(args)
set_region.assert_not_called()
initialize.assert_not_called()
interface.close.assert_not_called()
self.assertIs(interface_state.interface, interface)
def test_initialize_globals_resets_and_populates_runtime_state(self) -> None:
ui_state.channel_list = ["stale"]
ui_state.all_messages = {"stale": [("old", "message")]}
ui_state.notifications = [1]
ui_state.packet_buffer = ["packet"]
ui_state.node_list = [99]
ui_state.selected_channel = 3
ui_state.selected_message = 4
ui_state.selected_node = 5
ui_state.start_index = [9, 9, 9]
config.single_pane_mode = "True"
with mock.patch.object(entrypoint, "get_nodeNum", return_value=123):
with mock.patch.object(entrypoint, "get_channels", return_value=["Primary"]) as get_channels:
with mock.patch.object(entrypoint, "get_node_list", return_value=[123, 456]) as get_node_list:
with mock.patch.object(entrypoint.pub, "subscribe") as subscribe:
with mock.patch.object(entrypoint, "init_nodedb") as init_nodedb:
with mock.patch.object(entrypoint, "seed_demo_messages") as seed_demo_messages:
with mock.patch.object(entrypoint, "load_messages_from_db") as load_messages:
entrypoint.initialize_globals(seed_demo=True)
self.assertEqual(ui_state.channel_list, ["Primary"])
self.assertEqual(ui_state.all_messages, {})
self.assertEqual(ui_state.notifications, [])
self.assertEqual(ui_state.packet_buffer, [])
self.assertEqual(ui_state.node_list, [123, 456])
self.assertEqual(ui_state.selected_channel, 0)
self.assertEqual(ui_state.selected_message, 0)
self.assertEqual(ui_state.selected_node, 0)
self.assertEqual(ui_state.start_index, [0, 0, 0])
self.assertTrue(ui_state.single_pane_mode)
self.assertEqual(interface_state.myNodeNum, 123)
get_channels.assert_called_once_with()
get_node_list.assert_called_once_with()
subscribe.assert_called_once_with(entrypoint.on_receive, "meshtastic.receive")
init_nodedb.assert_called_once_with()
seed_demo_messages.assert_called_once_with()
load_messages.assert_called_once_with()
def test_ensure_min_rows_retries_until_terminal_is_large_enough(self) -> None:
stdscr = mock.Mock()
stdscr.getmaxyx.side_effect = [(10, 80), (11, 80)]
with mock.patch.object(entrypoint, "dialog") as dialog:
with mock.patch.object(entrypoint.curses, "update_lines_cols") as update_lines_cols:
entrypoint.ensure_min_rows(stdscr, min_rows=11)
dialog.assert_called_once()
update_lines_cols.assert_called_once_with()
stdscr.clear.assert_called_once_with()
stdscr.refresh.assert_called_once_with()
def test_start_prints_help_and_exits_zero(self) -> None:
parser = mock.Mock()
with mock.patch.object(entrypoint.sys, "argv", ["contact", "--help"]):
with mock.patch.object(entrypoint, "setup_parser", return_value=parser):
with mock.patch.object(entrypoint.sys, "exit", side_effect=SystemExit(0)) as exit_mock:
with self.assertRaises(SystemExit) as raised:
entrypoint.start()
self.assertEqual(raised.exception.code, 0)
parser.print_help.assert_called_once_with()
exit_mock.assert_called_once_with(0)
def test_start_runs_curses_wrapper_and_closes_interface(self) -> None:
interface = mock.Mock()
interface_state.interface = interface
with mock.patch.object(entrypoint.sys, "argv", ["contact"]):
with mock.patch.object(entrypoint.curses, "wrapper") as wrapper:
entrypoint.start()
wrapper.assert_called_once_with(entrypoint.main)
interface.close.assert_called_once_with()
def test_start_handles_keyboard_interrupt(self) -> None:
interface = mock.Mock()
interface_state.interface = interface
with mock.patch.object(entrypoint.sys, "argv", ["contact"]):
with mock.patch.object(entrypoint.curses, "wrapper", side_effect=KeyboardInterrupt):
with mock.patch.object(entrypoint.sys, "exit", side_effect=SystemExit(0)) as exit_mock:
with self.assertRaises(SystemExit) as raised:
entrypoint.start()
self.assertEqual(raised.exception.code, 0)
interface.close.assert_called_once_with()
exit_mock.assert_called_once_with(0)
def test_start_handles_fatal_exception_and_exits_one(self) -> None:
with mock.patch.object(entrypoint.sys, "argv", ["contact"]):
with mock.patch.object(entrypoint.curses, "wrapper", side_effect=RuntimeError("boom")):
with mock.patch.object(entrypoint.curses, "endwin") as endwin:
with mock.patch.object(entrypoint.traceback, "print_exc") as print_exc:
with mock.patch("builtins.print") as print_mock:
with mock.patch.object(entrypoint.sys, "exit", side_effect=SystemExit(1)) as exit_mock:
with self.assertRaises(SystemExit) as raised:
entrypoint.start()
self.assertEqual(raised.exception.code, 1)
endwin.assert_called_once_with()
print_exc.assert_called_once_with()
print_mock.assert_any_call("Fatal error:", mock.ANY)
exit_mock.assert_called_once_with(1)

View File

@@ -1,96 +0,0 @@
import unittest
from unittest import mock
import contact.ui.default_config as config
from contact.message_handlers import rx_handler
from contact.utilities.singleton import interface_state, menu_state, ui_state
from tests.test_support import reset_singletons, restore_config, snapshot_config
class RxHandlerTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config("notification_sound", "message_prefix")
config.notification_sound = "False"
def tearDown(self) -> None:
restore_config(self.saved_config)
reset_singletons()
def test_on_receive_text_message_refreshes_selected_channel(self) -> None:
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.all_messages = {"Primary": []}
ui_state.selected_channel = 0
packet = {
"from": 222,
"to": 999,
"channel": 0,
"hopStart": 3,
"hopLimit": 1,
"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"hello"},
}
with mock.patch.object(rx_handler, "refresh_node_list", return_value=True):
with mock.patch.object(rx_handler, "draw_node_list") as draw_node_list:
with mock.patch.object(rx_handler, "draw_messages_window") as draw_messages_window:
with mock.patch.object(rx_handler, "draw_channel_list") as draw_channel_list:
with mock.patch.object(rx_handler, "add_notification") as add_notification:
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
rx_handler.on_receive(packet, interface=None)
draw_node_list.assert_called_once_with()
draw_messages_window.assert_called_once_with(True)
draw_channel_list.assert_not_called()
add_notification.assert_not_called()
save_message_to_db.assert_called_once_with("Primary", 222, "hello")
self.assertEqual(ui_state.all_messages["Primary"][-1][1], "hello")
self.assertIn("SAT2:", ui_state.all_messages["Primary"][-1][0])
self.assertIn("[2]", ui_state.all_messages["Primary"][-1][0])
def test_on_receive_direct_message_adds_channel_and_notification(self) -> None:
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.all_messages = {"Primary": []}
ui_state.selected_channel = 0
packet = {
"from": 222,
"to": 111,
"hopStart": 1,
"hopLimit": 1,
"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"dm"},
}
with mock.patch.object(rx_handler, "refresh_node_list", return_value=False):
with mock.patch.object(rx_handler, "draw_messages_window") as draw_messages_window:
with mock.patch.object(rx_handler, "draw_channel_list") as draw_channel_list:
with mock.patch.object(rx_handler, "add_notification") as add_notification:
with mock.patch.object(rx_handler, "update_node_info_in_db") as update_node_info_in_db:
with mock.patch.object(rx_handler, "save_message_to_db") as save_message_to_db:
with mock.patch.object(rx_handler, "get_name_from_database", return_value="SAT2"):
rx_handler.on_receive(packet, interface=None)
self.assertIn(222, ui_state.channel_list)
self.assertIn(222, ui_state.all_messages)
draw_messages_window.assert_not_called()
draw_channel_list.assert_called_once_with()
add_notification.assert_called_once_with(1)
update_node_info_in_db.assert_called_once_with(222, chat_archived=False)
save_message_to_db.assert_called_once_with(222, 222, "dm")
def test_on_receive_trims_packet_buffer_even_when_packet_is_undecoded(self) -> None:
ui_state.packet_buffer = list(range(25))
ui_state.display_log = True
ui_state.current_window = 4
with mock.patch.object(rx_handler, "draw_packetlog_win") as draw_packetlog_win:
rx_handler.on_receive({"id": "new"}, interface=None)
draw_packetlog_win.assert_called_once_with()
self.assertEqual(len(ui_state.packet_buffer), 20)
self.assertEqual(ui_state.packet_buffer[-1], {"id": "new"})
self.assertTrue(menu_state.need_redraw)

View File

@@ -1,27 +0,0 @@
import threading
import contact.ui.default_config as config
from contact.ui.ui_state import AppState, ChatUIState, InterfaceState, MenuState
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
def reset_singletons() -> None:
_reset_instance(ui_state, ChatUIState())
_reset_instance(interface_state, InterfaceState())
_reset_instance(menu_state, MenuState())
_reset_instance(app_state, AppState())
app_state.lock = threading.Lock()
def restore_config(saved: dict) -> None:
for key, value in saved.items():
setattr(config, key, value)
def snapshot_config(*keys: str) -> dict:
return {key: getattr(config, key) for key in keys}
def _reset_instance(target: object, replacement: object) -> None:
target.__dict__.clear()
target.__dict__.update(replacement.__dict__)

View File

@@ -1,27 +0,0 @@
import unittest
from unittest import mock
from contact.utilities.telemetry_beautifier import get_chunks, humanize_wind_direction
class TelemetryBeautifierTests(unittest.TestCase):
def test_humanize_wind_direction_handles_boundaries(self) -> None:
self.assertEqual(humanize_wind_direction(0), "N")
self.assertEqual(humanize_wind_direction(90), "E")
self.assertEqual(humanize_wind_direction(225), "SW")
self.assertIsNone(humanize_wind_direction(-1))
def test_get_chunks_formats_known_and_unknown_values(self) -> None:
rendered = get_chunks("uptime_seconds:7200\nwind_direction:90\nlatitude_i:123456789\nunknown:abc\n")
self.assertIn("🆙 2.0h", rendered)
self.assertIn("⮆ E", rendered)
self.assertIn("🌍 12.345679", rendered)
self.assertIn("unknown:abc", rendered)
def test_get_chunks_formats_time_values(self) -> None:
with mock.patch("contact.utilities.telemetry_beautifier.datetime.datetime") as mocked_datetime:
mocked_datetime.fromtimestamp.return_value.strftime.return_value = "01.01.1970 00:00"
rendered = get_chunks("time:0\n")
self.assertIn("🕔 01.01.1970 00:00", rendered)

View File

@@ -1,107 +0,0 @@
from types import SimpleNamespace
import unittest
from unittest import mock
from meshtastic import BROADCAST_NUM
import contact.ui.default_config as config
from contact.message_handlers import tx_handler
from contact.utilities.singleton import interface_state, ui_state
from tests.test_support import reset_singletons, restore_config, snapshot_config
class TxHandlerTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
tx_handler.ack_naks.clear()
self.saved_config = snapshot_config("sent_message_prefix", "ack_str", "ack_implicit_str", "nak_str", "ack_unknown_str")
def tearDown(self) -> None:
tx_handler.ack_naks.clear()
restore_config(self.saved_config)
reset_singletons()
def test_send_message_on_named_channel_tracks_ack_request(self) -> None:
interface = mock.Mock()
interface.sendText.return_value = SimpleNamespace(id="req-1")
interface_state.interface = interface
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.all_messages = {"Primary": []}
with mock.patch.object(tx_handler, "save_message_to_db", return_value=999) as save_message_to_db:
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[00:00:00] "):
tx_handler.send_message("hello", channel=0)
interface.sendText.assert_called_once_with(
text="hello",
destinationId=BROADCAST_NUM,
wantAck=True,
wantResponse=False,
onResponse=tx_handler.onAckNak,
channelIndex=0,
)
save_message_to_db.assert_called_once_with("Primary", 111, "hello")
self.assertEqual(tx_handler.ack_naks["req-1"]["channel"], "Primary")
self.assertEqual(tx_handler.ack_naks["req-1"]["messageIndex"], 1)
self.assertEqual(tx_handler.ack_naks["req-1"]["timestamp"], 999)
self.assertEqual(ui_state.all_messages["Primary"][-1][1], "hello")
def test_send_message_to_direct_node_uses_node_as_destination(self) -> None:
interface = mock.Mock()
interface.sendText.return_value = SimpleNamespace(id="req-2")
interface_state.interface = interface
interface_state.myNodeNum = 111
ui_state.channel_list = [222]
ui_state.all_messages = {222: []}
with mock.patch.object(tx_handler, "save_message_to_db", return_value=123):
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[00:00:00] "):
tx_handler.send_message("dm", channel=0)
interface.sendText.assert_called_once_with(
text="dm",
destinationId=222,
wantAck=True,
wantResponse=False,
onResponse=tx_handler.onAckNak,
channelIndex=0,
)
self.assertEqual(tx_handler.ack_naks["req-2"]["channel"], 222)
def test_on_ack_nak_updates_message_for_explicit_ack(self) -> None:
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.selected_channel = 0
ui_state.all_messages = {"Primary": [("pending", "hello")]}
tx_handler.ack_naks["req"] = {"channel": "Primary", "messageIndex": 0, "timestamp": 55}
packet = {"from": 222, "decoded": {"requestId": "req", "routing": {"errorReason": "NONE"}}}
with mock.patch.object(tx_handler, "update_ack_nak") as update_ack_nak:
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[01:02:03] "):
with mock.patch("contact.ui.contact_ui.draw_messages_window") as draw_messages_window:
tx_handler.onAckNak(packet)
update_ack_nak.assert_called_once_with("Primary", 55, "hello", "Ack")
draw_messages_window.assert_called_once_with()
self.assertIn(config.sent_message_prefix, ui_state.all_messages["Primary"][0][0])
self.assertIn(config.ack_str, ui_state.all_messages["Primary"][0][0])
def test_on_ack_nak_uses_implicit_marker_for_self_ack(self) -> None:
interface_state.myNodeNum = 111
ui_state.channel_list = ["Primary"]
ui_state.selected_channel = 0
ui_state.all_messages = {"Primary": [("pending", "hello")]}
tx_handler.ack_naks["req"] = {"channel": "Primary", "messageIndex": 0, "timestamp": 55}
packet = {"from": 111, "decoded": {"requestId": "req", "routing": {"errorReason": "NONE"}}}
with mock.patch.object(tx_handler, "update_ack_nak") as update_ack_nak:
with mock.patch("contact.message_handlers.tx_handler.time.strftime", return_value="[01:02:03] "):
with mock.patch("contact.ui.contact_ui.draw_messages_window"):
tx_handler.onAckNak(packet)
update_ack_nak.assert_called_once_with("Primary", 55, "hello", "Implicit")
self.assertIn(config.ack_implicit_str, ui_state.all_messages["Primary"][0][0])

View File

@@ -1,71 +0,0 @@
import unittest
from unittest import mock
import contact.ui.default_config as config
from contact.utilities.demo_data import DEMO_LOCAL_NODE_NUM, build_demo_interface
from contact.utilities.singleton import interface_state, ui_state
from contact.utilities.utils import add_new_message, get_channels, get_node_list, parse_protobuf
from tests.test_support import reset_singletons, restore_config, snapshot_config
class UtilsTests(unittest.TestCase):
def setUp(self) -> None:
reset_singletons()
self.saved_config = snapshot_config("node_sort")
def tearDown(self) -> None:
restore_config(self.saved_config)
reset_singletons()
def test_get_node_list_keeps_local_first_and_ignored_last(self) -> None:
config.node_sort = "lastHeard"
interface = build_demo_interface()
interface_state.interface = interface
interface_state.myNodeNum = DEMO_LOCAL_NODE_NUM
node_list = get_node_list()
self.assertEqual(node_list[0], DEMO_LOCAL_NODE_NUM)
self.assertEqual(node_list[-1], 0xA1000008)
def test_add_new_message_groups_messages_by_hour(self) -> None:
ui_state.all_messages = {"MediumFast": []}
with mock.patch("contact.utilities.utils.time.time", side_effect=[1000, 1000]):
with mock.patch("contact.utilities.utils.time.strftime", return_value="[00:16:40] "):
with mock.patch("contact.utilities.utils.datetime.datetime") as mocked_datetime:
mocked_datetime.fromtimestamp.return_value.strftime.return_value = "2025-02-04 17:00"
add_new_message("MediumFast", ">> Test: ", "First")
add_new_message("MediumFast", ">> Test: ", "Second")
self.assertEqual(
ui_state.all_messages["MediumFast"],
[
("-- 2025-02-04 17:00 --", ""),
("[00:16:40] >> Test: ", "First"),
("[00:16:40] >> Test: ", "Second"),
],
)
def test_get_channels_populates_message_buckets_for_device_channels(self) -> None:
interface_state.interface = build_demo_interface()
ui_state.channel_list = []
ui_state.all_messages = {}
channels = get_channels()
self.assertIn("MediumFast", channels)
self.assertIn("Another Channel", channels)
self.assertIn("MediumFast", ui_state.all_messages)
self.assertIn("Another Channel", ui_state.all_messages)
def test_parse_protobuf_returns_string_payload_unchanged(self) -> None:
packet = {"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": "hello"}}
self.assertEqual(parse_protobuf(packet), "hello")
def test_parse_protobuf_returns_placeholder_for_text_messages(self) -> None:
packet = {"decoded": {"portnum": "TEXT_MESSAGE_APP", "payload": b"hello"}}
self.assertEqual(parse_protobuf(packet), "✉️")

View File

@@ -1,14 +0,0 @@
import unittest
from contact.utilities.validation_rules import get_validation_for
class ValidationRulesTests(unittest.TestCase):
def test_get_validation_for_matches_exact_keys(self) -> None:
self.assertEqual(get_validation_for("shortName"), {"max_length": 4})
def test_get_validation_for_matches_substrings(self) -> None:
self.assertEqual(get_validation_for("config.position.latitude"), {"min_value": -90, "max_value": 90})
def test_get_validation_for_returns_empty_dict_for_unknown_key(self) -> None:
self.assertEqual(get_validation_for("totally_unknown"), {})