forked from iarv/contact
Compare commits
15 Commits
rm-functio
...
1.4.7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
56637f806b | ||
|
|
c6abedec75 | ||
|
|
6b18809215 | ||
|
|
b048fe2480 | ||
|
|
600fc61ed7 | ||
|
|
fbf5ff6bd3 | ||
|
|
faab1e961f | ||
|
|
255db3aa3c | ||
|
|
42717c956f | ||
|
|
ad77eba0d6 | ||
|
|
7d6918c69e | ||
|
|
70646a1214 | ||
|
|
53c1320d87 | ||
|
|
ed9ff60f97 | ||
|
|
443df7bf48 |
@@ -42,7 +42,7 @@ For smaller displays you may wish to enable `single_pane_mode`:
|
||||
- `↑→↓←` = Navigate around the UI.
|
||||
- `F1/F2/F3` = Jump to Channel/Messages/Nodes
|
||||
- `ENTER` = Send a message typed in the Input Window, or with the Node List highlighted, select a node to DM
|
||||
- `` ` ` or F12` = Open the Settings dialogue
|
||||
- `` ` `` or `F12` = Open the Settings dialogue
|
||||
- `CTRL` + `p` = Hide/show a log of raw received packets.
|
||||
- `CTRL` + `t` or `F4` = With the Node List highlighted, send a traceroute to the selected node
|
||||
- `F5` = Display a node's info
|
||||
|
||||
@@ -129,8 +129,10 @@ def start() -> None:
|
||||
|
||||
try:
|
||||
curses.wrapper(main)
|
||||
interface_state.interface.close()
|
||||
except KeyboardInterrupt:
|
||||
logging.info("User exited with Ctrl+C")
|
||||
interface_state.interface.close()
|
||||
sys.exit(0)
|
||||
except Exception as e:
|
||||
logging.critical("Fatal error", exc_info=True)
|
||||
|
||||
@@ -2,7 +2,46 @@ import logging
|
||||
import os
|
||||
import platform
|
||||
import shutil
|
||||
import time
|
||||
import subprocess
|
||||
import threading
|
||||
# Debounce notification sounds so a burst of queued messages only plays once.
|
||||
_SOUND_DEBOUNCE_SECONDS = 0.8
|
||||
_sound_timer: threading.Timer | None = None
|
||||
_sound_timer_lock = threading.Lock()
|
||||
_last_sound_request = 0.0
|
||||
|
||||
|
||||
def schedule_notification_sound(delay: float = _SOUND_DEBOUNCE_SECONDS) -> None:
|
||||
"""Schedule a notification sound after a short quiet period.
|
||||
|
||||
If more messages arrive before the delay elapses, the timer is reset.
|
||||
This prevents playing a sound for each message when a backlog flushes.
|
||||
"""
|
||||
global _sound_timer, _last_sound_request
|
||||
|
||||
now = time.monotonic()
|
||||
with _sound_timer_lock:
|
||||
_last_sound_request = now
|
||||
|
||||
# Cancel any previously scheduled sound.
|
||||
if _sound_timer is not None:
|
||||
try:
|
||||
_sound_timer.cancel()
|
||||
except Exception:
|
||||
pass
|
||||
_sound_timer = None
|
||||
|
||||
def _fire(expected_request_time: float) -> None:
|
||||
# Only play if nothing newer has been scheduled.
|
||||
with _sound_timer_lock:
|
||||
if expected_request_time != _last_sound_request:
|
||||
return
|
||||
play_sound()
|
||||
|
||||
_sound_timer = threading.Timer(delay, _fire, args=(now,))
|
||||
_sound_timer.daemon = True
|
||||
_sound_timer.start()
|
||||
from typing import Any, Dict
|
||||
|
||||
from contact.utilities.utils import (
|
||||
@@ -108,7 +147,7 @@ def on_receive(packet: Dict[str, Any], interface: Any) -> None:
|
||||
|
||||
|
||||
if config.notification_sound == "True":
|
||||
play_sound()
|
||||
schedule_notification_sound()
|
||||
|
||||
message_bytes = packet["decoded"]["payload"]
|
||||
message_string = message_bytes.decode("utf-8")
|
||||
|
||||
@@ -357,7 +357,6 @@ def handle_leftright(char: int) -> None:
|
||||
paint_frame(nodes_win, selected=True)
|
||||
refresh_pad(2)
|
||||
|
||||
# Draw arrows last; force even in multi-pane to avoid flicker
|
||||
draw_window_arrows(ui_state.current_window)
|
||||
|
||||
|
||||
@@ -454,80 +453,85 @@ def handle_f5_key(stdscr: curses.window) -> None:
|
||||
node = None
|
||||
try:
|
||||
node = interface_state.interface.nodesByNum[ui_state.node_list[ui_state.selected_node]]
|
||||
|
||||
message_parts = []
|
||||
|
||||
message_parts.append("**📋 Basic Information:**")
|
||||
message_parts.append(f"• Device: {node.get('user', {}).get('longName', 'Unknown')}")
|
||||
message_parts.append(f"• Short name: {node.get('user', {}).get('shortName', 'Unknown')}")
|
||||
message_parts.append(f"• Hardware: {node.get('user', {}).get('hwModel', 'Unknown')}")
|
||||
|
||||
role = f"{node.get('user', {}).get('role', 'Unknown')}"
|
||||
message_parts.append(f"• Role: {role}")
|
||||
|
||||
pk = f"{node.get('user', {}).get('publicKey')}"
|
||||
message_parts.append(f"Public key: {pk}")
|
||||
|
||||
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
|
||||
if "position" in node:
|
||||
pos = node["position"]
|
||||
if pos.get("latitude") and pos.get("longitude"):
|
||||
message_parts.append(f"• Position: {pos['latitude']:.4f}, {pos['longitude']:.4f}")
|
||||
if pos.get("altitude"):
|
||||
message_parts.append(f"• Altitude: {pos['altitude']}m")
|
||||
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
|
||||
|
||||
if any(key in node for key in ["snr", "hopsAway", "lastHeard"]):
|
||||
message_parts.append("\n**🌐 Network Metrics:**")
|
||||
|
||||
if "snr" in node:
|
||||
snr = node["snr"]
|
||||
snr_status = (
|
||||
"🟢 Excellent"
|
||||
if snr > 10
|
||||
else (
|
||||
"🟡 Good"
|
||||
if snr > 3
|
||||
else "🟠 Fair" if snr > -10 else "🔴 Poor" if snr > -20 else "💀 Very Poor"
|
||||
)
|
||||
)
|
||||
message_parts.append(f"• SNR: {snr}dB {snr_status}")
|
||||
|
||||
if "hopsAway" in node:
|
||||
hops = node["hopsAway"]
|
||||
hop_emoji = "📡" if hops == 0 else "🔄" if hops == 1 else "⏩"
|
||||
message_parts.append(f"• Hops away: {hop_emoji} {hops}")
|
||||
|
||||
if "lastHeard" in node and node["lastHeard"]:
|
||||
message_parts.append(f"• Last heard: 🕐 {get_time_ago(node['lastHeard'])}")
|
||||
|
||||
if node.get("deviceMetrics"):
|
||||
metrics = node["deviceMetrics"]
|
||||
message_parts.append("\n**📊 Device Metrics:**")
|
||||
|
||||
if "batteryLevel" in metrics:
|
||||
battery = metrics["batteryLevel"]
|
||||
battery_emoji = "🟢" if battery > 50 else "🟡" if battery > 20 else "🔴"
|
||||
voltage_info = f" ({metrics['voltage']}v)" if "voltage" in metrics else ""
|
||||
message_parts.append(f"• Battery: {battery_emoji} {battery}%{voltage_info}")
|
||||
|
||||
if "uptimeSeconds" in metrics:
|
||||
message_parts.append(f"• Uptime: ⏱️ {get_readable_duration(metrics['uptimeSeconds'])}")
|
||||
|
||||
if "channelUtilization" in metrics:
|
||||
util = metrics["channelUtilization"]
|
||||
util_emoji = "🔴" if util > 80 else "🟡" if util > 50 else "🟢"
|
||||
message_parts.append(f"• Channel utilization: {util_emoji} {util:.2f}%")
|
||||
|
||||
if "airUtilTx" in metrics:
|
||||
air_util = metrics["airUtilTx"]
|
||||
air_emoji = "🔴" if air_util > 80 else "🟡" if air_util > 50 else "🟢"
|
||||
message_parts.append(f"• Air utilization TX: {air_emoji} {air_util:.2f}%")
|
||||
|
||||
message = "\n".join(message_parts)
|
||||
|
||||
contact.ui.dialog.dialog(f"📡 Node Details: {node.get('user', {}).get('shortName', 'Unknown')}", message)
|
||||
curses.curs_set(1) # Show cursor again
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
message_parts = []
|
||||
|
||||
message_parts.append("**📋 Basic Information:**")
|
||||
message_parts.append(f"• Device: {node.get('user', {}).get('longName', 'Unknown')}")
|
||||
message_parts.append(f"• Short name: {node.get('user', {}).get('shortName', 'Unknown')}")
|
||||
message_parts.append(f"• Hardware: {node.get('user', {}).get('hwModel', 'Unknown')}")
|
||||
|
||||
role = f"{node.get('user', {}).get('role', 'Unknown')}"
|
||||
message_parts.append(f"• Role: {role}")
|
||||
|
||||
pk = f"{node.get('user', {}).get('publicKey')}"
|
||||
message_parts.append(f"Public key: {pk}")
|
||||
|
||||
message_parts.append(f"• Node ID: {node.get('num', 'Unknown')}")
|
||||
if "position" in node:
|
||||
pos = node["position"]
|
||||
if pos.get("latitude") and pos.get("longitude"):
|
||||
message_parts.append(f"• Position: {pos['latitude']:.4f}, {pos['longitude']:.4f}")
|
||||
if pos.get("altitude"):
|
||||
message_parts.append(f"• Altitude: {pos['altitude']}m")
|
||||
message_parts.append(f"https://maps.google.com/?q={pos['latitude']:.4f},{pos['longitude']:.4f}")
|
||||
|
||||
if any(key in node for key in ["snr", "hopsAway", "lastHeard"]):
|
||||
message_parts.append("\n**🌐 Network Metrics:**")
|
||||
|
||||
if "snr" in node:
|
||||
snr = node["snr"]
|
||||
snr_status = (
|
||||
"🟢 Excellent"
|
||||
if snr > 10
|
||||
else "🟡 Good" if snr > 3 else "🟠 Fair" if snr > -10 else "🔴 Poor" if snr > -20 else "💀 Very Poor"
|
||||
)
|
||||
message_parts.append(f"• SNR: {snr}dB {snr_status}")
|
||||
|
||||
if "hopsAway" in node:
|
||||
hops = node["hopsAway"]
|
||||
hop_emoji = "📡" if hops == 0 else "🔄" if hops == 1 else "⏩"
|
||||
message_parts.append(f"• Hops away: {hop_emoji} {hops}")
|
||||
|
||||
if "lastHeard" in node and node["lastHeard"]:
|
||||
message_parts.append(f"• Last heard: 🕐 {get_time_ago(node['lastHeard'])}")
|
||||
|
||||
if node.get("deviceMetrics"):
|
||||
metrics = node["deviceMetrics"]
|
||||
message_parts.append("\n**📊 Device Metrics:**")
|
||||
|
||||
if "batteryLevel" in metrics:
|
||||
battery = metrics["batteryLevel"]
|
||||
battery_emoji = "🟢" if battery > 50 else "🟡" if battery > 20 else "🔴"
|
||||
voltage_info = f" ({metrics['voltage']}v)" if "voltage" in metrics else ""
|
||||
message_parts.append(f"• Battery: {battery_emoji} {battery}%{voltage_info}")
|
||||
|
||||
if "uptimeSeconds" in metrics:
|
||||
message_parts.append(f"• Uptime: ⏱️ {get_readable_duration(metrics['uptimeSeconds'])}")
|
||||
|
||||
if "channelUtilization" in metrics:
|
||||
util = metrics["channelUtilization"]
|
||||
util_emoji = "🔴" if util > 80 else "🟡" if util > 50 else "🟢"
|
||||
message_parts.append(f"• Channel utilization: {util_emoji} {util:.2f}%")
|
||||
|
||||
if "airUtilTx" in metrics:
|
||||
air_util = metrics["airUtilTx"]
|
||||
air_emoji = "🔴" if air_util > 80 else "🟡" if air_util > 50 else "🟢"
|
||||
message_parts.append(f"• Air utilization TX: {air_emoji} {air_util:.2f}%")
|
||||
|
||||
message = "\n".join(message_parts)
|
||||
|
||||
contact.ui.dialog.dialog(f"📡 Node Details: {node.get('user', {}).get('shortName', 'Unknown')}", message)
|
||||
curses.curs_set(1) # Show cursor again
|
||||
handle_resize(stdscr, False)
|
||||
|
||||
|
||||
def handle_ctrl_t(stdscr: curses.window) -> None:
|
||||
"""Handle Ctrl + T key events to send a traceroute."""
|
||||
@@ -1084,6 +1088,11 @@ def search(win: int) -> None:
|
||||
|
||||
|
||||
def refresh_pad(window: int) -> None:
|
||||
|
||||
# If in single-pane mode and this isn't the focused window, skip refreshing its (collapsed) pad
|
||||
if ui_state.single_pane_mode and window != ui_state.current_window:
|
||||
return
|
||||
|
||||
# Derive the target box and pad for the requested window
|
||||
win_height = channel_win.getmaxyx()[0]
|
||||
|
||||
@@ -1113,10 +1122,6 @@ def refresh_pad(window: int) -> None:
|
||||
selected_item = ui_state.selected_channel
|
||||
start_index = max(0, selected_item - (win_height - 3)) # Leave room for borders
|
||||
|
||||
# If in single-pane mode and this isn't the focused window, skip refreshing its (collapsed) pad
|
||||
if ui_state.single_pane_mode and window != ui_state.current_window:
|
||||
return
|
||||
|
||||
# Compute inner drawable area of the box
|
||||
box_y, box_x = box.getbegyx()
|
||||
box_h, box_w = box.getmaxyx()
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import curses
|
||||
from contact.ui.colors import get_color
|
||||
from contact.ui.nav_utils import draw_main_arrows
|
||||
from contact.utilities.singleton import menu_state, ui_state
|
||||
|
||||
|
||||
@@ -13,12 +14,40 @@ def dialog(title: str, message: str) -> None:
|
||||
height, width = curses.LINES, curses.COLS
|
||||
|
||||
# Parse message into lines and calculate dimensions
|
||||
message_lines = message.splitlines()
|
||||
message_lines = message.splitlines() or [""]
|
||||
max_line_length = max(len(l) for l in message_lines)
|
||||
|
||||
# Desired size
|
||||
dialog_width = max(len(title) + 4, max_line_length + 4)
|
||||
dialog_height = len(message_lines) + 4
|
||||
x = (width - dialog_width) // 2
|
||||
y = (height - dialog_height) // 2
|
||||
desired_height = len(message_lines) + 4
|
||||
|
||||
# Clamp dialog size to the screen (leave a 1-cell margin if possible)
|
||||
max_w = max(10, width - 2)
|
||||
max_h = max(6, height - 2)
|
||||
dialog_width = min(dialog_width, max_w)
|
||||
dialog_height = min(desired_height, max_h)
|
||||
|
||||
x = max(0, (width - dialog_width) // 2)
|
||||
y = max(0, (height - dialog_height) // 2)
|
||||
|
||||
# Ensure we have a start index slot for this dialog window id (4)
|
||||
# ui_state.start_index is used by draw_main_arrows()
|
||||
try:
|
||||
while len(ui_state.start_index) <= 4:
|
||||
ui_state.start_index.append(0)
|
||||
except Exception:
|
||||
# If start_index isn't list-like, fall back to an attribute
|
||||
if not hasattr(ui_state, "start_index"):
|
||||
ui_state.start_index = [0, 0, 0, 0, 0]
|
||||
|
||||
def visible_message_rows() -> int:
|
||||
# Rows available for message text inside the border, excluding title row and OK row.
|
||||
# Layout:
|
||||
# row 0: title
|
||||
# rows 1..(dialog_height-3): message viewport (with arrows drawn on a subwindow)
|
||||
# row dialog_height-2: OK button
|
||||
# So message viewport height is dialog_height - 3 - 1 + 1 = dialog_height - 3
|
||||
return max(1, dialog_height - 4)
|
||||
|
||||
def draw_window():
|
||||
win.erase()
|
||||
@@ -26,23 +55,66 @@ def dialog(title: str, message: str) -> None:
|
||||
win.attrset(get_color("window_frame"))
|
||||
win.border(0)
|
||||
|
||||
win.addstr(0, 2, title, get_color("settings_default"))
|
||||
# Title
|
||||
try:
|
||||
win.addstr(0, 2, title[: max(0, dialog_width - 4)], get_color("settings_default"))
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
for i, line in enumerate(message_lines):
|
||||
msg_x = (dialog_width - len(line)) // 2
|
||||
win.addstr(2 + i, msg_x, line, get_color("settings_default"))
|
||||
# Message viewport
|
||||
viewport_h = visible_message_rows()
|
||||
start = ui_state.start_index[4]
|
||||
start = max(0, min(start, max(0, len(message_lines) - viewport_h)))
|
||||
ui_state.start_index[4] = start
|
||||
|
||||
# Create a subwindow covering the message region so draw_main_arrows() doesn't collide with the OK row
|
||||
msg_win = win.derwin(viewport_h + 2, dialog_width - 2, 1, 1)
|
||||
msg_win.erase()
|
||||
|
||||
for i in range(viewport_h):
|
||||
idx = start + i
|
||||
if idx >= len(message_lines):
|
||||
break
|
||||
line = message_lines[idx]
|
||||
# Hard-trim lines that don't fit
|
||||
trimmed = line[: max(0, dialog_width - 6)]
|
||||
msg_x = max(0, ((dialog_width - 2) - len(trimmed)) // 2)
|
||||
try:
|
||||
msg_win.addstr(1 + i, msg_x, trimmed, get_color("settings_default"))
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# Draw arrows only when scrolling is needed
|
||||
if len(message_lines) > viewport_h:
|
||||
draw_main_arrows(msg_win, len(message_lines) - 1, window=4)
|
||||
else:
|
||||
# Clear arrow positions if not needed
|
||||
try:
|
||||
h, w = msg_win.getmaxyx()
|
||||
msg_win.addstr(1, w - 2, " ", get_color("settings_default"))
|
||||
msg_win.addstr(h - 2, w - 2, " ", get_color("settings_default"))
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
msg_win.noutrefresh()
|
||||
|
||||
# OK button
|
||||
ok_text = " Ok "
|
||||
win.addstr(
|
||||
dialog_height - 2,
|
||||
(dialog_width - len(ok_text)) // 2,
|
||||
ok_text,
|
||||
get_color("settings_default", reverse=True),
|
||||
)
|
||||
try:
|
||||
win.addstr(
|
||||
dialog_height - 2,
|
||||
(dialog_width - len(ok_text)) // 2,
|
||||
ok_text,
|
||||
get_color("settings_default", reverse=True),
|
||||
)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
win.refresh()
|
||||
win.noutrefresh()
|
||||
curses.doupdate()
|
||||
|
||||
win = curses.newwin(dialog_height, dialog_width, y, x)
|
||||
win.keypad(True)
|
||||
draw_window()
|
||||
|
||||
while True:
|
||||
@@ -51,9 +123,19 @@ def dialog(title: str, message: str) -> None:
|
||||
|
||||
if menu_state.need_redraw:
|
||||
menu_state.need_redraw = False
|
||||
curses.update_lines_cols()
|
||||
height, width = curses.LINES, curses.COLS
|
||||
draw_window()
|
||||
|
||||
if char in (curses.KEY_ENTER, 10, 13, 32, 27): # Enter, space, Esc
|
||||
# Close dialog
|
||||
ok_selected = True
|
||||
if char in (27, curses.KEY_LEFT): # Esc or Left arrow
|
||||
win.erase()
|
||||
win.refresh()
|
||||
ui_state.current_window = previous_window
|
||||
return
|
||||
|
||||
if ok_selected and char in (curses.KEY_ENTER, 10, 13, 32):
|
||||
win.erase()
|
||||
win.refresh()
|
||||
ui_state.current_window = previous_window
|
||||
@@ -61,3 +143,22 @@ def dialog(title: str, message: str) -> None:
|
||||
|
||||
if char == -1:
|
||||
continue
|
||||
|
||||
# Scroll if the dialog is clipped vertically
|
||||
viewport_h = visible_message_rows()
|
||||
if len(message_lines) > viewport_h:
|
||||
start = ui_state.start_index[4]
|
||||
max_start = max(0, len(message_lines) - viewport_h)
|
||||
|
||||
if char in (curses.KEY_UP, ord("k")):
|
||||
ui_state.start_index[4] = max(0, start - 1)
|
||||
draw_window()
|
||||
elif char in (curses.KEY_DOWN, ord("j")):
|
||||
ui_state.start_index[4] = min(max_start, start + 1)
|
||||
draw_window()
|
||||
elif char == curses.KEY_PPAGE: # Page up
|
||||
ui_state.start_index[4] = max(0, start - viewport_h)
|
||||
draw_window()
|
||||
elif char == curses.KEY_NPAGE: # Page down
|
||||
ui_state.start_index[4] = min(max_start, start + viewport_h)
|
||||
draw_window()
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "contact"
|
||||
version = "1.4.4"
|
||||
version = "1.4.7"
|
||||
description = "This Python curses client for Meshtastic is a terminal-based client designed to manage device settings, enable mesh chat communication, and handle configuration backups and restores."
|
||||
authors = [
|
||||
{name = "Ben Lipsey",email = "ben@pdxlocations.com"}
|
||||
|
||||
Reference in New Issue
Block a user