Compare commits

..

1 Commits

Author SHA1 Message Date
pdxlocations
d2e559ed04 Implement receive queue and UI shutdown handling in app state 2026-02-20 21:47:19 -07:00
9 changed files with 189 additions and 242 deletions

View File

@@ -15,6 +15,7 @@ import curses
import io import io
import logging import logging
import os import os
import queue
import subprocess import subprocess
import sys import sys
import threading import threading
@@ -54,6 +55,8 @@ logging.basicConfig(
) )
app_state.lock = threading.Lock() app_state.lock = threading.Lock()
app_state.rx_queue = queue.SimpleQueue()
app_state.ui_shutdown = False
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@@ -149,11 +152,10 @@ def start() -> None:
sys.exit(0) sys.exit(0)
try: try:
app_state.ui_shutdown = False
curses.wrapper(main) curses.wrapper(main)
interface_state.interface.close()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("User exited with Ctrl+C") logging.info("User exited with Ctrl+C")
interface_state.interface.close()
sys.exit(0) sys.exit(0)
except Exception as e: except Exception as e:
logging.critical("Fatal error", exc_info=True) logging.critical("Fatal error", exc_info=True)
@@ -164,6 +166,13 @@ def start() -> None:
print("Fatal error:", e) print("Fatal error:", e)
traceback.print_exc() traceback.print_exc()
sys.exit(1) sys.exit(1)
finally:
app_state.ui_shutdown = True
try:
if interface_state.interface is not None:
interface_state.interface.close()
except Exception:
logging.exception("Error while closing interface")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -2,30 +2,36 @@ import logging
import os import os
import platform import platform
import shutil import shutil
import time
import subprocess import subprocess
import threading import threading
from typing import Any, Dict, Optional import time
# Debounce notification sounds so a burst of queued messages only plays once. from typing import Any, Dict
import contact.ui.default_config as config
from contact.utilities.db_handler import (
get_name_from_database,
maybe_store_nodeinfo_in_db,
save_message_to_db,
update_node_info_in_db,
)
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
from contact.utilities.utils import add_new_message, refresh_node_list
# Debounce notification sounds so a burst of queued messages only plays once.
_SOUND_DEBOUNCE_SECONDS = 0.8 _SOUND_DEBOUNCE_SECONDS = 0.8
_sound_timer: Optional[threading.Timer] = None _sound_timer: threading.Timer | None = None
_sound_timer_lock = threading.Lock() _sound_timer_lock = threading.Lock()
_last_sound_request = 0.0 _last_sound_request = 0.0
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None: def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
"""Schedule a notification sound after a short quiet period. """Schedule a notification sound after a short quiet period."""
If more messages arrive before the delay elapses, the timer is reset.
This prevents playing a sound for each message when a backlog flushes.
"""
global _sound_timer, _last_sound_request global _sound_timer, _last_sound_request
now = time.monotonic() now = time.monotonic()
with _sound_timer_lock: with _sound_timer_lock:
_last_sound_request = now _last_sound_request = now
# Cancel any previously scheduled sound.
if _sound_timer is not None: if _sound_timer is not None:
try: try:
_sound_timer.cancel() _sound_timer.cancel()
@@ -34,7 +40,6 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = None _sound_timer = None
def _fire(expected_request_time: float) -> None: def _fire(expected_request_time: float) -> None:
# Only play if nothing newer has been scheduled.
with _sound_timer_lock: with _sound_timer_lock:
if expected_request_time != _last_sound_request: if expected_request_time != _last_sound_request:
return return
@@ -43,42 +48,20 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = threading.Timer(delay, _fire, args=(now,)) _sound_timer = threading.Timer(delay, _fire, args=(now,))
_sound_timer.daemon = True _sound_timer.daemon = True
_sound_timer.start() _sound_timer.start()
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
draw_messages_window,
draw_channel_list,
add_notification,
)
from contact.utilities.db_handler import (
save_message_to_db,
maybe_store_nodeinfo_in_db,
get_name_from_database,
update_node_info_in_db,
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state
def play_sound(): def play_sound() -> None:
try: try:
system = platform.system() system = platform.system()
sound_path = None sound_path = None
executable = None executable = None
if system == "Darwin": # macOS if system == "Darwin":
sound_path = "/System/Library/Sounds/Ping.aiff" sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay" executable = "afplay"
elif system == "Linux": elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga" ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback wav_path = "/usr/share/sounds/alsa/Front_Center.wav"
if shutil.which("paplay") and os.path.exists(ogg_path): if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay" executable = "paplay"
sound_path = ogg_path sound_path = ogg_path
@@ -95,102 +78,127 @@ def play_sound():
cmd = [executable, sound_path] cmd = [executable, sound_path]
if executable == "ffplay": if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path] cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return except subprocess.CalledProcessError as exc:
logging.error("Sound playback failed: %s", exc)
except Exception as exc:
logging.error("Unexpected error while playing sound: %s", exc)
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}") def _decode_message_payload(payload: Any) -> str:
except Exception as e: if isinstance(payload, bytes):
logging.error(f"Unexpected error: {e}") return payload.decode("utf-8", errors="replace")
if isinstance(payload, str):
return payload
return str(payload)
def process_receive_event(packet: Dict[str, Any]) -> None:
"""Process a queued packet on the UI thread and perform all UI updates."""
# Local import prevents module-level circular import.
from contact.ui.contact_ui import (
add_notification,
draw_channel_list,
draw_messages_window,
draw_node_list,
draw_packetlog_win,
)
# Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
decoded = packet.get("decoded")
if not isinstance(decoded, dict):
return
changed = refresh_node_list()
if changed:
draw_node_list()
portnum = decoded.get("portnum")
if portnum == "NODEINFO_APP":
user = decoded.get("user")
if isinstance(user, dict) and "longName" in user:
maybe_store_nodeinfo_in_db(packet)
return
if portnum != "TEXT_MESSAGE_APP":
return
hop_start = packet.get("hopStart", 0)
hop_limit = packet.get("hopLimit", 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_string = _decode_message_payload(decoded.get("payload"))
if not ui_state.channel_list:
return
refresh_channels = False
refresh_messages = False
channel_number = packet.get("channel", 0)
if not isinstance(channel_number, int):
channel_number = 0
if channel_number < 0:
channel_number = 0
packet_from = packet.get("from")
if packet.get("to") == interface_state.myNodeNum and packet_from is not None:
if packet_from not in ui_state.channel_list:
ui_state.channel_list.append(packet_from)
if packet_from not in ui_state.all_messages:
ui_state.all_messages[packet_from] = []
update_node_info_in_db(packet_from, chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet_from)
if channel_number >= len(ui_state.channel_list):
channel_number = 0
channel_id = ui_state.channel_list[channel_number]
if ui_state.selected_channel >= len(ui_state.channel_list):
ui_state.selected_channel = 0
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
if packet_from is None:
logging.debug("Skipping TEXT_MESSAGE_APP packet with missing 'from' field")
return
message_from_string = get_name_from_database(packet_from, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, packet_from, message_string)
def on_receive(packet: Dict[str, Any], interface: Any) -> None: def on_receive(packet: Dict[str, Any], interface: Any) -> None:
""" """Enqueue packet to be processed on the main curses thread."""
Handles an incoming packet from a Meshtastic interface. if app_state.ui_shutdown:
return
Args: if not isinstance(packet, dict):
packet: The received Meshtastic packet as a dictionary. return
interface: The Meshtastic interface instance that received the packet. try:
""" app_state.rx_queue.put(packet)
with app_state.lock: except Exception:
# Update packet log logging.exception("Failed to enqueue packet for UI processing")
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
try:
if "decoded" not in packet:
return
# Assume any incoming packet could update the last seen time for a node
changed = refresh_node_list()
if changed:
draw_node_list()
if packet["decoded"]["portnum"] == "NODEINFO_APP":
if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
hop_start = packet.get('hopStart', 0)
hop_limit = packet.get('hopLimit', 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
refresh_channels = False
refresh_messages = False
if packet.get("channel"):
channel_number = packet["channel"]
else:
channel_number = 0
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
# Add received message to the messages list
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -1,9 +1,11 @@
import curses import curses
import logging import logging
from queue import Empty
import time import time
import traceback import traceback
from typing import Union from typing import Union
from contact.message_handlers.rx_handler import process_receive_event
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu from contact.settings import settings_menu
from contact.message_handlers.tx_handler import send_message, send_traceroute from contact.message_handlers.tx_handler import send_message, send_traceroute
@@ -12,15 +14,13 @@ from contact.ui.colors import get_color
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
from contact.utilities.input_handlers import get_list_input from contact.utilities.input_handlers import get_list_input
from contact.utilities.i18n import t from contact.utilities.i18n import t
from contact.utilities.emoji_utils import normalize_message_text
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.ui.dialog import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state, menu_state from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state
MIN_COL = 1 # "effectively zero" without breaking curses MIN_COL = 1 # "effectively zero" without breaking curses
RESIZE_DEBOUNCE_MS = 250
root_win = None root_win = None
@@ -163,22 +163,22 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
pass pass
def drain_resize_events(input_win: curses.window) -> Union[str, int, None]: def drain_receive_queue(max_events: int = 200) -> None:
"""Wait for resize events to settle and preserve one queued non-resize key.""" processed = 0
input_win.timeout(RESIZE_DEBOUNCE_MS) while processed < max_events:
try: try:
while True: packet = app_state.rx_queue.get(block=False)
try: except Empty:
next_char = input_win.get_wch() return
except curses.error: except Exception:
return None logging.exception("Error while draining receive queue")
return
if next_char == curses.KEY_RESIZE: try:
continue process_receive_event(packet)
except Exception:
return next_char logging.exception("Error while processing receive event")
finally: processed += 1
input_win.timeout(-1)
def main_ui(stdscr: curses.window) -> None: def main_ui(stdscr: curses.window) -> None:
@@ -188,20 +188,20 @@ def main_ui(stdscr: curses.window) -> None:
root_win = stdscr root_win = stdscr
input_text = "" input_text = ""
queued_char = None
stdscr.keypad(True) stdscr.keypad(True)
get_channels() get_channels()
handle_resize(stdscr, True) handle_resize(stdscr, True)
entry_win.timeout(75)
while True: while True:
drain_receive_queue()
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input")) draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window # Get user input from entry window
if queued_char is None: try:
char = entry_win.get_wch() char = entry_win.get_wch()
else: except curses.error:
char = queued_char continue
queued_char = None
# draw_debug(f"Keypress: {char}") # draw_debug(f"Keypress: {char}")
@@ -249,9 +249,8 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_RESIZE: elif char == curses.KEY_RESIZE:
input_text = "" input_text = ""
queued_char = drain_resize_events(entry_win)
handle_resize(stdscr, False) handle_resize(stdscr, False)
continue entry_win.timeout(75)
elif char == chr(4): # Ctrl + D to delete current channel or node elif char == chr(4): # Ctrl + D to delete current channel or node
handle_ctrl_d() handle_ctrl_d()
@@ -866,7 +865,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
row = 0 row = 0
for prefix, message in messages: for prefix, message in messages:
full_message = normalize_message_text(f"{prefix}{message}") full_message = f"{prefix}{message}"
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2) wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
msg_line_count += len(wrapped_lines) msg_line_count += len(wrapped_lines)
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1]) messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])

View File

@@ -56,13 +56,6 @@ config_folder = os.path.abspath(config.node_configs_file_path)
# Load translations # Load translations
field_mapping, help_text = parse_ini_file(translation_file) field_mapping, help_text = parse_ini_file(translation_file)
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0 and later use an is_repeated property, while older versions compare against the label field.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def reload_translations() -> None: def reload_translations() -> None:
global translation_file, field_mapping, help_text global translation_file, field_mapping, help_text
@@ -571,7 +564,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
new_value = new_value == "True" or new_value is True new_value = new_value == "True" or new_value is True
menu_state.start_index.pop() menu_state.start_index.pop()
elif _is_repeated_field(field): # Handle repeated field - Not currently used elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
new_value = get_repeated_input(current_value) new_value = get_repeated_input(current_value)
new_value = current_value if new_value is None else new_value.split(", ") new_value = current_value if new_value is None else new_value.split(", ")
menu_state.start_index.pop() menu_state.start_index.pop()

View File

@@ -1,3 +1,4 @@
from queue import SimpleQueue
from typing import Any, Union, List, Dict from typing import Any, Union, List, Dict
from dataclasses import dataclass, field from dataclasses import dataclass, field
@@ -44,3 +45,5 @@ class InterfaceState:
@dataclass @dataclass
class AppState: class AppState:
lock: Any = None lock: Any = None
rx_queue: SimpleQueue = field(default_factory=SimpleQueue)
ui_shutdown: bool = False

View File

@@ -9,17 +9,6 @@ from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main # defs are from meshtastic/python/main
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0+ exposes `is_repeated`, while older versions require
checking `label == LABEL_REPEATED`.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def traverseConfig(config_root, config, interface_config) -> bool: def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference""" """Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = camel_to_snake(config_root) snake_name = camel_to_snake(config_root)
@@ -100,7 +89,7 @@ def setPref(config, comp_name, raw_val) -> bool:
return False return False
# repeating fields need to be handled with append, not setattr # repeating fields need to be handled with append, not setattr
if not _is_repeated_field(pref): if pref.label != pref.LABEL_REPEATED:
try: try:
if config_type.message_type is not None: if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name) config_values = getattr(config_part, config_type.name)

View File

@@ -1,54 +0,0 @@
"""Helpers for normalizing emoji sequences in width-sensitive message rendering."""
# Strip zero-width and presentation modifiers that make terminal cell width inconsistent.
EMOJI_MODIFIER_REPLACEMENTS = {
"\u200d": "",
"\u20e3": "",
"\ufe0e": "",
"\ufe0f": "",
"\U0001F3FB": "",
"\U0001F3FC": "",
"\U0001F3FD": "",
"\U0001F3FE": "",
"\U0001F3FF": "",
}
_EMOJI_MODIFIER_TRANSLATION = str.maketrans(EMOJI_MODIFIER_REPLACEMENTS)
_REGIONAL_INDICATOR_START = ord("\U0001F1E6")
_REGIONAL_INDICATOR_END = ord("\U0001F1FF")
def _regional_indicator_to_letter(char: str) -> str:
return chr(ord("A") + ord(char) - _REGIONAL_INDICATOR_START)
def _normalize_flag_emoji(text: str) -> str:
"""Convert flag emoji built from regional indicators into ASCII country codes."""
normalized = []
index = 0
while index < len(text):
current = text[index]
current_ord = ord(current)
if _REGIONAL_INDICATOR_START <= current_ord <= _REGIONAL_INDICATOR_END and index + 1 < len(text):
next_char = text[index + 1]
next_ord = ord(next_char)
if _REGIONAL_INDICATOR_START <= next_ord <= _REGIONAL_INDICATOR_END:
normalized.append(_regional_indicator_to_letter(current))
normalized.append(_regional_indicator_to_letter(next_char))
index += 2
continue
normalized.append(current)
index += 1
return "".join(normalized)
def normalize_message_text(text: str) -> str:
"""Strip modifiers and rewrite flag emoji into stable terminal-friendly text."""
if not text:
return text
return _normalize_flag_emoji(text.translate(_EMOJI_MODIFIER_TRANSLATION))

View File

@@ -68,19 +68,19 @@ def get_chunks(data):
# Leave it string as last resort # Leave it string as last resort
value = value value = value
# Python 3.9-compatible alternative to match/case. match key:
if key == "uptime_seconds":
# convert seconds to hours, for our sanity # convert seconds to hours, for our sanity
value = round(value / 60 / 60, 1) case "uptime_seconds":
elif key in ("longitude_i", "latitude_i"): value = round(value / 60 / 60, 1)
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry # Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
# truncate to 6th digit after floating point, which would be still accurate # truncate to 6th digit after floating point, which would be still accurate
value = round(value * 1e-7, 6) case "longitude_i" | "latitude_i":
elif key == "wind_direction": value = round(value * 1e-7, 6)
# Convert wind direction from degrees to abbreviation # Convert wind direction from degrees to abbreviation
value = humanize_wind_direction(value) case "wind_direction":
elif key == "time": value = humanize_wind_direction(value)
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m") case "time":
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
if key in sensors: if key in sensors:
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} " parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "contact" name = "contact"
version = "1.4.23" version = "1.4.14"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores." description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [ authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"} {name = "Ben Lipsey",email = "ben@pdxlocations.com"}