mirror of
https://github.com/pdxlocations/contact.git
synced 2026-03-28 17:12:35 +01:00
Compare commits
11 Commits
single-pan
...
1.4.19
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ea958ce8a | ||
|
|
c5e93fdd2d | ||
|
|
35a0ff5834 | ||
|
|
18df7d326a | ||
|
|
c7b54caf45 | ||
|
|
773f43edd8 | ||
|
|
6af1c46bd3 | ||
|
|
7e3e44df24 | ||
|
|
45626f5e83 | ||
|
|
e9181972b2 | ||
|
|
795ab84ef5 |
@@ -5,9 +5,10 @@ import shutil
|
||||
import time
|
||||
import subprocess
|
||||
import threading
|
||||
from typing import Any, Dict, Optional
|
||||
# Debounce notification sounds so a burst of queued messages only plays once.
|
||||
_SOUND_DEBOUNCE_SECONDS = 0.8
|
||||
_sound_timer: threading.Timer | None = None
|
||||
_sound_timer: Optional[threading.Timer] = None
|
||||
_sound_timer_lock = threading.Lock()
|
||||
_last_sound_request = 0.0
|
||||
|
||||
@@ -42,8 +43,6 @@ def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
||||
_sound_timer = threading.Timer(delay, _fire, args=(now,))
|
||||
_sound_timer.daemon = True
|
||||
_sound_timer.start()
|
||||
from typing import Any, Dict
|
||||
|
||||
from contact.utilities.utils import (
|
||||
refresh_node_list,
|
||||
add_new_message,
|
||||
|
||||
@@ -14,11 +14,12 @@ from contact.utilities.input_handlers import get_list_input
|
||||
from contact.utilities.i18n import t
|
||||
import contact.ui.default_config as config
|
||||
import contact.ui.dialog
|
||||
from contact.ui.nav_utils import move_main_highlight, draw_main_arrows, get_msg_window_lines, wrap_text
|
||||
from contact.ui.nav_utils import draw_main_arrows, fit_text, get_msg_window_lines, move_main_highlight, wrap_text
|
||||
from contact.utilities.singleton import ui_state, interface_state, menu_state
|
||||
|
||||
|
||||
MIN_COL = 2 # keep hidden panes at least border-safe for curses
|
||||
MIN_COL = 1 # "effectively zero" without breaking curses
|
||||
RESIZE_DEBOUNCE_MS = 250
|
||||
root_win = None
|
||||
|
||||
|
||||
@@ -48,18 +49,14 @@ def compute_widths(total_w: int, focus: int):
|
||||
# tiny terminals: allocate something, anything
|
||||
return max(1, total_w), 0, 0
|
||||
|
||||
hidden_w = MIN_COL
|
||||
if focus == 0:
|
||||
return total_w - 2 * hidden_w, hidden_w, hidden_w
|
||||
return total_w - 2 * MIN_COL, MIN_COL, MIN_COL
|
||||
if focus == 1:
|
||||
return hidden_w, total_w - 2 * hidden_w, hidden_w
|
||||
return hidden_w, hidden_w, total_w - 2 * hidden_w
|
||||
return MIN_COL, total_w - 2 * MIN_COL, MIN_COL
|
||||
return MIN_COL, MIN_COL, total_w - 2 * MIN_COL
|
||||
|
||||
|
||||
def paint_frame(win, selected: bool) -> None:
|
||||
h, w = win.getmaxyx()
|
||||
if h < 2 or w < 2:
|
||||
return
|
||||
win.attrset(get_color("window_frame_selected") if selected else get_color("window_frame"))
|
||||
win.box()
|
||||
win.attrset(get_color("window_frame"))
|
||||
@@ -145,22 +142,9 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
||||
packetlog_win.resize(pkt_h, messages_width)
|
||||
packetlog_win.mvwin(height - pkt_h - entry_height, channel_width)
|
||||
|
||||
# Draw only currently relevant borders in single-pane mode.
|
||||
if ui_state.single_pane_mode:
|
||||
target_wins = [entry_win]
|
||||
if ui_state.current_window == 0:
|
||||
target_wins.append(channel_win)
|
||||
elif ui_state.current_window == 1:
|
||||
target_wins.append(messages_win)
|
||||
else:
|
||||
target_wins.append(nodes_win)
|
||||
else:
|
||||
target_wins = [channel_win, entry_win, nodes_win, messages_win]
|
||||
|
||||
for win in target_wins:
|
||||
h, w = win.getmaxyx()
|
||||
if h >= 2 and w >= 2:
|
||||
win.box()
|
||||
# Draw window borders
|
||||
for win in [channel_win, entry_win, nodes_win, messages_win]:
|
||||
win.box()
|
||||
win.refresh()
|
||||
|
||||
entry_win.keypad(True)
|
||||
@@ -178,6 +162,24 @@ def handle_resize(stdscr: curses.window, firstrun: bool) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def drain_resize_events(input_win: curses.window) -> Union[str, int, None]:
|
||||
"""Wait for resize events to settle and preserve one queued non-resize key."""
|
||||
input_win.timeout(RESIZE_DEBOUNCE_MS)
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
next_char = input_win.get_wch()
|
||||
except curses.error:
|
||||
return None
|
||||
|
||||
if next_char == curses.KEY_RESIZE:
|
||||
continue
|
||||
|
||||
return next_char
|
||||
finally:
|
||||
input_win.timeout(-1)
|
||||
|
||||
|
||||
def main_ui(stdscr: curses.window) -> None:
|
||||
"""Main UI loop for the curses interface."""
|
||||
global input_text
|
||||
@@ -185,6 +187,7 @@ def main_ui(stdscr: curses.window) -> None:
|
||||
|
||||
root_win = stdscr
|
||||
input_text = ""
|
||||
queued_char = None
|
||||
stdscr.keypad(True)
|
||||
get_channels()
|
||||
handle_resize(stdscr, True)
|
||||
@@ -193,7 +196,11 @@ def main_ui(stdscr: curses.window) -> None:
|
||||
draw_text_field(entry_win, f"Message: {(input_text or '')[-(stdscr.getmaxyx()[1] - 10):]}", get_color("input"))
|
||||
|
||||
# Get user input from entry window
|
||||
char = entry_win.get_wch()
|
||||
if queued_char is None:
|
||||
char = entry_win.get_wch()
|
||||
else:
|
||||
char = queued_char
|
||||
queued_char = None
|
||||
|
||||
# draw_debug(f"Keypress: {char}")
|
||||
|
||||
@@ -241,7 +248,9 @@ def main_ui(stdscr: curses.window) -> None:
|
||||
|
||||
elif char == curses.KEY_RESIZE:
|
||||
input_text = ""
|
||||
queued_char = drain_resize_events(entry_win)
|
||||
handle_resize(stdscr, False)
|
||||
continue
|
||||
|
||||
elif char == chr(4): # Ctrl + D to delete current channel or node
|
||||
handle_ctrl_d()
|
||||
@@ -819,9 +828,7 @@ def draw_channel_list() -> None:
|
||||
notification = " " + config.notification_symbol if idx in ui_state.notifications else ""
|
||||
|
||||
# Truncate the channel name if it's too long to fit in the window
|
||||
truncated_channel = (
|
||||
(channel[: win_width - 5] + "-" if len(channel) > win_width - 5 else channel) + notification
|
||||
).ljust(win_width - 3)
|
||||
truncated_channel = fit_text(f"{channel}{notification}", win_width - 3, suffix="-")
|
||||
|
||||
color = get_color("channel_list")
|
||||
if idx == ui_state.selected_channel:
|
||||
@@ -927,8 +934,7 @@ def draw_node_list() -> None:
|
||||
snr_str = f" ■ SNR: {node['snr']}dB" if node.get("hopsAway") == 0 and "snr" in node else ""
|
||||
|
||||
# Future node name custom formatting possible
|
||||
node_str = f"{status_icon} {node_name}"
|
||||
node_str = node_str.ljust(box_width - 4)[: box_width - 2]
|
||||
node_str = fit_text(f"{status_icon} {node_name}", box_width - 2)
|
||||
color = "node_list"
|
||||
if "isFavorite" in node and node["isFavorite"]:
|
||||
color = "node_favorite"
|
||||
@@ -1057,9 +1063,16 @@ def draw_packetlog_win() -> None:
|
||||
span += column
|
||||
|
||||
# Add headers
|
||||
headers = f"{'From':<{columns[0]}} {'To':<{columns[1]}} {'Port':<{columns[2]}} {'Payload':<{width-span}}"
|
||||
headers = " ".join(
|
||||
[
|
||||
fit_text("From", columns[0]),
|
||||
fit_text("To", columns[1]),
|
||||
fit_text("Port", columns[2]),
|
||||
fit_text("Payload", max(1, width - span - 3)),
|
||||
]
|
||||
)
|
||||
packetlog_win.addstr(
|
||||
1, 1, headers[: width - 2], get_color("log_header", underline=True)
|
||||
1, 1, fit_text(headers, width - 2), get_color("log_header", underline=True)
|
||||
) # Truncate headers if they exceed window width
|
||||
|
||||
for i, packet in enumerate(reversed(ui_state.packet_buffer)):
|
||||
@@ -1067,22 +1080,22 @@ def draw_packetlog_win() -> None:
|
||||
break
|
||||
|
||||
# Format each field
|
||||
from_id = get_name_from_database(packet["from"], "short").ljust(columns[0])
|
||||
from_id = fit_text(get_name_from_database(packet["from"], "short"), columns[0])
|
||||
to_id = (
|
||||
"BROADCAST".ljust(columns[1])
|
||||
fit_text("BROADCAST", columns[1])
|
||||
if str(packet["to"]) == "4294967295"
|
||||
else get_name_from_database(packet["to"], "short").ljust(columns[1])
|
||||
else fit_text(get_name_from_database(packet["to"], "short"), columns[1])
|
||||
)
|
||||
if "decoded" in packet:
|
||||
port = str(packet["decoded"].get("portnum", "")).ljust(columns[2])
|
||||
port = fit_text(str(packet["decoded"].get("portnum", "")), columns[2])
|
||||
parsed_payload = parse_protobuf(packet)
|
||||
else:
|
||||
port = "NO KEY".ljust(columns[2])
|
||||
port = fit_text("NO KEY", columns[2])
|
||||
parsed_payload = "NO KEY"
|
||||
|
||||
# Combine and truncate if necessary
|
||||
logString = f"{from_id} {to_id} {port} {parsed_payload}"
|
||||
logString = logString[: width - 3]
|
||||
logString = fit_text(logString, width - 3)
|
||||
|
||||
# Add to the window
|
||||
packetlog_win.addstr(i + 2, 1, logString, get_color("log"))
|
||||
|
||||
@@ -56,6 +56,13 @@ config_folder = os.path.abspath(config.node_configs_file_path)
|
||||
# Load translations
|
||||
field_mapping, help_text = parse_ini_file(translation_file)
|
||||
|
||||
def _is_repeated_field(field_desc) -> bool:
|
||||
"""Return True if the protobuf field is repeated.
|
||||
Protobuf 6.31.0 and later use an is_repeated property, while older versions compare against the label field.
|
||||
"""
|
||||
if hasattr(field_desc, "is_repeated"):
|
||||
return bool(field_desc.is_repeated)
|
||||
return field_desc.label == field_desc.LABEL_REPEATED
|
||||
|
||||
def reload_translations() -> None:
|
||||
global translation_file, field_mapping, help_text
|
||||
@@ -564,7 +571,7 @@ def settings_menu(stdscr: object, interface: object) -> None:
|
||||
new_value = new_value == "True" or new_value is True
|
||||
menu_state.start_index.pop()
|
||||
|
||||
elif field.label == field.LABEL_REPEATED: # Handle repeated field - Not currently used
|
||||
elif _is_repeated_field(field): # Handle repeated field - Not currently used
|
||||
new_value = get_repeated_input(current_value)
|
||||
new_value = current_value if new_value is None else new_value.split(", ")
|
||||
menu_state.start_index.pop()
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import curses
|
||||
import re
|
||||
from unicodedata import east_asian_width
|
||||
from typing import Any, Optional, List, Dict
|
||||
|
||||
from wcwidth import wcwidth, wcswidth
|
||||
|
||||
from contact.ui.colors import get_color
|
||||
from contact.utilities.i18n import t
|
||||
from contact.utilities.control_utils import transform_menu_path
|
||||
from typing import Any, Optional, List, Dict
|
||||
from contact.utilities.singleton import interface_state, ui_state
|
||||
|
||||
|
||||
@@ -328,7 +329,65 @@ def get_wrapped_help_text(
|
||||
|
||||
|
||||
def text_width(text: str) -> int:
|
||||
return sum(2 if east_asian_width(c) in "FW" else 1 for c in text)
|
||||
width = wcswidth(text)
|
||||
if width >= 0:
|
||||
return width
|
||||
return sum(max(wcwidth(char), 0) for char in text)
|
||||
|
||||
|
||||
def slice_text_to_width(text: str, max_width: int) -> str:
|
||||
"""Return the longest prefix that fits within max_width terminal cells."""
|
||||
if max_width <= 0:
|
||||
return ""
|
||||
|
||||
chunk = ""
|
||||
for char in text:
|
||||
candidate = chunk + char
|
||||
if text_width(candidate) > max_width:
|
||||
break
|
||||
chunk = candidate
|
||||
|
||||
return chunk
|
||||
|
||||
|
||||
def fit_text(text: str, width: int, suffix: str = "") -> str:
|
||||
"""Trim and pad text so its terminal display width fits exactly."""
|
||||
if width <= 0:
|
||||
return ""
|
||||
|
||||
if text_width(text) > width:
|
||||
suffix = slice_text_to_width(suffix, width)
|
||||
available = max(0, width - text_width(suffix))
|
||||
text = slice_text_to_width(text, available).rstrip() + suffix
|
||||
|
||||
padding = max(0, width - text_width(text))
|
||||
return text + (" " * padding)
|
||||
|
||||
|
||||
def split_text_to_width(text: str, max_width: int) -> List[str]:
|
||||
"""Split text into chunks that each fit within max_width terminal cells."""
|
||||
if max_width <= 0:
|
||||
return [""]
|
||||
|
||||
chunks: List[str] = []
|
||||
chunk = ""
|
||||
|
||||
for char in text:
|
||||
candidate = chunk + char
|
||||
if chunk and text_width(candidate) > max_width:
|
||||
chunks.append(chunk)
|
||||
chunk = char
|
||||
else:
|
||||
chunk = candidate
|
||||
|
||||
if text_width(chunk) > max_width:
|
||||
chunks.append(slice_text_to_width(chunk, max_width))
|
||||
chunk = ""
|
||||
|
||||
if chunk:
|
||||
chunks.append(chunk)
|
||||
|
||||
return chunks or [""]
|
||||
|
||||
|
||||
def wrap_text(text: str, wrap_width: int) -> List[str]:
|
||||
@@ -346,24 +405,18 @@ def wrap_text(text: str, wrap_width: int) -> List[str]:
|
||||
wrap_width -= margin
|
||||
|
||||
for word in words:
|
||||
word_length = text_width(word)
|
||||
word_chunks = split_text_to_width(word, wrap_width) if text_width(word) > wrap_width else [word]
|
||||
|
||||
if word_length > wrap_width: # Break long words
|
||||
if line_buffer:
|
||||
for chunk in word_chunks:
|
||||
chunk_length = text_width(chunk)
|
||||
|
||||
if line_length + chunk_length > wrap_width and chunk.strip():
|
||||
wrapped_lines.append(line_buffer.strip())
|
||||
line_buffer = ""
|
||||
line_length = 0
|
||||
for i in range(0, word_length, wrap_width):
|
||||
wrapped_lines.append(word[i : i + wrap_width])
|
||||
continue
|
||||
|
||||
if line_length + word_length > wrap_width and word.strip():
|
||||
wrapped_lines.append(line_buffer.strip())
|
||||
line_buffer = ""
|
||||
line_length = 0
|
||||
|
||||
line_buffer += word
|
||||
line_length += word_length
|
||||
line_buffer += chunk
|
||||
line_length += chunk_length
|
||||
|
||||
if line_buffer:
|
||||
wrapped_lines.append(line_buffer.strip())
|
||||
@@ -435,16 +488,10 @@ def draw_main_arrows(win: object, max_index: int, window: int, **kwargs) -> None
|
||||
usable_height = height - 2
|
||||
usable_width = width - 2
|
||||
|
||||
if usable_height <= 0 or usable_width <= 0:
|
||||
return
|
||||
|
||||
if window == 1 and ui_state.display_log:
|
||||
if log_height := kwargs.get("log_height"):
|
||||
usable_height -= log_height - 1
|
||||
|
||||
if usable_height <= 0:
|
||||
return
|
||||
|
||||
if usable_height < max_index:
|
||||
if ui_state.start_index[window] > 0:
|
||||
win.addstr(1, usable_width, "▲", get_color("settings_default"))
|
||||
|
||||
@@ -9,6 +9,17 @@ from meshtastic.util import camel_to_snake, snake_to_camel, fromStr
|
||||
# defs are from meshtastic/python/main
|
||||
|
||||
|
||||
def _is_repeated_field(field_desc) -> bool:
|
||||
"""Return True if the protobuf field is repeated.
|
||||
|
||||
Protobuf 6.31.0+ exposes `is_repeated`, while older versions require
|
||||
checking `label == LABEL_REPEATED`.
|
||||
"""
|
||||
if hasattr(field_desc, "is_repeated"):
|
||||
return bool(field_desc.is_repeated)
|
||||
return field_desc.label == field_desc.LABEL_REPEATED
|
||||
|
||||
|
||||
def traverseConfig(config_root, config, interface_config) -> bool:
|
||||
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
|
||||
snake_name = camel_to_snake(config_root)
|
||||
@@ -89,7 +100,7 @@ def setPref(config, comp_name, raw_val) -> bool:
|
||||
return False
|
||||
|
||||
# repeating fields need to be handled with append, not setattr
|
||||
if pref.label != pref.LABEL_REPEATED:
|
||||
if not _is_repeated_field(pref):
|
||||
try:
|
||||
if config_type.message_type is not None:
|
||||
config_values = getattr(config_part, config_type.name)
|
||||
|
||||
@@ -68,19 +68,19 @@ def get_chunks(data):
|
||||
# Leave it string as last resort
|
||||
value = value
|
||||
|
||||
match key:
|
||||
# Python 3.9-compatible alternative to match/case.
|
||||
if key == "uptime_seconds":
|
||||
# convert seconds to hours, for our sanity
|
||||
case "uptime_seconds":
|
||||
value = round(value / 60 / 60, 1)
|
||||
value = round(value / 60 / 60, 1)
|
||||
elif key in ("longitude_i", "latitude_i"):
|
||||
# Convert position to degrees (humanize), as per Meshtastic protobuf comment for this telemetry
|
||||
# truncate to 6th digit after floating point, which would be still accurate
|
||||
case "longitude_i" | "latitude_i":
|
||||
value = round(value * 1e-7, 6)
|
||||
value = round(value * 1e-7, 6)
|
||||
elif key == "wind_direction":
|
||||
# Convert wind direction from degrees to abbreviation
|
||||
case "wind_direction":
|
||||
value = humanize_wind_direction(value)
|
||||
case "time":
|
||||
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
|
||||
value = humanize_wind_direction(value)
|
||||
elif key == "time":
|
||||
value = datetime.datetime.fromtimestamp(int(value)).strftime("%d.%m.%Y %H:%m")
|
||||
|
||||
if key in sensors:
|
||||
parsed+= f"{sensors[key.strip()]['icon']}{value}{sensors[key]['unit']} "
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "contact"
|
||||
version = "1.4.14"
|
||||
version = "1.4.19"
|
||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||
authors = [
|
||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||
@@ -9,7 +9,8 @@ license = "GPL-3.0-only"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.9,<3.15"
|
||||
dependencies = [
|
||||
"meshtastic (>=2.7.5,<3.0.0)"
|
||||
"meshtastic (>=2.7.5,<3.0.0)",
|
||||
"wcwidth (>=0.2.13,<1.0.0)"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
meshtastic
|
||||
wcwidth>=0.2.13,<1.0.0
|
||||
|
||||
Reference in New Issue
Block a user