From d2e559ed0431cfdb5b192dbddaac0416f820676e Mon Sep 17 00:00:00 2001 From: pdxlocations Date: Fri, 20 Feb 2026 21:47:19 -0700 Subject: [PATCH] Implement receive queue and UI shutdown handling in app state --- contact/__main__.py | 13 +- contact/message_handlers/rx_handler.py | 267 +++++++++++++------------ contact/ui/contact_ui.py | 30 ++- contact/ui/ui_state.py | 3 + 4 files changed, 179 insertions(+), 134 deletions(-) diff --git a/contact/__main__.py b/contact/__main__.py index 5749171..d33c25b 100644 --- a/contact/__main__.py +++ b/contact/__main__.py @@ -15,6 +15,7 @@ import curses import io import logging import os +import queue import subprocess import sys import threading @@ -54,6 +55,8 @@ logging.basicConfig( ) app_state.lock = threading.Lock() +app_state.rx_queue = queue.SimpleQueue() +app_state.ui_shutdown = False # ------------------------------------------------------------------------------ @@ -149,11 +152,10 @@ def start() -> None: sys.exit(0) try: + app_state.ui_shutdown = False curses.wrapper(main) - interface_state.interface.close() except KeyboardInterrupt: logging.info("User exited with Ctrl+C") - interface_state.interface.close() sys.exit(0) except Exception as e: logging.critical("Fatal error", exc_info=True) @@ -164,6 +166,13 @@ def start() -> None: print("Fatal error:", e) traceback.print_exc() sys.exit(1) + finally: + app_state.ui_shutdown = True + try: + if interface_state.interface is not None: + interface_state.interface.close() + except Exception: + logging.exception("Error while closing interface") if __name__ == "__main__": diff --git a/contact/message_handlers/rx_handler.py b/contact/message_handlers/rx_handler.py index 4a9cfdf..e020500 100644 --- a/contact/message_handlers/rx_handler.py +++ b/contact/message_handlers/rx_handler.py @@ -2,10 +2,22 @@ import logging import os import platform import shutil -import time import subprocess import threading - # Debounce notification sounds so a burst of queued messages only plays once. +import time +from typing import Any, Dict + +import contact.ui.default_config as config +from contact.utilities.db_handler import ( + get_name_from_database, + maybe_store_nodeinfo_in_db, + save_message_to_db, + update_node_info_in_db, +) +from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state +from contact.utilities.utils import add_new_message, refresh_node_list + +# Debounce notification sounds so a burst of queued messages only plays once. _SOUND_DEBOUNCE_SECONDS = 0.8 _sound_timer: threading.Timer | None = None _sound_timer_lock = threading.Lock() @@ -13,18 +25,13 @@ _last_sound_request = 0.0 def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None: - """Schedule a notification sound after a short quiet period. - - If more messages arrive before the delay elapses, the timer is reset. - This prevents playing a sound for each message when a backlog flushes. - """ + """Schedule a notification sound after a short quiet period.""" global _sound_timer, _last_sound_request now = time.monotonic() with _sound_timer_lock: _last_sound_request = now - # Cancel any previously scheduled sound. if _sound_timer is not None: try: _sound_timer.cancel() @@ -33,7 +40,6 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None: _sound_timer = None def _fire(expected_request_time: float) -> None: - # Only play if nothing newer has been scheduled. with _sound_timer_lock: if expected_request_time != _last_sound_request: return @@ -42,44 +48,20 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None: _sound_timer = threading.Timer(delay, _fire, args=(now,)) _sound_timer.daemon = True _sound_timer.start() -from typing import Any, Dict - -from contact.utilities.utils import ( - refresh_node_list, - add_new_message, -) -from contact.ui.contact_ui import ( - draw_packetlog_win, - draw_node_list, - draw_messages_window, - draw_channel_list, - add_notification, -) -from contact.utilities.db_handler import ( - save_message_to_db, - maybe_store_nodeinfo_in_db, - get_name_from_database, - update_node_info_in_db, -) -import contact.ui.default_config as config - -from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state -def play_sound(): +def play_sound() -> None: try: system = platform.system() sound_path = None executable = None - if system == "Darwin": # macOS + if system == "Darwin": sound_path = "/System/Library/Sounds/Ping.aiff" executable = "afplay" - elif system == "Linux": ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga" - wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback - + wav_path = "/usr/share/sounds/alsa/Front_Center.wav" if shutil.which("paplay") and os.path.exists(ogg_path): executable = "paplay" sound_path = ogg_path @@ -96,102 +78,127 @@ def play_sound(): cmd = [executable, sound_path] if executable == "ffplay": cmd = [executable, "-nodisp", "-autoexit", sound_path] - subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - return + except subprocess.CalledProcessError as exc: + logging.error("Sound playback failed: %s", exc) + except Exception as exc: + logging.error("Unexpected error while playing sound: %s", exc) - except subprocess.CalledProcessError as e: - logging.error(f"Sound playback failed: {e}") - except Exception as e: - logging.error(f"Unexpected error: {e}") + +def _decode_message_payload(payload: Any) -> str: + if isinstance(payload, bytes): + return payload.decode("utf-8", errors="replace") + if isinstance(payload, str): + return payload + return str(payload) + + +def process_receive_event(packet: Dict[str, Any]) -> None: + """Process a queued packet on the UI thread and perform all UI updates.""" + # Local import prevents module-level circular import. + from contact.ui.contact_ui import ( + add_notification, + draw_channel_list, + draw_messages_window, + draw_node_list, + draw_packetlog_win, + ) + + # Update packet log + ui_state.packet_buffer.append(packet) + if len(ui_state.packet_buffer) > 20: + ui_state.packet_buffer = ui_state.packet_buffer[-20:] + + if ui_state.display_log: + draw_packetlog_win() + if ui_state.current_window == 4: + menu_state.need_redraw = True + + decoded = packet.get("decoded") + if not isinstance(decoded, dict): + return + + changed = refresh_node_list() + if changed: + draw_node_list() + + portnum = decoded.get("portnum") + if portnum == "NODEINFO_APP": + user = decoded.get("user") + if isinstance(user, dict) and "longName" in user: + maybe_store_nodeinfo_in_db(packet) + return + + if portnum != "TEXT_MESSAGE_APP": + return + + hop_start = packet.get("hopStart", 0) + hop_limit = packet.get("hopLimit", 0) + hops = hop_start - hop_limit + + if config.notification_sound == "True": + schedule_notification_sound() + + message_string = _decode_message_payload(decoded.get("payload")) + + if not ui_state.channel_list: + return + + refresh_channels = False + refresh_messages = False + + channel_number = packet.get("channel", 0) + if not isinstance(channel_number, int): + channel_number = 0 + if channel_number < 0: + channel_number = 0 + + packet_from = packet.get("from") + if packet.get("to") == interface_state.myNodeNum and packet_from is not None: + if packet_from not in ui_state.channel_list: + ui_state.channel_list.append(packet_from) + if packet_from not in ui_state.all_messages: + ui_state.all_messages[packet_from] = [] + update_node_info_in_db(packet_from, chat_archived=False) + refresh_channels = True + channel_number = ui_state.channel_list.index(packet_from) + + if channel_number >= len(ui_state.channel_list): + channel_number = 0 + + channel_id = ui_state.channel_list[channel_number] + + if ui_state.selected_channel >= len(ui_state.channel_list): + ui_state.selected_channel = 0 + + if channel_id != ui_state.channel_list[ui_state.selected_channel]: + add_notification(channel_number) + refresh_channels = True + else: + refresh_messages = True + + if packet_from is None: + logging.debug("Skipping TEXT_MESSAGE_APP packet with missing 'from' field") + return + + message_from_string = get_name_from_database(packet_from, type="short") + ":" + add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string) + + if refresh_channels: + draw_channel_list() + if refresh_messages: + draw_messages_window(True) + + save_message_to_db(channel_id, packet_from, message_string) def on_receive(packet: Dict[str, Any], interface: Any) -> None: - """ - Handles an incoming packet from a Meshtastic interface. - - Args: - packet: The received Meshtastic packet as a dictionary. - interface: The Meshtastic interface instance that received the packet. - """ - with app_state.lock: - # Update packet log - ui_state.packet_buffer.append(packet) - if len(ui_state.packet_buffer) > 20: - # Trim buffer to 20 packets - ui_state.packet_buffer = ui_state.packet_buffer[-20:] - - if ui_state.display_log: - draw_packetlog_win() - - if ui_state.current_window == 4: - menu_state.need_redraw = True - try: - if "decoded" not in packet: - return - - # Assume any incoming packet could update the last seen time for a node - changed = refresh_node_list() - if changed: - draw_node_list() - - if packet["decoded"]["portnum"] == "NODEINFO_APP": - if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]: - maybe_store_nodeinfo_in_db(packet) - - elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP": - hop_start = packet.get('hopStart', 0) - hop_limit = packet.get('hopLimit', 0) - - hops = hop_start - hop_limit - - - if config.notification_sound == "True": - schedule_notification_sound() - - message_bytes = packet["decoded"]["payload"] - message_string = message_bytes.decode("utf-8") - - refresh_channels = False - refresh_messages = False - - if packet.get("channel"): - channel_number = packet["channel"] - else: - channel_number = 0 - - if packet["to"] == interface_state.myNodeNum: - if packet["from"] in ui_state.channel_list: - pass - else: - ui_state.channel_list.append(packet["from"]) - if packet["from"] not in ui_state.all_messages: - ui_state.all_messages[packet["from"]] = [] - update_node_info_in_db(packet["from"], chat_archived=False) - refresh_channels = True - - channel_number = ui_state.channel_list.index(packet["from"]) - - channel_id = ui_state.channel_list[channel_number] - - if channel_id != ui_state.channel_list[ui_state.selected_channel]: - add_notification(channel_number) - refresh_channels = True - else: - refresh_messages = True - - # Add received message to the messages list - message_from_id = packet["from"] - message_from_string = get_name_from_database(message_from_id, type="short") + ":" - - add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string) - - if refresh_channels: - draw_channel_list() - if refresh_messages: - draw_messages_window(True) - - save_message_to_db(channel_id, message_from_id, message_string) - - except KeyError as e: - logging.error(f"Error processing packet: {e}") + """Enqueue packet to be processed on the main curses thread.""" + if app_state.ui_shutdown: + return + if not isinstance(packet, dict): + return + try: + app_state.rx_queue.put(packet) + except Exception: + logging.exception("Failed to enqueue packet for UI processing") diff --git a/contact/ui/contact_ui.py b/contact/ui/contact_ui.py index 55fd0c1..7837ca2 100644 --- a/contact/ui/contact_ui.py +++ b/contact/ui/contact_ui.py @@ -1,9 +1,11 @@ import curses import logging +from queue import Empty import time import traceback from typing import Union +from contact.message_handlers.rx_handler import process_receive_event from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list from contact.settings import settings_menu from contact.message_handlers.tx_handler import send_message, send_traceroute @@ -15,7 +17,7 @@ from contact.utilities.i18n import t import contact.ui.default_config as config import contact.ui.dialog from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text -from contact.utilities.singleton import ui_state, interface_state, menu_state +from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state MIN_COL = 1 # "effectively zero" without breaking curses @@ -161,6 +163,24 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None: pass +def drain_receive_queue(max_events: int = 200) -> None: + processed = 0 + while processed < max_events: + try: + packet = app_state.rx_queue.get(block=False) + except Empty: + return + except Exception: + logging.exception("Error while draining receive queue") + return + + try: + process_receive_event(packet) + except Exception: + logging.exception("Error while processing receive event") + processed += 1 + + def main_ui(stdscr: curses.window) -> None: """Main UI loop for the curses interface.""" global input_text @@ -171,12 +191,17 @@ def main_ui(stdscr: curses.window) -> None: stdscr.keypad(True) get_channels() handle_resize(stdscr, True) + entry_win.timeout(75) while True: + drain_receive_queue() draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input")) # Get user input from entry window - char = entry_win.get_wch() + try: + char = entry_win.get_wch() + except curses.error: + continue # draw_debug(f"Keypress: {char}") @@ -225,6 +250,7 @@ def main_ui(stdscr: curses.window) -> None: elif char == curses.KEY_RESIZE: input_text = "" handle_resize(stdscr, False) + entry_win.timeout(75) elif char == chr(4): # Ctrl + D to delete current channel or node handle_ctrl_d() diff --git a/contact/ui/ui_state.py b/contact/ui/ui_state.py index 291c5f5..08e7f39 100644 --- a/contact/ui/ui_state.py +++ b/contact/ui/ui_state.py @@ -1,3 +1,4 @@ +from queue import SimpleQueue from typing import Any, Union, List, Dict from dataclasses import dataclass, field @@ -44,3 +45,5 @@ class InterfaceState: @dataclass class AppState: lock: Any = None + rx_queue: SimpleQueue = field(default_factory=SimpleQueue) + ui_shutdown: bool = False