Compare commits

..

7 Commits

Author SHA1 Message Date
pdxlocations
c7b54caf45 Add resize event handling and update version to 1.4.18 2026-03-12 15:30:03 -07:00
pdxlocations
773f43edd8 Merge pull request #248 from pdxlocations:fix-protubuf-depend
Fix-protubuf-depend
2026-03-02 16:31:55 -08:00
pdxlocations
6af1c46bd3 bump version to 1.4.17 2026-03-02 16:31:33 -08:00
pdxlocations
7e3e44df24 Refactor repeated field handling in protobuf utilities 2026-03-02 16:31:09 -08:00
pdxlocations
45626f5e83 bump version 2026-02-28 10:32:43 -08:00
pdxlocations
e9181972b2 bump version 2026-02-28 10:32:16 -08:00
pdxlocations
795ab84ef5 Fix 3.9 compatibility 2026-02-28 10:31:48 -08:00
8 changed files with 186 additions and 188 deletions

View File

@@ -15,7 +15,6 @@ import curses
import io import io
import logging import logging
import os import os
import queue
import subprocess import subprocess
import sys import sys
import threading import threading
@@ -55,8 +54,6 @@ logging.basicConfig(
) )
app_state.lock = threading.Lock() app_state.lock = threading.Lock()
app_state.rx_queue = queue.SimpleQueue()
app_state.ui_shutdown = False
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@@ -152,10 +149,11 @@ def start() -> None:
sys.exit(0) sys.exit(0)
try: try:
app_state.ui_shutdown = False
curses.wrapper(main) curses.wrapper(main)
interface_state.interface.close()
except KeyboardInterrupt: except KeyboardInterrupt:
logging.info("User exited with Ctrl+C") logging.info("User exited with Ctrl+C")
interface_state.interface.close()
sys.exit(0) sys.exit(0)
except Exception as e: except Exception as e:
logging.critical("Fatal error", exc_info=True) logging.critical("Fatal error", exc_info=True)
@@ -166,13 +164,6 @@ def start() -> None:
print("Fatal error:", e) print("Fatal error:", e)
traceback.print_exc() traceback.print_exc()
sys.exit(1) sys.exit(1)
finally:
app_state.ui_shutdown = True
try:
if interface_state.interface is not None:
interface_state.interface.close()
except Exception:
logging.exception("Error while closing interface")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -2,36 +2,30 @@ import logging
import os import os
import platform import platform
import shutil import shutil
import time
import subprocess import subprocess
import threading import threading
import time from typing import Any, Dict, Optional
from typing import Any, Dict # Debounce notification sounds so a burst of queued messages only plays once.
import contact.ui.default_config as config
from contact.utilities.db_handler import (
get_name_from_database,
maybe_store_nodeinfo_in_db,
save_message_to_db,
update_node_info_in_db,
)
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
from contact.utilities.utils import add_new_message, refresh_node_list
# Debounce notification sounds so a burst of queued messages only plays once.
_SOUND_DEBOUNCE_SECONDS = 0.8 _SOUND_DEBOUNCE_SECONDS = 0.8
_sound_timer: threading.Timer | None = None _sound_timer: Optional[threading.Timer] = None
_sound_timer_lock = threading.Lock() _sound_timer_lock = threading.Lock()
_last_sound_request = 0.0 _last_sound_request = 0.0
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None: def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
"""Schedule a notification sound after a short quiet period.""" """Schedule a notification sound after a short quiet period.
If more messages arrive before the delay elapses, the timer is reset.
This prevents playing a sound for each message when a backlog flushes.
"""
global _sound_timer, _last_sound_request global _sound_timer, _last_sound_request
now = time.monotonic() now = time.monotonic()
with _sound_timer_lock: with _sound_timer_lock:
_last_sound_request = now _last_sound_request = now
# Cancel any previously scheduled sound.
if _sound_timer is not None: if _sound_timer is not None:
try: try:
_sound_timer.cancel() _sound_timer.cancel()
@@ -40,6 +34,7 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = None _sound_timer = None
def _fire(expected_request_time: float) -> None: def _fire(expected_request_time: float) -> None:
# Only play if nothing newer has been scheduled.
with _sound_timer_lock: with _sound_timer_lock:
if expected_request_time != _last_sound_request: if expected_request_time != _last_sound_request:
return return
@@ -48,20 +43,42 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = threading.Timer(delay, _fire, args=(now,)) _sound_timer = threading.Timer(delay, _fire, args=(now,))
_sound_timer.daemon = True _sound_timer.daemon = True
_sound_timer.start() _sound_timer.start()
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
draw_messages_window,
draw_channel_list,
add_notification,
)
from contact.utilities.db_handler import (
save_message_to_db,
maybe_store_nodeinfo_in_db,
get_name_from_database,
update_node_info_in_db,
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state
def play_sound() -> None: def play_sound():
try: try:
system = platform.system() system = platform.system()
sound_path = None sound_path = None
executable = None executable = None
if system == "Darwin": if system == "Darwin": # macOS
sound_path = "/System/Library/Sounds/Ping.aiff" sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay" executable = "afplay"
elif system == "Linux": elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga" ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
if shutil.which("paplay") and os.path.exists(ogg_path): if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay" executable = "paplay"
sound_path = ogg_path sound_path = ogg_path
@@ -78,127 +95,102 @@ def play_sound() -> None:
cmd = [executable, sound_path] cmd = [executable, sound_path]
if executable == "ffplay": if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path] cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError as exc: return
logging.error("Sound playback failed: %s", exc)
except Exception as exc:
logging.error("Unexpected error while playing sound: %s", exc)
except subprocess.CalledProcessError as e:
def _decode_message_payload(payload: Any) -> str: logging.error(f"Sound playback failed: {e}")
if isinstance(payload, bytes): except Exception as e:
return payload.decode("utf-8", errors="replace") logging.error(f"Unexpected error: {e}")
if isinstance(payload, str):
return payload
return str(payload)
def process_receive_event(packet: Dict[str, Any]) -> None:
"""Process a queued packet on the UI thread and perform all UI updates."""
# Local import prevents module-level circular import.
from contact.ui.contact_ui import (
add_notification,
draw_channel_list,
draw_messages_window,
draw_node_list,
draw_packetlog_win,
)
# Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
decoded = packet.get("decoded")
if not isinstance(decoded, dict):
return
changed = refresh_node_list()
if changed:
draw_node_list()
portnum = decoded.get("portnum")
if portnum == "NODEINFO_APP":
user = decoded.get("user")
if isinstance(user, dict) and "longName" in user:
maybe_store_nodeinfo_in_db(packet)
return
if portnum != "TEXT_MESSAGE_APP":
return
hop_start = packet.get("hopStart", 0)
hop_limit = packet.get("hopLimit", 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_string = _decode_message_payload(decoded.get("payload"))
if not ui_state.channel_list:
return
refresh_channels = False
refresh_messages = False
channel_number = packet.get("channel", 0)
if not isinstance(channel_number, int):
channel_number = 0
if channel_number < 0:
channel_number = 0
packet_from = packet.get("from")
if packet.get("to") == interface_state.myNodeNum and packet_from is not None:
if packet_from not in ui_state.channel_list:
ui_state.channel_list.append(packet_from)
if packet_from not in ui_state.all_messages:
ui_state.all_messages[packet_from] = []
update_node_info_in_db(packet_from, chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet_from)
if channel_number >= len(ui_state.channel_list):
channel_number = 0
channel_id = ui_state.channel_list[channel_number]
if ui_state.selected_channel >= len(ui_state.channel_list):
ui_state.selected_channel = 0
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
if packet_from is None:
logging.debug("Skipping TEXT_MESSAGE_APP packet with missing 'from' field")
return
message_from_string = get_name_from_database(packet_from, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, packet_from, message_string)
def on_receive(packet: Dict[str, Any], interface: Any) -> None: def on_receive(packet: Dict[str, Any], interface: Any) -> None:
"""Enqueue packet to be processed on the main curses thread.""" """
if app_state.ui_shutdown: Handles an incoming packet from a Meshtastic interface.
return
if not isinstance(packet, dict): Args:
return packet: The received Meshtastic packet as a dictionary.
try: interface: The Meshtastic interface instance that received the packet.
app_state.rx_queue.put(packet) """
except Exception: with app_state.lock:
logging.exception("Failed to enqueue packet for UI processing") # Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
try:
if "decoded" not in packet:
return
# Assume any incoming packet could update the last seen time for a node
changed = refresh_node_list()
if changed:
draw_node_list()
if packet["decoded"]["portnum"] == "NODEINFO_APP":
if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
hop_start = packet.get('hopStart', 0)
hop_limit = packet.get('hopLimit', 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
refresh_channels = False
refresh_messages = False
if packet.get("channel"):
channel_number = packet["channel"]
else:
channel_number = 0
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
# Add received message to the messages list
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")

View File

@@ -1,11 +1,9 @@
import curses import curses
import logging import logging
from queue import Empty
import time import time
import traceback import traceback
from typing import Union from typing import Union
from contact.message_handlers.rx_handler import process_receive_event
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu from contact.settings import settings_menu
from contact.message_handlers.tx_handler import send_message, send_traceroute from contact.message_handlers.tx_handler import send_message, send_traceroute
@@ -17,10 +15,11 @@ from contact.utilities.i18n import t
import contact.ui.default_config as config import contact.ui.default_config as config
import contact.ui.dialog import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state from contact.utilities.singleton import ui_state, interface_state, menu_state
MIN_COL = 1 # "effectively zero" without breaking curses MIN_COL = 1 # "effectively zero" without breaking curses
RESIZE_DEBOUNCE_MS = 250
root_win = None root_win = None
@@ -163,22 +162,22 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
pass pass
def drain_receive_queue(max_events: int = 200) -> None: def drain_resize_events(input_win: curses.window) -> Union[str, int, None]:
processed = 0 """Wait for resize events to settle and preserve one queued non-resize key."""
while processed < max_events: input_win.timeout(RESIZE_DEBOUNCE_MS)
try: try:
packet = app_state.rx_queue.get(block=False) while True:
except Empty: try:
return next_char = input_win.get_wch()
except Exception: except curses.error:
logging.exception("Error while draining receive queue") return None
return
try: if next_char == curses.KEY_RESIZE:
process_receive_event(packet) continue
except Exception:
logging.exception("Error while processing receive event") return next_char
processed += 1 finally:
input_win.timeout(-1)
def main_ui(stdscr: curses.window) -> None: def main_ui(stdscr: curses.window) -> None:
@@ -188,20 +187,20 @@ def main_ui(stdscr: curses.window) -> None:
root_win = stdscr root_win = stdscr
input_text = "" input_text = ""
queued_char = None
stdscr.keypad(True) stdscr.keypad(True)
get_channels() get_channels()
handle_resize(stdscr, True) handle_resize(stdscr, True)
entry_win.timeout(75)
while True: while True:
drain_receive_queue()
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input")) draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window # Get user input from entry window
try: if queued_char is None:
char = entry_win.get_wch() char = entry_win.get_wch()
except curses.error: else:
continue char = queued_char
queued_char = None
# draw_debug(f"Keypress: {char}") # draw_debug(f"Keypress: {char}")
@@ -249,8 +248,9 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_RESIZE: elif char == curses.KEY_RESIZE:
input_text = "" input_text = ""
queued_char = drain_resize_events(entry_win)
handle_resize(stdscr, False) handle_resize(stdscr, False)
entry_win.timeout(75) continue
elif char == chr(4): # Ctrl + D to delete current channel or node elif char == chr(4): # Ctrl + D to delete current channel or node
handle_ctrl_d() handle_ctrl_d()

View File

@@ -56,6 +56,13 @@ config_folder = os.path.abspath(config.node_configs_file_path)
# Load translations # Load translations
field_mapping, help_text = parse_ini_file(translation_file) field_mapping, help_text = parse_ini_file(translation_file)
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0 and later use an is_repeated property, while older versions compare against the label field.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def reload_translations() -> None: def reload_translations() -> None:
global translation_file, field_mapping, help_text global translation_file, field_mapping, help_text
@@ -564,7 +571,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
new_value = new_value == "True" or new_value is True new_value = new_value == "True" or new_value is True
menu_state.start_index.pop() menu_state.start_index.pop()
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used elif _is_repeated_field(field): # Handle repeated field - Not currently used
new_value = get_repeated_input(current_value) new_value = get_repeated_input(current_value)
new_value = current_value if new_value is None else new_value.split(", ") new_value = current_value if new_value is None else new_value.split(", ")
menu_state.start_index.pop() menu_state.start_index.pop()

View File

@@ -1,4 +1,3 @@
from queue import SimpleQueue
from typing import Any, Union, List, Dict from typing import Any, Union, List, Dict
from dataclasses import dataclass, field from dataclasses import dataclass, field
@@ -45,5 +44,3 @@ class InterfaceState:
@dataclass @dataclass
class AppState: class AppState:
lock: Any = None lock: Any = None
rx_queue: SimpleQueue = field(default_factory=SimpleQueue)
ui_shutdown: bool = False

View File

@@ -9,6 +9,17 @@ from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main # defs are from meshtastic/python/main
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0+ exposes `is_repeated`, while older versions require
checking `label == LABEL_REPEATED`.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def traverseConfig(config_root, config, interface_config) -> bool: def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference""" """Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = camel_to_snake(config_root) snake_name = camel_to_snake(config_root)
@@ -89,7 +100,7 @@ def setPref(config, comp_name, raw_val) -> bool:
return False return False
# repeating fields need to be handled with append, not setattr # repeating fields need to be handled with append, not setattr
if pref.label != pref.LABEL_REPEATED: if not _is_repeated_field(pref):
try: try:
if config_type.message_type is not None: if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name) config_values = getattr(config_part, config_type.name)

View File

@@ -68,19 +68,19 @@ def get_chunks(data):
# Leave it string as last resort # Leave it string as last resort
value = value value = value
match key: # Python 3.9-compatible alternative to match/case.
if key == "uptime_seconds":
# convert seconds to hours, for our sanity # convert seconds to hours, for our sanity
case "uptime_seconds": value = round(value / 60 / 60, 1)
value = round(value / 60 / 60, 1) elif key in ("longitude_i", "latitude_i"):
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry # Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
# truncate to 6th digit after floating point, which would be still accurate # truncate to 6th digit after floating point, which would be still accurate
case "longitude_i" | "latitude_i": value = round(value * 1e-7, 6)
value = round(value * 1e-7, 6) elif key == "wind_direction":
# Convert wind direction from degrees to abbreviation # Convert wind direction from degrees to abbreviation
case "wind_direction": value = humanize_wind_direction(value)
value = humanize_wind_direction(value) elif key == "time":
case "time": value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
if key in sensors: if key in sensors:
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} " parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "contact" name = "contact"
version = "1.4.14" version = "1.4.18"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores." description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [ authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"} {name = "Ben Lipsey",email = "ben@pdxlocations.com"}