Compare commits

..

12 Commits

Author SHA1 Message Date
pdxlocations
dd0eb1473c Enhance UI color handling for channel and node rows, and refactor refresh logic 2026-03-18 22:17:08 -07:00
pdxlocations
7d9703a548 Merge pull request #252 from pdxlocations:normalize-emojis
Add emoji normalization utility and integrate into message rendering
2026-03-18 21:37:02 -07:00
pdxlocations
68b8d15181 Add emoji normalization utility and integrate into message rendering 2026-03-18 21:36:46 -07:00
pdxlocations
4ef87871df bump version 2026-03-12 19:37:21 -07:00
pdxlocations
18df7d326a Merge pull request #250 from pdxlocations:resize-faster
Add resize event handling
2026-03-12 16:21:15 -07:00
pdxlocations
c7b54caf45 Add resize event handling and update version to 1.4.18 2026-03-12 15:30:03 -07:00
pdxlocations
773f43edd8 Merge pull request #248 from pdxlocations:fix-protubuf-depend
Fix-protubuf-depend
2026-03-02 16:31:55 -08:00
pdxlocations
6af1c46bd3 bump version to 1.4.17 2026-03-02 16:31:33 -08:00
pdxlocations
7e3e44df24 Refactor repeated field handling in protobuf utilities 2026-03-02 16:31:09 -08:00
pdxlocations
45626f5e83 bump version 2026-02-28 10:32:43 -08:00
pdxlocations
e9181972b2 bump version 2026-02-28 10:32:16 -08:00
pdxlocations
795ab84ef5 Fix 3.9 compatibility 2026-02-28 10:31:48 -08:00
8 changed files with 179 additions and 109 deletions

View File

@@ -5,9 +5,10 @@ import shutil
import time
import subprocess
import threading
from typing import Any, Dict, Optional
# Debounce notification sounds so a burst of queued messages only plays once.
_SOUND_DEBOUNCE_SECONDS = 0.8
_sound_timer: threading.Timer | None = None
_sound_timer: Optional[threading.Timer] = None
_sound_timer_lock = threading.Lock()
_last_sound_request = 0.0
@@ -42,8 +43,6 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
_sound_timer = threading.Timer(delay, _fire, args=(now,))
_sound_timer.daemon = True
_sound_timer.start()
from typing import Any, Dict
from contact.utilities.utils import (
refresh_node_list,
add_new_message,

View File

@@ -12,14 +12,17 @@ from contact.ui.colors import get_color
from contact.utilities.db_handler import get_name_from_database, update_node_info_in_db, is_chat_archived
from contact.utilities.input_handlers import get_list_input
from contact.utilities.i18n import t
from contact.utilities.emoji_utils import normalize_message_text
import contact.ui.default_config as config
import contact.ui.dialog
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
from contact.utilities.singleton import ui_state, interface_state, menu_state
MIN_COL = 2 # keep hidden panes at least border-safe for curses
MIN_COL = 1 # "effectively zero" without breaking curses
RESIZE_DEBOUNCE_MS = 250
root_win = None
nodes_pad = None
# Draw arrows for a specific window id (0=channel,1=messages,2=nodes).
@@ -48,24 +51,62 @@ def compute_widths(total_w: int, focus: int):
# tiny terminals: allocate something, anything
return max(1, total_w), 0, 0
hidden_w = MIN_COL
if focus == 0:
return total_w - 2 * hidden_w, hidden_w, hidden_w
return total_w - 2 * MIN_COL, MIN_COL, MIN_COL
if focus == 1:
return hidden_w, total_w - 2 * hidden_w, hidden_w
return hidden_w, hidden_w, total_w - 2 * hidden_w
return MIN_COL, total_w - 2 * MIN_COL, MIN_COL
return MIN_COL, MIN_COL, total_w - 2 * MIN_COL
def paint_frame(win, selected: bool) -> None:
h, w = win.getmaxyx()
if h < 2 or w < 2:
return
win.attrset(get_color("window_frame_selected") if selected else get_color("window_frame"))
win.box()
win.attrset(get_color("window_frame"))
win.refresh()
def get_channel_row_color(index: int) -> int:
if index == ui_state.selected_channel:
if ui_state.current_window == 0:
return get_color("channel_list", reverse=True)
return get_color("channel_selected")
return get_color("channel_list")
def get_node_row_color(index: int) -> int:
node_num = ui_state.node_list[index]
node = interface_state.interface.nodesByNum.get(node_num, {})
color = "node_list"
if node.get("isFavorite"):
color = "node_favorite"
if node.get("isIgnored"):
color = "node_ignored"
return get_color(color, reverse=index == ui_state.selected_node and ui_state.current_window == 2)
def refresh_main_window(window_id: int, selected: bool) -> None:
if window_id == 0:
paint_frame(channel_win, selected=selected)
if ui_state.channel_list:
width = max(0, channel_pad.getmaxyx()[1] - 4)
channel_pad.chgat(ui_state.selected_channel, 1, width, get_channel_row_color(ui_state.selected_channel))
refresh_pad(0)
elif window_id == 1:
paint_frame(messages_win, selected=selected)
refresh_pad(1)
elif window_id == 2:
paint_frame(nodes_win, selected=selected)
if ui_state.node_list and nodes_pad is not None:
width = max(0, nodes_pad.getmaxyx()[1] - 4)
nodes_pad.chgat(ui_state.selected_node, 1, width, get_node_row_color(ui_state.selected_node))
refresh_pad(2)
def get_node_display_name(node_num: int, node: dict) -> str:
user = node.get("user") or {}
return user.get("longName") or get_name_from_database(node_num, "long")
def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
"""Handle terminal resize events and redraw the UI accordingly."""
global messages_pad, messages_win, nodes_pad, nodes_win, channel_pad, channel_win, packetlog_win, entry_win
@@ -145,22 +186,9 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
packetlog_win.resize(pkt_h, messages_width)
packetlog_win.mvwin(height - pkt_h - entry_height, channel_width)
# Draw only currently relevant borders in single-pane mode.
if ui_state.single_pane_mode:
target_wins = [entry_win]
if ui_state.current_window == 0:
target_wins.append(channel_win)
elif ui_state.current_window == 1:
target_wins.append(messages_win)
else:
target_wins.append(nodes_win)
else:
target_wins = [channel_win, entry_win, nodes_win, messages_win]
for win in target_wins:
h, w = win.getmaxyx()
if h >= 2 and w >= 2:
win.box()
# Draw window borders
for win in [channel_win, entry_win, nodes_win, messages_win]:
win.box()
win.refresh()
entry_win.keypad(True)
@@ -178,6 +206,24 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
pass
def drain_resize_events(input_win: curses.window) -> Union[str, int, None]:
"""Wait for resize events to settle and preserve one queued non-resize key."""
input_win.timeout(RESIZE_DEBOUNCE_MS)
try:
while True:
try:
next_char = input_win.get_wch()
except curses.error:
return None
if next_char == curses.KEY_RESIZE:
continue
return next_char
finally:
input_win.timeout(-1)
def main_ui(stdscr: curses.window) -> None:
"""Main UI loop for the curses interface."""
global input_text
@@ -185,6 +231,7 @@ def main_ui(stdscr: curses.window) -> None:
root_win = stdscr
input_text = ""
queued_char = None
stdscr.keypad(True)
get_channels()
handle_resize(stdscr, True)
@@ -193,7 +240,11 @@ def main_ui(stdscr: curses.window) -> None:
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
# Get user input from entry window
char = entry_win.get_wch()
if queued_char is None:
char = entry_win.get_wch()
else:
char = queued_char
queued_char = None
# draw_debug(f"Keypress: {char}")
@@ -241,7 +292,9 @@ def main_ui(stdscr: curses.window) -> None:
elif char == curses.KEY_RESIZE:
input_text = ""
queued_char = drain_resize_events(entry_win)
handle_resize(stdscr, False)
continue
elif char == chr(4): # Ctrl + D to delete current channel or node
handle_ctrl_d()
@@ -350,31 +403,16 @@ def handle_leftright(char: int) -> None:
delta = -1 if char == curses.KEY_LEFT else 1
old_window = ui_state.current_window
ui_state.current_window = (ui_state.current_window + delta) % 3
handle_resize(root_win, False)
if ui_state.single_pane_mode:
handle_resize(root_win, False)
return
if old_window == 0:
paint_frame(channel_win, selected=False)
refresh_pad(0)
if old_window == 1:
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
paint_frame(nodes_win, selected=False)
refresh_pad(2)
refresh_main_window(old_window, selected=False)
if not ui_state.single_pane_mode:
draw_window_arrows(old_window)
if ui_state.current_window == 0:
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
paint_frame(nodes_win, selected=True)
refresh_pad(2)
refresh_main_window(ui_state.current_window, selected=True)
draw_window_arrows(ui_state.current_window)
@@ -395,31 +433,16 @@ def handle_function_keys(char: int) -> None:
return
ui_state.current_window = target
handle_resize(root_win, False)
if ui_state.single_pane_mode:
handle_resize(root_win, False)
return
if old_window == 0:
paint_frame(channel_win, selected=False)
refresh_pad(0)
elif old_window == 1:
paint_frame(messages_win, selected=False)
refresh_pad(1)
elif old_window == 2:
paint_frame(nodes_win, selected=False)
refresh_pad(2)
refresh_main_window(old_window, selected=False)
if not ui_state.single_pane_mode:
draw_window_arrows(old_window)
if ui_state.current_window == 0:
paint_frame(channel_win, selected=True)
refresh_pad(0)
elif ui_state.current_window == 1:
paint_frame(messages_win, selected=True)
refresh_pad(1)
elif ui_state.current_window == 2:
paint_frame(nodes_win, selected=True)
refresh_pad(2)
refresh_main_window(ui_state.current_window, selected=True)
draw_window_arrows(ui_state.current_window)
@@ -856,7 +879,7 @@ def draw_messages_window(scroll_to_bottom: bool = False) -> None:
row = 0
for prefix, message in messages:
full_message = f"{prefix}{message}"
full_message = normalize_message_text(f"{prefix}{message}")
wrapped_lines = wrap_text(full_message, messages_win.getmaxyx()[1] - 2)
msg_line_count += len(wrapped_lines)
messages_pad.resize(msg_line_count, messages_win.getmaxyx()[1])
@@ -898,10 +921,8 @@ def draw_node_list() -> None:
if ui_state.current_window != 2 and ui_state.single_pane_mode:
return
# This didn't work, for some reason an error is thown on startup, so we just create the pad every time
# if nodes_pad is None:
# nodes_pad = curses.newpad(1, 1)
nodes_pad = curses.newpad(1, 1)
if nodes_pad is None:
nodes_pad = curses.newpad(1, 1)
try:
nodes_pad.erase()
@@ -915,28 +936,12 @@ def draw_node_list() -> None:
node = interface_state.interface.nodesByNum[node_num]
secure = "user" in node and "publicKey" in node["user"] and node["user"]["publicKey"]
status_icon = "🔐" if secure else "🔓"
node_name = get_name_from_database(node_num, "long")
user_name = node["user"]["shortName"]
uptime_str = ""
if "deviceMetrics" in node and "uptimeSeconds" in node["deviceMetrics"]:
uptime_str = f" / Up: {get_readable_duration(node['deviceMetrics']['uptimeSeconds'])}"
last_heard_str = f"{get_time_ago(node['lastHeard'])}" if node.get("lastHeard") else ""
hops_str = f" ■ Hops: {node['hopsAway']}" if "hopsAway" in node else ""
snr_str = f" ■ SNR: {node['snr']}dB" if node.get("hopsAway") == 0 and "snr" in node else ""
node_name = get_node_display_name(node_num, node)
# Future node name custom formatting possible
node_str = f"{status_icon} {node_name}"
node_str = node_str.ljust(box_width - 4)[: box_width - 2]
color = "node_list"
if "isFavorite" in node and node["isFavorite"]:
color = "node_favorite"
if "isIgnored" in node and node["isIgnored"]:
color = "node_ignored"
nodes_pad.addstr(
i, 1, node_str, get_color(color, reverse=ui_state.selected_node == i and ui_state.current_window == 2)
)
nodes_pad.addstr(i, 1, node_str, get_node_row_color(i))
paint_frame(nodes_win, selected=(ui_state.current_window == 2))
nodes_win.refresh()

View File

@@ -56,6 +56,13 @@ config_folder = os.path.abspath(config.node_configs_file_path)
# Load translations
field_mapping, help_text = parse_ini_file(translation_file)
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0 and later use an is_repeated property, while older versions compare against the label field.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def reload_translations() -> None:
global translation_file, field_mapping, help_text
@@ -564,7 +571,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
new_value = new_value == "True" or new_value is True
menu_state.start_index.pop()
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
elif _is_repeated_field(field): # Handle repeated field - Not currently used
new_value = get_repeated_input(current_value)
new_value = current_value if new_value is None else new_value.split(", ")
menu_state.start_index.pop()

View File

@@ -435,16 +435,10 @@ def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None
usable_height = height - 2
usable_width = width - 2
if usable_height <= 0 or usable_width <= 0:
return
if window == 1 and ui_state.display_log:
if log_height := kwargs.get("log_height"):
usable_height -= log_height - 1
if usable_height <= 0:
return
if usable_height < max_index:
if ui_state.start_index[window] > 0:
win.addstr(1, usable_width, "", get_color("settings_default"))

View File

@@ -9,6 +9,17 @@ from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
# defs are from meshtastic/python/main
def _is_repeated_field(field_desc) -> bool:
"""Return True if the protobuf field is repeated.
Protobuf 6.31.0+ exposes `is_repeated`, while older versions require
checking `label == LABEL_REPEATED`.
"""
if hasattr(field_desc, "is_repeated"):
return bool(field_desc.is_repeated)
return field_desc.label == field_desc.LABEL_REPEATED
def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = camel_to_snake(config_root)
@@ -89,7 +100,7 @@ def setPref(config, comp_name, raw_val) -> bool:
return False
# repeating fields need to be handled with append, not setattr
if pref.label != pref.LABEL_REPEATED:
if not _is_repeated_field(pref):
try:
if config_type.message_type is not None:
config_values = getattr(config_part, config_type.name)

View File

@@ -0,0 +1,54 @@
"""Helpers for normalizing emoji sequences in width-sensitive message rendering."""
# Strip zero-width and presentation modifiers that make terminal cell width inconsistent.
EMOJI_MODIFIER_REPLACEMENTS = {
"\u200d": "",
"\u20e3": "",
"\ufe0e": "",
"\ufe0f": "",
"\U0001F3FB": "",
"\U0001F3FC": "",
"\U0001F3FD": "",
"\U0001F3FE": "",
"\U0001F3FF": "",
}
_EMOJI_MODIFIER_TRANSLATION = str.maketrans(EMOJI_MODIFIER_REPLACEMENTS)
_REGIONAL_INDICATOR_START = ord("\U0001F1E6")
_REGIONAL_INDICATOR_END = ord("\U0001F1FF")
def _regional_indicator_to_letter(char: str) -> str:
return chr(ord("A") + ord(char) - _REGIONAL_INDICATOR_START)
def _normalize_flag_emoji(text: str) -> str:
"""Convert flag emoji built from regional indicators into ASCII country codes."""
normalized = []
index = 0
while index < len(text):
current = text[index]
current_ord = ord(current)
if _REGIONAL_INDICATOR_START <= current_ord <= _REGIONAL_INDICATOR_END and index + 1 < len(text):
next_char = text[index + 1]
next_ord = ord(next_char)
if _REGIONAL_INDICATOR_START <= next_ord <= _REGIONAL_INDICATOR_END:
normalized.append(_regional_indicator_to_letter(current))
normalized.append(_regional_indicator_to_letter(next_char))
index += 2
continue
normalized.append(current)
index += 1
return "".join(normalized)
def normalize_message_text(text: str) -> str:
"""Strip modifiers and rewrite flag emoji into stable terminal-friendly text."""
if not text:
return text
return _normalize_flag_emoji(text.translate(_EMOJI_MODIFIER_TRANSLATION))

View File

@@ -68,19 +68,19 @@ def get_chunks(data):
# Leave it string as last resort
value = value
match key:
# Python 3.9-compatible alternative to match/case.
if key == "uptime_seconds":
# convert seconds to hours, for our sanity
case "uptime_seconds":
value = round(value / 60 / 60, 1)
value = round(value / 60 / 60, 1)
elif key in ("longitude_i", "latitude_i"):
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
# truncate to 6th digit after floating point, which would be still accurate
case "longitude_i" | "latitude_i":
value = round(value * 1e-7, 6)
value = round(value * 1e-7, 6)
elif key == "wind_direction":
# Convert wind direction from degrees to abbreviation
case "wind_direction":
value = humanize_wind_direction(value)
case "time":
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
value = humanize_wind_direction(value)
elif key == "time":
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
if key in sensors:
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "

View File

@@ -1,6 +1,6 @@
[project]
name = "contact"
version = "1.4.14"
version = "1.4.22"
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
authors = [
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}