Compare commits

...

1 Commits

Author SHA1 Message Date
pdxlocations d2e559ed04 Implement receive queue and UI shutdown handling in app state 2026-02-20 21:47:19 -07:00
4 changed files with 179 additions and 134 deletions
+11 -2
View File
@@ -15,6 +15,7 @@ import curses
import io import io
import logging import logging
import os import os
import queue
import subprocess import subprocess
import sys import sys
import threading import threading
@@ -54,6 +55,8 @@ logging.basicConfig(
) )
app_state.lock = threading.Lock() app_state.lock = threading.Lock()
app_state.rx_queue = queue.SimpleQueue()
app_state.ui_shutdown = False
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@@ -149,11 +152,10 @@ def start() -> None:
sys.exit(0) sys.exit(0)
try: try:
app_state.ui_shutdown = False
curses.wrapper(main) curses.wrapper(main)
interface_state.interface.close()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("User exited with Ctrl+C") logging.info("User exited with Ctrl+C")
interface_state.interface.close()
sys.exit(0) sys.exit(0)
except Exception as e: except Exception as e:
logging.critical("Fatal error", exc_info=True) logging.critical("Fatal error", exc_info=True)
@@ -164,6 +166,13 @@ def start() -> None:
print("Fatal error:", e) print("Fatal error:", e)
traceback.print_exc() traceback.print_exc()
sys.exit(1) sys.exit(1)
finally:
app_state.ui_shutdown = True
try:
if interface_state.interface is not None:
interface_state.interface.close()
except Exception:
logging.exception("Error while closing interface")
if __name__ == "__main__": if __name__ == "__main__":
+88 -81
View File
@@ -2,9 +2,21 @@ import logging
import os import os
import platform import platform
import shutil import shutil
import time
import subprocess import subprocess
import threading import threading
import time
from typing import Any, Dict
import contact.ui.default_config as config
from contact.utilities.db_handler import (
get_name_from_database,
maybe_store_nodeinfo_in_db,
save_message_to_db,
update_node_info_in_db,
)
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
from contact.utilities.utils import add_new_message, refresh_node_list
# Debounce notification sounds so a burst of queued messages only plays once. # Debounce notification sounds so a burst of queued messages only plays once.
_SOUND_DEBOUNCE_SECONDS = 0.8 _SOUND_DEBOUNCE_SECONDS = 0.8
_sound_timer: threading.Timer | None = None _sound_timer: threading.Timer | None = None
@@ -13,18 +25,13 @@ _last_sound_request = 0.0
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None: def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
"""Schedule a notification sound after a short quiet period. """Schedule a notification sound after a short quiet period."""
If more messages arrive before the delay elapses, the timer is reset.
This prevents playing a sound for each message when a backlog flushes.
"""
global _sound_timer, _last_sound_request global _sound_timer, _last_sound_request
now = time.monotonic() now = time.monotonic()
with _sound_timer_lock: with _sound_timer_lock:
_last_sound_request = now _last_sound_request = now
# Cancel any previously scheduled sound.
if _sound_timer is not None: if _sound_timer is not None:
try: try:
_sound_timer.cancel() _sound_timer.cancel()
@@ -33,7 +40,6 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = None _sound_timer = None
def _fire(expected_request_time: float) -> None: def _fire(expected_request_time: float) -> None:
# Only play if nothing newer has been scheduled.
with _sound_timer_lock: with _sound_timer_lock:
if expected_request_time != _last_sound_request: if expected_request_time != _last_sound_request:
return return
@@ -42,44 +48,20 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = threading.Timer(delay, _fire, args=(now,)) _sound_timer = threading.Timer(delay, _fire, args=(now,))
_sound_timer.daemon = True _sound_timer.daemon = True
_sound_timer.start() _sound_timer.start()
from typing import Any, Dict
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
draw_messages_window,
draw_channel_list,
add_notification,
)
from contact.utilities.db_handler import (
save_message_to_db,
maybe_store_nodeinfo_in_db,
get_name_from_database,
update_node_info_in_db,
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state
def play_sound(): def play_sound() -> None:
try: try:
system = platform.system() system = platform.system()
sound_path = None sound_path = None
executable = None executable = None
if system == "Darwin": # macOS if system == "Darwin":
sound_path = "/System/Library/Sounds/Ping.aiff" sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay" executable = "afplay"
elif system == "Linux": elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga" ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback wav_path = "/usr/share/sounds/alsa/Front_Center.wav"
if shutil.which("paplay") and os.path.exists(ogg_path): if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay" executable = "paplay"
sound_path = ogg_path sound_path = ogg_path
@@ -96,94 +78,110 @@ def play_sound():
cmd = [executable, sound_path] cmd = [executable, sound_path]
if executable == "ffplay": if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path] cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return except subprocess.CalledProcessError as exc:
logging.error("Sound playback failed: %s", exc)
except subprocess.CalledProcessError as e: except Exception as exc:
logging.error(f"Sound playback failed: {e}") logging.error("Unexpected error while playing sound: %s", exc)
except Exception as e:
logging.error(f"Unexpected error: {e}")
def on_receive(packet: Dict[str, Any], interface: Any) -> None: def _decode_message_payload(payload: Any) -> str:
""" if isinstance(payload, bytes):
Handles an incoming packet from a Meshtastic interface. return payload.decode("utf-8", errors="replace")
if isinstance(payload, str):
return payload
return str(payload)
def process_receive_event(packet: Dict[str, Any]) -> None:
"""Process a queued packet on the UI thread and perform all UI updates."""
# Local import prevents module-level circular import.
from contact.ui.contact_ui import (
add_notification,
draw_channel_list,
draw_messages_window,
draw_node_list,
draw_packetlog_win,
)
Args:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with app_state.lock:
# Update packet log # Update packet log
ui_state.packet_buffer.append(packet) ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20: if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
ui_state.packet_buffer = ui_state.packet_buffer[-20:] ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log: if ui_state.display_log:
draw_packetlog_win() draw_packetlog_win()
if ui_state.current_window == 4: if ui_state.current_window == 4:
menu_state.need_redraw = True menu_state.need_redraw = True
try:
if "decoded" not in packet: decoded = packet.get("decoded")
if not isinstance(decoded, dict):
return return
# Assume any incoming packet could update the last seen time for a node
changed = refresh_node_list() changed = refresh_node_list()
if changed: if changed:
draw_node_list() draw_node_list()
if packet["decoded"]["portnum"] == "NODEINFO_APP": portnum = decoded.get("portnum")
if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]: if portnum == "NODEINFO_APP":
user = decoded.get("user")
if isinstance(user, dict) and "longName" in user:
maybe_store_nodeinfo_in_db(packet) maybe_store_nodeinfo_in_db(packet)
return
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP": if portnum != "TEXT_MESSAGE_APP":
hop_start = packet.get('hopStart', 0) return
hop_limit = packet.get('hopLimit', 0)
hop_start = packet.get("hopStart", 0)
hop_limit = packet.get("hopLimit", 0)
hops = hop_start - hop_limit hops = hop_start - hop_limit
if config.notification_sound == "True": if config.notification_sound == "True":
schedule_notification_sound() schedule_notification_sound()
message_bytes = packet["decoded"]["payload"] message_string = _decode_message_payload(decoded.get("payload"))
message_string = message_bytes.decode("utf-8")
if not ui_state.channel_list:
return
refresh_channels = False refresh_channels = False
refresh_messages = False refresh_messages = False
if packet.get("channel"): channel_number = packet.get("channel", 0)
channel_number = packet["channel"] if not isinstance(channel_number, int):
else: channel_number = 0
if channel_number < 0:
channel_number = 0 channel_number = 0
if packet["to"] == interface_state.myNodeNum: packet_from = packet.get("from")
if packet["from"] in ui_state.channel_list: if packet.get("to") == interface_state.myNodeNum and packet_from is not None:
pass if packet_from not in ui_state.channel_list:
else: ui_state.channel_list.append(packet_from)
ui_state.channel_list.append(packet["from"]) if packet_from not in ui_state.all_messages:
if packet["from"] not in ui_state.all_messages: ui_state.all_messages[packet_from] = []
ui_state.all_messages[packet["from"]] = [] update_node_info_in_db(packet_from, chat_archived=False)
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True refresh_channels = True
channel_number = ui_state.channel_list.index(packet_from)
channel_number = ui_state.channel_list.index(packet["from"]) if channel_number >= len(ui_state.channel_list):
channel_number = 0
channel_id = ui_state.channel_list[channel_number] channel_id = ui_state.channel_list[channel_number]
if ui_state.selected_channel >= len(ui_state.channel_list):
ui_state.selected_channel = 0
if channel_id != ui_state.channel_list[ui_state.selected_channel]: if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number) add_notification(channel_number)
refresh_channels = True refresh_channels = True
else: else:
refresh_messages = True refresh_messages = True
# Add received message to the messages list if packet_from is None:
message_from_id = packet["from"] logging.debug("Skipping TEXT_MESSAGE_APP packet with missing 'from' field")
message_from_string = get_name_from_database(message_from_id, type="short") + ":" return
message_from_string = get_name_from_database(packet_from, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string) add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels: if refresh_channels:
@@ -191,7 +189,16 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
if refresh_messages: if refresh_messages:
draw_messages_window(True) draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string) save_message_to_db(channel_id, packet_from, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}") def on_receive(packet: Dict[str, Any], interface: Any) -> None:
"""Enqueue packet to be processed on the main curses thread."""
if app_state.ui_shutdown:
return
if not isinstance(packet, dict):
return
try:
app_state.rx_queue.put(packet)
except Exception:
logging.exception("Failed to enqueue packet for UI processing")
+27 -1
View File
@@ -1,9 +1,11 @@
import curses import curses
import logging import logging
from queue import Empty
import time import time
import traceback import traceback
from typing import Union from typing import Union
from contact.message_handlers.rx_handler import process_receive_event
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu from contact.settings import settings_menu
from contact.message_handlers.tx_handler import send_message, send_traceroute from contact.message_handlers.tx_handler import send_message, send_traceroute
@@ -15,7 +17,7 @@ from contact.utilities.i18n import t
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.ui.dialog import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state, menu_state from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state
MIN_COL = 1 # "effectively zero" without breaking curses MIN_COL = 1 # "effectively zero" without breaking curses
@@ -161,6 +163,24 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
pass pass
def drain_receive_queue(max_events: int = 200) -> None:
processed = 0
while processed < max_events:
try:
packet = app_state.rx_queue.get(block=False)
except Empty:
return
except Exception:
logging.exception("Error while draining receive queue")
return
try:
process_receive_event(packet)
except Exception:
logging.exception("Error while processing receive event")
processed += 1
def main_ui(stdscr: curses.window) -> None: def main_ui(stdscr: curses.window) -> None:
"""Main UI loop for the curses interface.""" """Main UI loop for the curses interface."""
global input_text global input_text
@@ -171,12 +191,17 @@ def main_ui(stdscr: curses.window) -> None:
stdscr.keypad(True) stdscr.keypad(True)
get_channels() get_channels()
handle_resize(stdscr, True) handle_resize(stdscr, True)
entry_win.timeout(75)
while True: while True:
drain_receive_queue()
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input")) draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window # Get user input from entry window
try:
char = entry_win.get_wch() char = entry_win.get_wch()
except curses.error:
continue
# draw_debug(f"Keypress: {char}") # draw_debug(f"Keypress: {char}")
@@ -225,6 +250,7 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_RESIZE: elif char == curses.KEY_RESIZE:
input_text = "" input_text = ""
handle_resize(stdscr, False) handle_resize(stdscr, False)
entry_win.timeout(75)
elif char == chr(4): # Ctrl + D to delete current channel or node elif char == chr(4): # Ctrl + D to delete current channel or node
handle_ctrl_d() handle_ctrl_d()
+3
View File
@@ -1,3 +1,4 @@
from queue import SimpleQueue
from typing import Any, Union, List, Dict from typing import Any, Union, List, Dict
from dataclasses import dataclass, field from dataclasses import dataclass, field
@@ -44,3 +45,5 @@ class InterfaceState:
@dataclass @dataclass
class AppState: class AppState:
lock: Any = None lock: Any = None
rx_queue: SimpleQueue = field(default_factory=SimpleQueue)
ui_shutdown: bool = False