Compare commits

..

1 Commits

Author SHA1 Message Date
pdxlocations
d2e559ed04 Implement receive queue and UI shutdown handling in app state 2026-02-20 21:47:19 -07:00
12 changed files with 281 additions and 464 deletions

View File

@@ -15,6 +15,7 @@ import curses
import io
import logging
import os
import queue
import subprocess
import sys
import threading
@@ -54,6 +55,8 @@ logging.basicConfig(
)
app_state.lock = threading.Lock()
app_state.rx_queue = queue.SimpleQueue()
app_state.ui_shutdown = False
# ------------------------------------------------------------------------------
@@ -149,11 +152,10 @@ def start() -> None:
sys.exit(0)
try:
app_state.ui_shutdown = False
curses.wrapper(main)
interface_state.interface.close()
except KeyboardInterrupt:
logging.info("User exited with Ctrl+C")
interface_state.interface.close()
sys.exit(0)
except Exception as e:
logging.critical("Fatal error", exc_info=True)
@@ -164,6 +166,13 @@ def start() -> None:
print("Fatal error:", e)
traceback.print_exc()
sys.exit(1)
finally:
app_state.ui_shutdown = True
try:
if interface_state.interface is not None:
interface_state.interface.close()
except Exception:
logging.exception("Error while closing interface")
if __name__ == "__main__":

View File

@@ -77,7 +77,7 @@ help.node_info, "F5 = Full node info", ""
help.archive_chat, "Ctrl+D = Archive chat / remove node", ""
help.favorite, "Ctrl+F = Favorite", ""
help.ignore, "Ctrl+G = Ignore", ""
help.search, "Ctrl+/ or / = Search", ""
help.search, "Ctrl+/ = Search", ""
help.help, "Ctrl+K = Help", ""
help.no_help, "No help available.", ""
confirm.remove_from_nodedb, "Remove {name} from nodedb?", ""

View File

@@ -77,7 +77,7 @@ help.node_info, "F5 = Полная информация об узле", ""
help.archive_chat, "Ctrl+D = Архив чата / удалить узел", ""
help.favorite, "Ctrl+F = Избранное", ""
help.ignore, "Ctrl+G = Игнорировать", ""
help.search, "Ctrl+/ или / = Поиск", ""
help.search, "Ctrl+/ = Поиск", ""
help.help, "Ctrl+K = Справка", ""
help.no_help, "Нет справки.", ""
confirm.remove_from_nodedb, "Удалить {name} из базы узлов?", ""

View File

@@ -2,30 +2,36 @@ import logging
import os
import platform
import shutil
import time
import subprocess
import threading
from typing import Any, Dict, Optional
# Debounce notification sounds so a burst of queued messages only plays once.
import time
from typing import Any, Dict
import contact.ui.default_config as config
from contact.utilities.db_handler import (
get_name_from_database,
maybe_store_nodeinfo_in_db,
save_message_to_db,
update_node_info_in_db,
)
from contact.utilities.singleton import app_state, interface_state, menu_state, ui_state
from contact.utilities.utils import add_new_message, refresh_node_list
# Debounce notification sounds so a burst of queued messages only plays once.
_SOUND_DEBOUNCE_SECONDS = 0.8
_sound_timer: Optional[threading.Timer] = None
_sound_timer: threading.Timer | None = None
_sound_timer_lock = threading.Lock()
_last_sound_request = 0.0
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
"""Schedule a notification sound after a short quiet period.
If more messages arrive before the delay elapses, the timer is reset.
This prevents playing a sound for each message when a backlog flushes.
"""
"""Schedule a notification sound after a short quiet period."""
global _sound_timer, _last_sound_request
now = time.monotonic()
with _sound_timer_lock:
_last_sound_request = now
# Cancel any previously scheduled sound.
if _sound_timer is not None:
try:
_sound_timer.cancel()
@@ -34,7 +40,6 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = None
def _fire(expected_request_time: float) -> None:
# Only play if nothing newer has been scheduled.
with _sound_timer_lock:
if expected_request_time != _last_sound_request:
return
@@ -43,42 +48,20 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = threading.Timer(delay, _fire, args=(now,))
_sound_timer.daemon = True
_sound_timer.start()
from contact.utilities.utils import (
refresh_node_list,
add_new_message,
)
from contact.ui.contact_ui import (
draw_packetlog_win,
draw_node_list,
draw_messages_window,
draw_channel_list,
add_notification,
)
from contact.utilities.db_handler import (
save_message_to_db,
maybe_store_nodeinfo_in_db,
get_name_from_database,
update_node_info_in_db,
)
import contact.ui.default_config as config
from contact.utilities.singleton import ui_state, interface_state, app_state, menu_state
def play_sound():
def play_sound() -> None:
try:
system = platform.system()
sound_path = None
executable = None
if system == "Darwin": # macOS
if system == "Darwin":
sound_path = "/System/Library/Sounds/Ping.aiff"
executable = "afplay"
elif system == "Linux":
ogg_path = "/usr/share/sounds/freedesktop/stereo/complete.oga"
wav_path = "/usr/share/sounds/alsa/Front_Center.wav" # common fallback
wav_path = "/usr/share/sounds/alsa/Front_Center.wav"
if shutil.which("paplay") and os.path.exists(ogg_path):
executable = "paplay"
sound_path = ogg_path
@@ -95,102 +78,127 @@ def play_sound():
cmd = [executable, sound_path]
if executable == "ffplay":
cmd = [executable, "-nodisp", "-autoexit", sound_path]
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
except subprocess.CalledProcessError as exc:
logging.error("Sound playback failed: %s", exc)
except Exception as exc:
logging.error("Unexpected error while playing sound: %s", exc)
except subprocess.CalledProcessError as e:
logging.error(f"Sound playback failed: {e}")
except Exception as e:
logging.error(f"Unexpected error: {e}")
def _decode_message_payload(payload: Any) -> str:
if isinstance(payload, bytes):
return payload.decode("utf-8", errors="replace")
if isinstance(payload, str):
return payload
return str(payload)
def process_receive_event(packet: Dict[str, Any]) -> None:
"""Process a queued packet on the UI thread and perform all UI updates."""
# Local import prevents module-level circular import.
from contact.ui.contact_ui import (
add_notification,
draw_channel_list,
draw_messages_window,
draw_node_list,
draw_packetlog_win,
)
# Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
decoded = packet.get("decoded")
if not isinstance(decoded, dict):
return
changed = refresh_node_list()
if changed:
draw_node_list()
portnum = decoded.get("portnum")
if portnum == "NODEINFO_APP":
user = decoded.get("user")
if isinstance(user, dict) and "longName" in user:
maybe_store_nodeinfo_in_db(packet)
return
if portnum != "TEXT_MESSAGE_APP":
return
hop_start = packet.get("hopStart", 0)
hop_limit = packet.get("hopLimit", 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_string = _decode_message_payload(decoded.get("payload"))
if not ui_state.channel_list:
return
refresh_channels = False
refresh_messages = False
channel_number = packet.get("channel", 0)
if not isinstance(channel_number, int):
channel_number = 0
if channel_number < 0:
channel_number = 0
packet_from = packet.get("from")
if packet.get("to") == interface_state.myNodeNum and packet_from is not None:
if packet_from not in ui_state.channel_list:
ui_state.channel_list.append(packet_from)
if packet_from not in ui_state.all_messages:
ui_state.all_messages[packet_from] = []
update_node_info_in_db(packet_from, chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet_from)
if channel_number >= len(ui_state.channel_list):
channel_number = 0
channel_id = ui_state.channel_list[channel_number]
if ui_state.selected_channel >= len(ui_state.channel_list):
ui_state.selected_channel = 0
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
if packet_from is None:
logging.debug("Skipping TEXT_MESSAGE_APP packet with missing 'from' field")
return
message_from_string = get_name_from_database(packet_from, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, packet_from, message_string)
def on_receive(packet: Dict[str, Any], interface: Any) -> None:
"""
Handles an incoming packet from a Meshtastic interface.
Args:
packet: The received Meshtastic packet as a dictionary.
interface: The Meshtastic interface instance that received the packet.
"""
with app_state.lock:
# Update packet log
ui_state.packet_buffer.append(packet)
if len(ui_state.packet_buffer) > 20:
# Trim buffer to 20 packets
ui_state.packet_buffer = ui_state.packet_buffer[-20:]
if ui_state.display_log:
draw_packetlog_win()
if ui_state.current_window == 4:
menu_state.need_redraw = True
try:
if "decoded" not in packet:
return
# Assume any incoming packet could update the last seen time for a node
changed = refresh_node_list()
if changed:
draw_node_list()
if packet["decoded"]["portnum"] == "NODEINFO_APP":
if "user" in packet["decoded"] and "longName" in packet["decoded"]["user"]:
maybe_store_nodeinfo_in_db(packet)
elif packet["decoded"]["portnum"] == "TEXT_MESSAGE_APP":
hop_start = packet.get('hopStart', 0)
hop_limit = packet.get('hopLimit', 0)
hops = hop_start - hop_limit
if config.notification_sound == "True":
schedule_notification_sound()
message_bytes = packet["decoded"]["payload"]
message_string = message_bytes.decode("utf-8")
refresh_channels = False
refresh_messages = False
if packet.get("channel"):
channel_number = packet["channel"]
else:
channel_number = 0
if packet["to"] == interface_state.myNodeNum:
if packet["from"] in ui_state.channel_list:
pass
else:
ui_state.channel_list.append(packet["from"])
if packet["from"] not in ui_state.all_messages:
ui_state.all_messages[packet["from"]] = []
update_node_info_in_db(packet["from"], chat_archived=False)
refresh_channels = True
channel_number = ui_state.channel_list.index(packet["from"])
channel_id = ui_state.channel_list[channel_number]
if channel_id != ui_state.channel_list[ui_state.selected_channel]:
add_notification(channel_number)
refresh_channels = True
else:
refresh_messages = True
# Add received message to the messages list
message_from_id = packet["from"]
message_from_string = get_name_from_database(message_from_id, type="short") + ":"
add_new_message(channel_id, f"{config.message_prefix} [{hops}] {message_from_string} ", message_string)
if refresh_channels:
draw_channel_list()
if refresh_messages:
draw_messages_window(True)
save_message_to_db(channel_id, message_from_id, message_string)
except KeyError as e:
logging.error(f"Error processing packet: {e}")
"""Enqueue packet to be processed on the main curses thread."""
if app_state.ui_shutdown:
return
if not isinstance(packet, dict):
return
try:
app_state.rx_queue.put(packet)
except Exception:
logging.exception("Failed to enqueue packet for UI processing")

View File

@@ -1,9 +1,11 @@
import curses
import logging
from queue import Empty
import time
import traceback
from typing import Union
from contact.message_handlers.rx_handler import process_receive_event
from contact.utilities.utils import get_channels, get_readable_duration, get_time_ago, refresh_node_list
from contact.settings import settings_menu
from contact.message_handlers.tx_handler import send_message, send_traceroute
@@ -12,17 +14,14 @@ from contact.ui.colors import get_color
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
from contact.utilities.input_handlers import get_list_input
from contact.utilities.i18n import t
from contact.utilities.emoji_utils import normalize_message_text
import contact.ui.default_config as config
import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state, menu_state
from contact.utilities.singleton import ui_state, interface_state, menu_state, app_state
MIN_COL = 1 # "effectively zero" without breaking curses
RESIZE_DEBOUNCE_MS = 250
root_win = None
nodes_pad = None
# Draw arrows for a specific window id (0=channel,1=messages,2=nodes).
@@ -65,72 +64,6 @@ def paint_frame(win, selected: bool) -> None:
win.refresh()
def get_channel_row_color(index: int) -> int:
if index == ui_state.selected_channel:
if ui_state.current_window == 0:
return get_color("channel_list", reverse=True)
return get_color("channel_selected")
return get_color("channel_list")
def get_node_row_color(index: int, highlight: bool = False) -> int:
node_num = ui_state.node_list[index]
node = interface_state.interface.nodesByNum.get(node_num, {})
color = "node_list"
if node.get("isFavorite"):
color = "node_favorite"
if node.get("isIgnored"):
color = "node_ignored"
reverse = index == ui_state.selected_node and (ui_state.current_window == 2 or highlight)
return get_color(color, reverse=reverse)
def refresh_node_selection(old_index: int = -1, highlight: bool = False) -> None:
if nodes_pad is None or not ui_state.node_list:
return
width = max(0, nodes_pad.getmaxyx()[1] - 4)
if 0 <= old_index < len(ui_state.node_list):
try:
nodes_pad.chgat(old_index, 1, width, get_node_row_color(old_index, highlight=highlight))
except curses.error:
pass
if 0 <= ui_state.selected_node < len(ui_state.node_list):
try:
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node, highlight=highlight))
except curses.error:
pass
ui_state.start_index[2] = max(0, ui_state.selected_node - (nodes_win.getmaxyx()[0] - 3))
refresh_pad(2)
draw_window_arrows(2)
def refresh_main_window(window_id: int, selected: bool) -> None:
if window_id == 0:
paint_frame(channel_win, selected=selected)
if ui_state.channel_list:
width = max(0, channel_pad.getmaxyx()[1] - 4)
channel_pad.chgat(ui_state.selected_channel, 1, width, get_channel_row_color(ui_state.selected_channel))
refresh_pad(0)
elif window_id == 1:
paint_frame(messages_win, selected=selected)
refresh_pad(1)
elif window_id == 2:
paint_frame(nodes_win, selected=selected)
if ui_state.node_list and nodes_pad is not None:
width = max(0, nodes_pad.getmaxyx()[1] - 4)
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node))
refresh_pad(2)
def get_node_display_name(node_num: int, node: dict) -> str:
user = node.get("user") or {}
return user.get("longName") or get_name_from_database(node_num, "long")
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
"""Handle terminal resize events and redraw the UI accordingly."""
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, packetlog_win, entry_win
@@ -230,22 +163,22 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
pass
def drain_resize_events(input_win: curses.window) -> Union[str, int, None]:
"""Wait for resize events to settle and preserve one queued non-resize key."""
input_win.timeout(RESIZE_DEBOUNCE_MS)
try:
while True:
try:
next_char = input_win.get_wch()
except curses.error:
return None
def drain_receive_queue(max_events: int = 200) -> None:
processed = 0
while processed < max_events:
try:
packet = app_state.rx_queue.get(block=False)
except Empty:
return
except Exception:
logging.exception("Error while draining receive queue")
return
if next_char == curses.KEY_RESIZE:
continue
return next_char
finally:
input_win.timeout(-1)
try:
process_receive_event(packet)
except Exception:
logging.exception("Error while processing receive event")
processed += 1
def main_ui(stdscr: curses.window) -> None:
@@ -255,20 +188,20 @@ def main_ui(stdscr: curses.window) -> None:
root_win = stdscr
input_text = ""
queued_char = None
stdscr.keypad(True)
get_channels()
handle_resize(stdscr, True)
entry_win.timeout(75)
while True:
drain_receive_queue()
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
if queued_char is None:
try:
char = entry_win.get_wch()
else:
char = queued_char
queued_char = None
except curses.error:
continue
# draw_debug(f"Keypress: {char}")
@@ -316,16 +249,13 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_RESIZE:
input_text = ""
queued_char = drain_resize_events(entry_win)
handle_resize(stdscr, False)
continue
entry_win.timeout(75)
elif char == chr(4): # Ctrl + D to delete current channel or node
handle_ctrl_d()
elif char == chr(31) or (
char == "/" and not input_text and ui_state.current_window in (0, 2)
): # Ctrl + / or / to search in channel/node lists
elif char == chr(31): # Ctrl + / to search
handle_ctrl_fslash()
elif char == chr(11): # Ctrl + K for Help
@@ -429,16 +359,31 @@ def handle_leftright(char: int) -> None:
delta = -1 if char == curses.KEY_LEFT else 1
old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3
if ui_state.single_pane_mode:
handle_resize(root_win, False)
return
handle_resize(root_win, False)
refresh_main_window(old_window, selected=False)
if old_window == 0:
paint_frame(channel_win, selected=False)
refresh_pad(0)
if old_window == 1:
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
paint_frame(nodes_win, selected=False)
refresh_pad(2)
if not ui_state.single_pane_mode:
draw_window_arrows(old_window)
refresh_main_window(ui_state.current_window, selected=True)
if ui_state.current_window == 0:
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
paint_frame(nodes_win, selected=True)
refresh_pad(2)
draw_window_arrows(ui_state.current_window)
@@ -459,16 +404,31 @@ def handle_function_keys(char: int) -> None:
return
ui_state.current_window = target
if ui_state.single_pane_mode:
handle_resize(root_win, False)
return
handle_resize(root_win, False)
refresh_main_window(old_window, selected=False)
if old_window == 0:
paint_frame(channel_win, selected=False)
refresh_pad(0)
elif old_window == 1:
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
paint_frame(nodes_win, selected=False)
refresh_pad(2)
if not ui_state.single_pane_mode:
draw_window_arrows(old_window)
refresh_main_window(ui_state.current_window, selected=True)
if ui_state.current_window == 0:
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
paint_frame(nodes_win, selected=True)
refresh_pad(2)
draw_window_arrows(ui_state.current_window)
@@ -520,34 +480,34 @@ def handle_enter(input_text: str) -> str:
def handle_f5_key(stdscr: curses.window) -> None:
if not ui_state.node_list:
return
def build_node_details() -> tuple[str, list[str]]:
node = None
try:
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
message_parts = []
message_parts.append("**📋 Basic Information:**")
message_parts.append(f"• Device: {node.get('user', {}).get('longName', 'Unknown')}")
message_parts.append(f"• Short name: {node.get('user', {}).get('shortName', 'Unknown')}")
message_parts.append(f"• Hardware: {node.get('user', {}).get('hwModel', 'Unknown')}")
message_parts.append(f"• Role: {node.get('user', {}).get('role', 'Unknown')}")
message_parts.append(f"Public key: {node.get('user', {}).get('publicKey')}")
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
role = f"{node.get('user', {}).get('role', 'Unknown')}"
message_parts.append(f"• Role: {role}")
pk = f"{node.get('user', {}).get('publicKey')}"
message_parts.append(f"Public key: {pk}")
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
if "position" in node:
pos = node["position"]
has_coords = pos.get("latitude") and pos.get("longitude")
if has_coords:
if pos.get("latitude") and pos.get("longitude"):
message_parts.append(f"• Position: {pos['latitude']:.4f}, {pos['longitude']:.4f}")
if pos.get("altitude"):
message_parts.append(f"• Altitude: {pos['altitude']}m")
if has_coords:
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
if any(key in node for key in ["snr", "hopsAway", "lastHeard"]):
message_parts.append("")
message_parts.append("**🌐 Network Metrics:**")
message_parts.append("\n**🌐 Network Metrics:**")
if "snr" in node:
snr = node["snr"]
@@ -567,13 +527,12 @@ def handle_f5_key(stdscr: curses.window) -> None:
hop_emoji = "📡" if hops == 0 else "🔄" if hops == 1 else ""
message_parts.append(f"• Hops away: {hop_emoji} {hops}")
if node.get("lastHeard"):
if "lastHeard" in node and node["lastHeard"]:
message_parts.append(f"• Last heard: 🕐 {get_time_ago(node['lastHeard'])}")
if node.get("deviceMetrics"):
metrics = node["deviceMetrics"]
message_parts.append("")
message_parts.append("**📊 Device Metrics:**")
message_parts.append("\n**📊 Device Metrics:**")
if "batteryLevel" in metrics:
battery = metrics["batteryLevel"]
@@ -594,128 +553,21 @@ def handle_f5_key(stdscr: curses.window) -> None:
air_emoji = "🔴" if air_util > 80 else "🟡" if air_util > 50 else "🟢"
message_parts.append(f"• Air utilization TX: {air_emoji} {air_util:.2f}%")
title = t(
"ui.dialog.node_details_title",
default="📡 Node Details: {name}",
name=node.get("user", {}).get("shortName", "Unknown"),
message = "\n".join(message_parts)
contact.ui.dialog.dialog(
t(
"ui.dialog.node_details_title",
default="📡 Node Details: {name}",
name=node.get("user", {}).get("shortName", "Unknown"),
),
message,
)
return title, message_parts
previous_window = ui_state.current_window
ui_state.current_window = 4
scroll_offset = 0
dialog_win = None
curses.curs_set(0)
refresh_node_selection(highlight=True)
try:
while True:
curses.update_lines_cols()
height, width = curses.LINES, curses.COLS
title, message_lines = build_node_details()
max_line_length = max(len(title), *(len(line) for line in message_lines))
dialog_width = min(max(max_line_length + 4, 20), max(10, width - 2))
dialog_height = min(max(len(message_lines) + 4, 6), max(6, height - 2))
x = max(0, (width - dialog_width) // 2)
y = max(0, (height - dialog_height) // 2)
viewport_h = max(1, dialog_height - 4)
max_scroll = max(0, len(message_lines) - viewport_h)
scroll_offset = max(0, min(scroll_offset, max_scroll))
if dialog_win is None:
dialog_win = curses.newwin(dialog_height, dialog_width, y, x)
else:
dialog_win.erase()
dialog_win.refresh()
dialog_win.resize(dialog_height, dialog_width)
dialog_win.mvwin(y, x)
dialog_win.keypad(True)
dialog_win.bkgd(get_color("background"))
dialog_win.attrset(get_color("window_frame"))
dialog_win.border(0)
try:
dialog_win.addstr(0, 2, title[: max(0, dialog_width - 4)], get_color("settings_default"))
hint = f" {ui_state.selected_node + 1}/{len(ui_state.node_list)} "
dialog_win.addstr(0, max(2, dialog_width - len(hint) - 2), hint, get_color("commands"))
except curses.error:
pass
msg_win = dialog_win.derwin(viewport_h + 2, dialog_width - 2, 1, 1)
msg_win.erase()
for row, line in enumerate(message_lines[scroll_offset : scroll_offset + viewport_h], start=1):
trimmed = line[: max(0, dialog_width - 6)]
try:
msg_win.addstr(row, 1, trimmed, get_color("settings_default"))
except curses.error:
pass
if len(message_lines) > viewport_h:
old_index = ui_state.start_index[4] if len(ui_state.start_index) > 4 else 0
while len(ui_state.start_index) <= 4:
ui_state.start_index.append(0)
ui_state.start_index[4] = scroll_offset
draw_main_arrows(msg_win, len(message_lines) - 1, window=4)
ui_state.start_index[4] = old_index
try:
ok_text = " Up/Down: Nodes PgUp/PgDn: Scroll Esc: Close "
dialog_win.addstr(
dialog_height - 2,
max(1, (dialog_width - len(ok_text)) // 2),
ok_text[: max(0, dialog_width - 2)],
get_color("settings_default", reverse=True),
)
except curses.error:
pass
dialog_win.refresh()
msg_win.noutrefresh()
curses.doupdate()
dialog_win.timeout(200)
char = dialog_win.getch()
if menu_state.need_redraw:
menu_state.need_redraw = False
continue
if char in (27, curses.KEY_LEFT, curses.KEY_ENTER, 10, 13, 32):
break
if char == curses.KEY_UP:
old_selected_node = ui_state.selected_node
ui_state.selected_node = (ui_state.selected_node - 1) % len(ui_state.node_list)
scroll_offset = 0
refresh_node_selection(old_selected_node, highlight=True)
elif char == curses.KEY_DOWN:
old_selected_node = ui_state.selected_node
ui_state.selected_node = (ui_state.selected_node + 1) % len(ui_state.node_list)
scroll_offset = 0
refresh_node_selection(old_selected_node, highlight=True)
elif char == curses.KEY_PPAGE:
scroll_offset = max(0, scroll_offset - viewport_h)
elif char == curses.KEY_NPAGE:
scroll_offset = min(max_scroll, scroll_offset + viewport_h)
elif char == curses.KEY_HOME:
scroll_offset = 0
elif char == curses.KEY_END:
scroll_offset = max_scroll
elif char == curses.KEY_RESIZE:
continue
curses.curs_set(1) # Show cursor again
handle_resize(stdscr, False)
except KeyError:
return
finally:
if dialog_win is not None:
dialog_win.erase()
dialog_win.refresh()
ui_state.current_window = previous_window
curses.curs_set(1)
handle_resize(stdscr, False)
def handle_ctrl_t(stdscr: curses.window) -> None:
@@ -772,7 +624,6 @@ def handle_backtick(stdscr: curses.window) -> None:
ui_state.current_window = 4
settings_menu(stdscr, interface_state.interface)
ui_state.current_window = previous_window
ui_state.single_pane_mode = config.single_pane_mode.lower() == "true"
curses.curs_set(1)
refresh_node_list()
handle_resize(stdscr, False)
@@ -1014,7 +865,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
row = 0
for prefix, message in messages:
full_message = normalize_message_text(f"{prefix}{message}")
full_message = f"{prefix}{message}"
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
msg_line_count += len(wrapped_lines)
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])
@@ -1056,8 +907,10 @@ def draw_node_list() -> None:
if ui_state.current_window != 2 and ui_state.single_pane_mode:
return
if nodes_pad is None:
nodes_pad = curses.newpad(1, 1)
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
# if nodes_pad is None:
# nodes_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1, 1)
try:
nodes_pad.erase()
@@ -1071,12 +924,28 @@ def draw_node_list() -> None:
node = interface_state.interface.nodesByNum[node_num]
secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"]
status_icon = "🔐" if secure else "🔓"
node_name = get_node_display_name(node_num, node)
node_name = get_name_from_database(node_num, "long")
user_name = node["user"]["shortName"]
uptime_str = ""
if "deviceMetrics" in node and "uptimeSeconds" in node["deviceMetrics"]:
uptime_str = f" / Up: {get_readable_duration(node['deviceMetrics']['uptimeSeconds'])}"
last_heard_str = f"{get_time_ago(node['lastHeard'])}" if node.get("lastHeard") else ""
hops_str = f" ■ Hops: {node['hopsAway']}" if "hopsAway" in node else ""
snr_str = f" ■ SNR: {node['snr']}dB" if node.get("hopsAway") == 0 and "snr" in node else ""
# Future node name custom formatting possible
node_str = f"{status_icon} {node_name}"
node_str = node_str.ljust(box_width - 4)[: box_width - 2]
nodes_pad.addstr(i, 1, node_str, get_node_row_color(i))
color = "node_list"
if "isFavorite" in node and node["isFavorite"]:
color = "node_favorite"
if "isIgnored" in node and node["isIgnored"]:
color = "node_ignored"
nodes_pad.addstr(
i, 1, node_str, get_color(color, reverse=ui_state.selected_node == i and ui_state.current_window == 2)
)
paint_frame(nodes_win, selected=(ui_state.current_window == 2))
nodes_win.refresh()

View File

@@ -56,13 +56,6 @@ config_folder = os.path.abspath(config.node_configs_file_path)
# Load translations
field_mapping, help_text = parse_ini_file(translation_file)
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0 and later use an is_repeated property, while older versions compare against the label field.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def reload_translations() -> None:
global translation_file, field_mapping, help_text
@@ -571,7 +564,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
new_value = new_value == "True" or new_value is True
menu_state.start_index.pop()
elif _is_repeated_field(field): # Handle repeated field - Not currently used
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
new_value = get_repeated_input(current_value)
new_value = current_value if new_value is None else new_value.split(", ")
menu_state.start_index.pop()

View File

@@ -1,3 +1,4 @@
from queue import SimpleQueue
from typing import Any, Union, List, Dict
from dataclasses import dataclass, field
@@ -44,3 +45,5 @@ class InterfaceState:
@dataclass
class AppState:
lock: Any = None
rx_queue: SimpleQueue = field(default_factory=SimpleQueue)
ui_shutdown: bool = False

View File

@@ -566,7 +566,7 @@ def save_json(file_path: str, data: Dict[str, Any]) -> None:
formatted_json = config.format_json_single_line_arrays(data)
with open(file_path, "w", encoding="utf-8") as f:
f.write(formatted_json)
config.reload_config()
setup_colors(reinit=True)
reload_translations(data.get("language"))

View File

@@ -9,17 +9,6 @@ from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0+ exposes `is_repeated`, while older versions require
checking `label == LABEL_REPEATED`.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = camel_to_snake(config_root)
@@ -100,7 +89,7 @@ def setPref(config, comp_name, raw_val) -> bool:
return False
# repeating fields need to be handled with append, not setattr
if not _is_repeated_field(pref):
if pref.label != pref.LABEL_REPEATED:
try:
if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name)

View File

@@ -1,54 +0,0 @@
"""Helpers for normalizing emoji sequences in width-sensitive message rendering."""
# Strip zero-width and presentation modifiers that make terminal cell width inconsistent.
EMOJI_MODIFIER_REPLACEMENTS = {
"\u200d": "",
"\u20e3": "",
"\ufe0e": "",
"\ufe0f": "",
"\U0001F3FB": "",
"\U0001F3FC": "",
"\U0001F3FD": "",
"\U0001F3FE": "",
"\U0001F3FF": "",
}
_EMOJI_MODIFIER_TRANSLATION = str.maketrans(EMOJI_MODIFIER_REPLACEMENTS)
_REGIONAL_INDICATOR_START = ord("\U0001F1E6")
_REGIONAL_INDICATOR_END = ord("\U0001F1FF")
def _regional_indicator_to_letter(char: str) -> str:
return chr(ord("A") + ord(char) - _REGIONAL_INDICATOR_START)
def _normalize_flag_emoji(text: str) -> str:
"""Convert flag emoji built from regional indicators into ASCII country codes."""
normalized = []
index = 0
while index < len(text):
current = text[index]
current_ord = ord(current)
if _REGIONAL_INDICATOR_START <= current_ord <= _REGIONAL_INDICATOR_END and index + 1 < len(text):
next_char = text[index + 1]
next_ord = ord(next_char)
if _REGIONAL_INDICATOR_START <= next_ord <= _REGIONAL_INDICATOR_END:
normalized.append(_regional_indicator_to_letter(current))
normalized.append(_regional_indicator_to_letter(next_char))
index += 2
continue
normalized.append(current)
index += 1
return "".join(normalized)
def normalize_message_text(text: str) -> str:
"""Strip modifiers and rewrite flag emoji into stable terminal-friendly text."""
if not text:
return text
return _normalize_flag_emoji(text.translate(_EMOJI_MODIFIER_TRANSLATION))

View File

@@ -68,19 +68,19 @@ def get_chunks(data):
# Leave it string as last resort
value = value
# Python 3.9-compatible alternative to match/case.
if key == "uptime_seconds":
match key:
# convert seconds to hours, for our sanity
value = round(value / 60 / 60, 1)
elif key in ("longitude_i", "latitude_i"):
case "uptime_seconds":
value = round(value / 60 / 60, 1)
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
# truncate to 6th digit after floating point, which would be still accurate
value = round(value * 1e-7, 6)
elif key == "wind_direction":
case "longitude_i" | "latitude_i":
value = round(value * 1e-7, 6)
# Convert wind direction from degrees to abbreviation
value = humanize_wind_direction(value)
elif key == "time":
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
case "wind_direction":
value = humanize_wind_direction(value)
case "time":
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
if key in sensors:
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.4.22"
version = "1.4.14"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}