Files
Meshtastic-Client/LinuxMint/meshtastic_client.py
2025-12-12 20:09:28 +01:00

2219 lines
86 KiB
Python

# -*- coding: utf-8 -*-
#!/usr/bin/env python3
"""
Meshtastic Client (Tkinter)
Improvements vs previous version:
- Thread-safe UI updates (no Tk calls from background threads)
- De-duplicated receive handler (only processes each packet once)
- Added "To selected node (DM)" send target option
- Per-node chat now resolves destinationId correctly before sending
- Connection "connected" fallback when pubsub isn't available
- Theme changes apply to open chat windows too
"""
from __future__ import annotations
import datetime
import json
import logging
import os
import pathlib
import subprocess
import threading
import time
import tkinter as tk
import webbrowser
from tkinter import messagebox, simpledialog, ttk
from typing import Any, Dict, Optional, Tuple
# Optional deps -------------------------------------------------------
try:
from pubsub import pub
except Exception:
pub = None
try:
from meshtastic.tcp_interface import TCPInterface
except Exception:
TCPInterface = None
try:
from meshtastic.serial_interface import SerialInterface
except Exception:
SerialInterface = None
try:
from meshtastic.ble_interface import BLEInterface
except Exception:
BLEInterface = None
try:
from meshtastic.protobuf import mesh_pb2, portnums_pb2, telemetry_pb2
import google.protobuf.json_format as _json_format
except Exception:
mesh_pb2 = None
portnums_pb2 = None
telemetry_pb2 = None
_json_format = None
try:
from serial.tools import list_ports
except Exception:
list_ports = None
# Defaults ------------------------------------------------------------
HOST_DEFAULT = "192.168.0.156"
PORT_DEFAULT = 4403
PROJECT_PATH = pathlib.Path(__file__).resolve().parent
ICON_PATH = PROJECT_PATH / "meshtastic.ico"
CONFIG_FILENAME = "meshtastic_client_settings.json"
DEFAULT_GEOMETRY = "1500x820"
DEFAULT_SASH_FRACTION = 0.40
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("meshtastic-client")
def _prefer_chrome(url: str) -> None:
"""Open URL (prefer Chrome on Windows, otherwise default browser)."""
chrome_paths = [
r"C:\Program Files\Google\Chrome\Application\chrome.exe",
r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
]
for p in chrome_paths:
if os.path.exists(p):
subprocess.Popen([p, url])
return
webbrowser.open(url)
def _fmt_ago(epoch_seconds: Optional[float]) -> str:
if not epoch_seconds:
return "N/A"
try:
delta = time.time() - float(epoch_seconds)
except Exception:
return "N/A"
if delta < 0:
delta = 0
mins = int(delta // 60)
hours = int(delta // 3600)
days = int(delta // 86400)
if delta < 60:
return f"{int(delta)}s"
if mins < 60:
return f"{mins}m"
if hours < 24:
return f"{hours}h"
if days < 7:
return f"{days}d"
dt = datetime.datetime.fromtimestamp(epoch_seconds)
return dt.strftime("%Y-%m-%d %H:%M")
class MeshtasticGUI:
def __init__(self, master: Optional[tk.Tk] = None):
self.root = master or tk.Tk()
self.root.title("Meshtastic Client")
try:
if ICON_PATH.exists():
self.root.iconbitmap(default=str(ICON_PATH))
except Exception:
pass
self.current_theme = "dark"
self._pending_channel_choice: Optional[str] = None
self._startup_sash_fraction: float = DEFAULT_SASH_FRACTION
self._startup_geometry_applied: bool = False
self.root.rowconfigure(0, weight=1)
self.root.columnconfigure(0, weight=1)
self.host_var = tk.StringVar(value=HOST_DEFAULT)
self.port_var = tk.IntVar(value=PORT_DEFAULT)
# Connection settings (persisted)
# conn_type: "tcp" | "serial" | "ble"
self.conn_type_var = tk.StringVar(value="tcp")
self.serial_port_var = tk.StringVar(value="") # empty = auto
self.ble_addr_var = tk.StringVar(value="")
# State
self.iface: Optional[object] = None
self.connected_evt = threading.Event()
self._last_seen_overrides: Dict[str, float] = {}
self._last_sort_col: Optional[str] = "since"
self._last_sort_reverse: bool = True
self._telemetry: Dict[str, Dict[str, Any]] = {}
self._per_node_chats: Dict[str, "NodeChatWindow"] = {}
self._ui_thread_id = threading.get_ident()
# Menu --------------------------------------------------------
self.menubar = tk.Menu(self.root)
m_conn = tk.Menu(self.menubar, tearoff=False)
m_conn.add_command(label="Connect (Saved settings)", command=self.connect_saved)
m_conn.add_command(label="Connect (TCP)", command=self.connect_tcp)
m_conn.add_command(label="Connect via USB/Serial...", command=self.connect_serial_dialog)
m_conn.add_command(label="Connect via Bluetooth...", command=self.connect_ble_dialog)
m_conn.add_command(label="Disconnect", command=self.disconnect)
m_conn.add_separator()
m_conn.add_command(label="Set IP/Port...", command=self.set_ip_port)
self.menubar.add_cascade(label="Connection", menu=m_conn)
m_tools = tk.Menu(self.menubar, tearoff=False)
m_tools.add_command(label="Clear messages", command=self.clear_messages)
m_tools.add_separator()
m_tools.add_command(label="Settings...", command=self.open_settings_dialog)
self.menubar.add_cascade(label="Tools", menu=m_tools)
m_view = tk.Menu(self.menubar, tearoff=False)
m_view.add_command(label="Light theme", command=lambda: self.apply_theme("light"))
m_view.add_command(label="Dark theme", command=lambda: self.apply_theme("dark"))
m_view.add_separator()
m_view.add_command(label="Neighbor table", command=self.show_neighbors_window)
m_view.add_command(label="Radio config (view)", command=self.show_radio_config_window)
m_view.add_command(label="Channel editor", command=self.show_channel_editor_window)
self.menubar.add_cascade(label="View", menu=m_view)
m_links = tk.Menu(self.menubar, tearoff=False)
m_links.add_command(label="Meshtastic client", command=lambda: self._open_browser_url("https://github.com/dk98174003/Meshtastic-Client"))
m_links.add_command(label="Meshtastic org", command=lambda: self._open_browser_url("https://meshtastic.org/"))
m_links.add_command(label="Meshtastic flasher (Chrome)", command=lambda: self._open_browser_url("https://flasher.meshtastic.org/"))
m_links.add_command(label="Meshtastic Web Client", command=lambda: self._open_browser_url("https://client.meshtastic.org"))
m_links.add_command(label="Meshtastic docker client", command=lambda: self._open_browser_url("https://meshtastic.org/docs/software/linux/usage/#usage-with-docker"))
m_links.add_separator()
m_links.add_command(label="Meshtastic Facebook Danmark", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1553839535376876/"))
m_links.add_command(label="Meshtastic Facebook Nordjylland", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1265866668302201/"))
self.menubar.add_cascade(label="Links", menu=m_links)
self.root.config(menu=self.menubar)
# Layout ------------------------------------------------------
self.rootframe = ttk.Frame(self.root)
self.rootframe.grid(row=0, column=0, sticky="nsew")
self.rootframe.rowconfigure(0, weight=1)
self.rootframe.rowconfigure(1, weight=0)
self.rootframe.columnconfigure(0, weight=1)
self.paned = ttk.Panedwindow(self.rootframe, orient="horizontal")
self.paned.grid(row=0, column=0, sticky="nsew")
# Status bar (keeps non-chat info out of the Messages area)
self.status_conn_var = tk.StringVar(value="Conn: Disconnected")
self.status_pos_var = tk.StringVar(value="POS: -")
self.status_tel_var = tk.StringVar(value="TEL: -")
self.status_info_var = tk.StringVar(value="")
self.statusbar = ttk.Frame(self.rootframe, padding=(6, 2))
self.statusbar.grid(row=1, column=0, sticky="ew")
self.statusbar.columnconfigure(3, weight=1)
ttk.Label(self.statusbar, textvariable=self.status_conn_var).grid(row=0, column=0, sticky="w", padx=(0, 12))
ttk.Label(self.statusbar, textvariable=self.status_pos_var).grid(row=0, column=1, sticky="w", padx=(0, 12))
ttk.Label(self.statusbar, textvariable=self.status_tel_var).grid(row=0, column=2, sticky="w", padx=(0, 12))
ttk.Label(self.statusbar, textvariable=self.status_info_var).grid(row=0, column=3, sticky="ew")
# Messages pane
self.msg_frame = ttk.Frame(self.paned)
self.msg_frame.rowconfigure(1, weight=1)
self.msg_frame.columnconfigure(0, weight=1)
ttk.Label(self.msg_frame, text="Messages").grid(row=0, column=0, sticky="w", pady=(2, 0))
self.txt_messages = tk.Text(self.msg_frame, wrap="word")
self.txt_messages.grid(row=1, column=0, sticky="nsew", padx=(0, 4), pady=(2, 2))
yscroll_left = ttk.Scrollbar(self.msg_frame, orient="vertical", command=self.txt_messages.yview)
self.txt_messages.configure(yscrollcommand=yscroll_left.set)
yscroll_left.grid(row=1, column=1, sticky="ns")
self.send_frame = ttk.Frame(self.msg_frame)
self.send_frame.grid(row=2, column=0, columnspan=2, sticky="nsew", pady=(0, 4))
self.send_frame.columnconfigure(0, weight=1)
self.ent_message = ttk.Entry(self.send_frame)
self.ent_message.grid(row=0, column=0, sticky="nsew")
self.ent_message.bind("<Return>", lambda e: self.send_message())
self.btn_send = ttk.Button(self.send_frame, text="Send", command=self.send_message)
self.btn_send.grid(row=0, column=1, padx=4, sticky="nsew")
# Channel selector
self.channel_var = tk.StringVar()
self._channel_map: Dict[str, Dict[str, Any]] = {}
self.cbo_channel = ttk.Combobox(self.send_frame, textvariable=self.channel_var, state="readonly", width=26)
self._reset_channel_choices()
self.cbo_channel.grid(row=0, column=2, padx=4, sticky="w")
# Nodes pane
self.nodes_frame = ttk.Labelframe(self.paned, text="Nodes (0)")
self.nodes_frame.rowconfigure(1, weight=1)
self.nodes_frame.columnconfigure(0, weight=1)
self.ent_search = ttk.Entry(self.nodes_frame)
self.ent_search.grid(row=0, column=0, sticky="nsew", padx=2, pady=2)
self.ent_search.bind("<KeyRelease>", lambda e: self.refresh_nodes())
self.cols_all = (
"shortname", "longname", "since", "hops",
"distkm", "speed", "alt", "battery", "voltage",
"lastheard", "hwmodel", "role",
"macaddr", "publickey", "isunmessagable", "id"
)
self.cols_visible = (
"shortname", "longname", "since", "hops",
"distkm", "speed", "alt", "battery", "voltage", "hwmodel", "role"
)
self.tv_nodes = ttk.Treeview(
self.nodes_frame,
columns=self.cols_all,
show="headings",
displaycolumns=self.cols_visible
)
self.tv_nodes.grid(row=1, column=0, sticky="nsew", padx=(2, 0), pady=(0, 2))
headings = {
"shortname": "Short",
"longname": "Long",
"since": "Since",
"hops": "Hops",
"distkm": "Dist (km)",
"speed": "Speed",
"alt": "Alt (m)",
"battery": "Batt %",
"voltage": "Volt",
"hwmodel": "HW",
"role": "Role",
"lastheard": "",
"macaddr": "MAC",
"publickey": "Public key",
"isunmessagable": "Unmsg?",
"id": "ID",
}
for key, text in headings.items():
self.tv_nodes.heading(key, text=text, command=lambda c=key: self.sort_by_column(c, False))
widths = {
"shortname": 90,
"longname": 200,
"since": 90,
"hops": 50,
"distkm": 70,
"speed": 70,
"alt": 70,
"battery": 70,
"voltage": 80,
"hwmodel": 90,
"role": 110,
}
for key, w in widths.items():
try:
self.tv_nodes.column(
key,
width=w,
anchor="w",
stretch=(key not in ("since", "hops", "distkm", "speed", "alt", "battery", "voltage")),
)
except Exception:
pass
# Hide technical columns
for col in ("lastheard", "macaddr", "publickey", "isunmessagable", "id"):
try:
self.tv_nodes.column(col, width=0, minwidth=0, stretch=False)
except Exception:
pass
yscroll_nodes = ttk.Scrollbar(self.nodes_frame, orient="vertical", command=self.tv_nodes.yview)
self.tv_nodes.configure(yscrollcommand=yscroll_nodes.set)
yscroll_nodes.grid(row=1, column=1, sticky="ns")
# Node context menu
self.node_menu = tk.Menu(self.nodes_frame, tearoff=False)
self.node_menu.add_command(label="Node info", command=self._cm_show_node_details)
self.node_menu.add_command(label="Map", command=self._cm_open_map)
self.node_menu.add_command(label="Traceroute", command=self._cm_traceroute)
self.node_menu.add_command(label="Chat with node", command=self._cm_open_chat)
self.node_menu.add_separator()
self.node_menu.add_command(label="Delete node", command=self._cm_delete_node)
self.tv_nodes.bind("<Button-3>", self._popup_node_menu)
self.tv_nodes.bind("<Double-1>", lambda e: self._cm_open_chat())
self.paned.add(self.msg_frame, weight=3)
self.paned.add(self.nodes_frame, weight=4)
self.root.after(100, lambda: self._safe_set_sash(self._startup_sash_fraction))
# PubSub subscriptions ---------------------------------------
if pub is not None:
try:
pub.subscribe(self.on_connection_established, "meshtastic.connection.established")
pub.subscribe(self.on_connection_lost, "meshtastic.connection.lost")
pub.subscribe(self.on_receive, "meshtastic.receive")
pub.subscribe(self.on_node_updated, "meshtastic.node.updated")
except Exception as e:
log.warning("pubsub subscribe failed: %s", e)
# Load persisted settings (from ./meshtastic_client_settings.json) if present
self.load_settings(silent=True)
self.apply_theme(self.current_theme)
self._status_update(info="Ready. Connection -> Connect (TCP/USB/BLE)")
self._update_title_with_host()
# UI helpers ------------------------------------------------------
def _ui(self, fn, *args, **kwargs):
"""Run fn in Tk main thread."""
if threading.get_ident() == self._ui_thread_id:
fn(*args, **kwargs)
else:
self.root.after(0, lambda: fn(*args, **kwargs))
def _status_update(self, *, conn: Optional[str] = None, pos: Optional[str] = None, tel: Optional[str] = None, info: Optional[str] = None):
"""Update the bottom status bar (UI thread safe)."""
def _do():
ts = time.strftime("%H:%M:%S")
if conn is not None:
self.status_conn_var.set(f"Conn: {conn}")
if pos is not None:
self.status_pos_var.set(f"POS: {pos} @{ts}" if pos != "-" else "POS: -")
if tel is not None:
self.status_tel_var.set(f"TEL: {tel} @{ts}" if tel != "-" else "TEL: -")
if info is not None:
self.status_info_var.set(f"{info} @{ts}" if info else "")
self._ui(_do)
def _status_event(self, kind: str, label: str):
"""Convenience for updating POS/TEL and other packet status."""
kind = (kind or "").upper().strip()
if kind == "POS":
self._status_update(pos=label)
elif kind == "TEL":
self._status_update(tel=label)
else:
self._status_update(info=f"{kind}: {label}" if kind else label)
def clear_messages(self):
self._ui(lambda: self.txt_messages.delete("1.0", "end"))
# helpers ---------------------------------------------------------
def _open_browser_url(self, url: str):
_prefer_chrome(url)
def _update_title_with_host(self):
self.root.title(f"Meshtastic Client - {self.host_var.get()}:{self.port_var.get()}")
def _safe_set_sash(self, fraction: float = 0.5):
try:
w = self.paned.winfo_width() or self.paned.winfo_reqwidth()
self.paned.sashpos(0, int(w * fraction))
except Exception:
pass
def _node_label(self, node_id: str) -> str:
if not self.iface or not getattr(self.iface, "nodes", None):
return node_id
node = self.iface.nodes.get(node_id, {}) # type: ignore[attr-defined]
user = (node or {}).get("user") or {}
shortname = user.get("shortName") or ""
longname = user.get("longName") or ""
label = (f"{shortname} {longname}").strip()
return label or node_id
# pubsub callbacks ------------------------------------------------
def _mark_connected(self):
if self.connected_evt.is_set():
return
self.connected_evt.set()
self._status_update(conn="Connected", info="Connected")
self.refresh_nodes()
try:
self._update_channels_from_iface()
except Exception:
pass
def on_connection_established(self, interface=None, **kwargs):
self._ui(self._mark_connected)
def on_connection_lost(self, interface=None, **kwargs):
def _lost():
self.connected_evt.clear()
self._status_update(conn="Disconnected", info="Connection lost")
self._ui(_lost)
def on_node_updated(self, node=None, interface=None, **kwargs):
self._ui(self.refresh_nodes)
def on_receive(self, packet=None, interface=None, **kwargs):
self._ui(lambda: self._handle_receive(packet or {}))
# receive ---------------------------------------------------------
def _handle_receive(self, packet: dict):
if not isinstance(packet, dict):
return
decoded = packet.get("decoded") or {}
if not isinstance(decoded, dict):
decoded = {}
app_name = decoded.get("app") or ""
portnum = decoded.get("portnum") or app_name or ""
sender = packet.get("fromId") or packet.get("from") or packet.get("fromIdShort") or "UNKNOWN"
sender = str(sender)
# Mark last seen (for more accurate "Since")
self._last_seen_overrides[sender] = time.time()
label = self._node_label(sender)
# Telemetry parsing (device metrics)
if portnum == "TELEMETRY_APP" and telemetry_pb2 is not None:
try:
payload = decoded.get("payload")
if isinstance(payload, (bytes, bytearray)):
tmsg = telemetry_pb2.Telemetry()
tmsg.ParseFromString(payload)
if tmsg.HasField("device_metrics"):
dm = tmsg.device_metrics
entry = self._telemetry.get(sender, {})
if getattr(dm, "battery_level", None) is not None:
entry["battery"] = dm.battery_level
if getattr(dm, "voltage", None) is not None:
entry["voltage"] = dm.voltage
self._telemetry[sender] = entry
except Exception:
pass # telemetry failures are non-fatal
# Text message
if portnum in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP"):
text = decoded.get("text")
if isinstance(text, bytes):
text = text.decode("utf-8", errors="ignore")
if not isinstance(text, str):
# Some packets provide bytes in decoded.payload
p = decoded.get("payload")
if isinstance(p, (bytes, bytearray)):
text = p.decode("utf-8", errors="ignore")
elif isinstance(p, dict):
t2 = p.get("text")
if isinstance(t2, str):
text = t2
elif isinstance(t2, (bytes, bytearray)):
text = t2.decode("utf-8", errors="ignore")
else:
text = ""
rssi = packet.get("rxRssi")
if text:
self._append(f"[MSG] {label}: {text} (RSSI={rssi})")
self._append_to_node_chat(sender, text)
else:
self._append(f"[MSG] {label} (RSSI={rssi})")
if text.strip().lower() == "ping":
self._send_pong(sender, label)
else:
# Non-text: keep lightweight log
tag_map = {
"POSITION_APP": "POS",
"TELEMETRY_APP": "TEL",
"NODEINFO_APP": "INFO",
"ROUTING_APP": "ROUT",
"MAP_REPORT_APP": "MAP",
"ADMIN_APP": "ADM",
"NEIGHBORINFO_APP": "NEI",
"STORE_FORWARD_APP": "SFWD",
"REMOTE_HARDWARE_APP": "RHW",
"PRIVATE_APP": "PRIV",
}
tag = tag_map.get(str(app_name or portnum), "INFO")
# Avoid spamming for every non-text packet; show only brief line
self._status_event(tag, label)
self.refresh_nodes()
def _send_pong(self, dest_id: Optional[str], label: str):
if not self.iface or not dest_id:
return
try:
self.iface.sendText("pong", destinationId=dest_id, wantAck=False)
self._status_update(info=f"auto pong -> {label}")
self._append_to_node_chat(str(dest_id), "[auto] pong")
except Exception as e:
self._status_update(info=f"auto pong failed: {e}")
# connection actions ----------------------------------------------
def set_ip_port(self):
win = tk.Toplevel(self.root)
win.title("Set IP/Port")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.grid(row=0, column=0, sticky="nsew")
win.columnconfigure(0, weight=1)
win.rowconfigure(0, weight=1)
ttk.Label(frm, text="Host/IP:").grid(row=0, column=0, sticky="w", pady=4)
ent_host = ttk.Entry(frm, textvariable=self.host_var, width=28)
ent_host.grid(row=0, column=1, sticky="ew", pady=4, padx=4)
ttk.Label(frm, text="Port:").grid(row=1, column=0, sticky="w", pady=4)
ent_port = ttk.Entry(frm, textvariable=self.port_var, width=10)
ent_port.grid(row=1, column=1, sticky="w", pady=4, padx=4)
frm.columnconfigure(1, weight=1)
def save():
h = self.host_var.get().strip()
try:
p = int(self.port_var.get())
except Exception:
messagebox.showerror("Port", "Port must be 1-65535")
return
if not h:
messagebox.showerror("Host", "Host cannot be empty")
return
if not (1 <= p <= 65535):
messagebox.showerror("Port", "Port must be 1-65535")
return
self.host_var.set(h)
self.port_var.set(p)
self._update_title_with_host()
win.destroy()
btnbar = ttk.Frame(frm)
btnbar.grid(row=2, column=0, columnspan=2, sticky="e")
ttk.Button(btnbar, text="Cancel", command=win.destroy).grid(row=0, column=0, padx=4)
ttk.Button(btnbar, text="Save", command=save).grid(row=0, column=1, padx=4)
def connect_saved(self):
"""Connect using the method selected in Settings."""
if self.iface:
return
method = (self.conn_type_var.get() or "tcp").strip().lower()
if method == "serial":
port = (self.serial_port_var.get() or "").strip()
self.connect_serial(port if port else None)
elif method in ("ble", "bluetooth"):
addr = (self.ble_addr_var.get() or "").strip()
if not addr:
# If no saved address, fall back to interactive scan
self.connect_ble_dialog()
else:
self.connect_ble(addr)
else:
self.connect_tcp()
def connect_serial(self, port: Optional[str] = None):
"""Connect via USB/Serial. If port is None, auto-detect."""
if self.iface:
return
if SerialInterface is None:
self._ui(lambda: messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed."))
return
self._status_update(conn="Connecting (Serial)", info=f"Connecting Serial {port or '(auto)'}...")
def run():
try:
self.iface = SerialInterface(devPath=port) if port else SerialInterface()
if pub is None:
self._ui(self._mark_connected)
else:
self.connected_evt.wait(timeout=6)
except Exception as e:
self._ui(lambda: messagebox.showerror("Serial connect failed", str(e)))
threading.Thread(target=run, daemon=True).start()
def connect_ble(self, address: str):
"""Connect via Bluetooth (BLE) using a saved address or name filter."""
if self.iface:
return
if BLEInterface is None:
self._ui(lambda: messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak)."))
return
self._status_update(conn="Connecting (BLE)", info=f"Connecting BLE {address}...")
def run():
try:
# BLEInterface accepts an address or name filter and will scan for matches.
self.iface = BLEInterface(address=address)
if pub is None:
self._ui(self._mark_connected)
else:
self.connected_evt.wait(timeout=10)
except Exception as e:
self._ui(lambda: messagebox.showerror("BLE connect failed", str(e)))
threading.Thread(target=run, daemon=True).start()
def connect_tcp(self):
if self.iface:
return
host = self.host_var.get().strip()
try:
port = int(self.port_var.get())
except Exception:
messagebox.showerror("Port", "Invalid port")
return
self._status_update(conn="Connecting (TCP)", info=f"Connecting TCP {host}:{port}...")
def run():
try:
if TCPInterface is None:
raise RuntimeError("meshtastic.tcp_interface not installed")
self.iface = TCPInterface(hostname=host, portNumber=port)
# If pubsub isn't available, we might not get the established event.
if pub is None:
self._ui(self._mark_connected)
else:
self.connected_evt.wait(timeout=6)
except Exception as e:
self._ui(lambda: messagebox.showerror("TCP connect failed", str(e)))
threading.Thread(target=run, daemon=True).start()
def connect_serial_dialog(self):
"""Interactive serial connect (also stores chosen port in Settings)."""
if SerialInterface is None:
messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed.")
return
ports = []
if list_ports:
try:
ports = [p.device for p in list_ports.comports()]
except Exception:
ports = []
presets = ["(auto)"] + ports + ["COM4", "/dev/ttyUSB0"]
initial = "(auto)"
try:
saved = (self.serial_port_var.get() or "").strip()
if saved:
initial = saved
except Exception:
pass
port = simpledialog.askstring("Serial", "Serial port (or leave '(auto)'):", initialvalue=initial)
if port is None:
return
port = port.strip()
if port.lower() == "(auto)" or port == "":
port = None
# Persist choice
self.serial_port_var.set(port or "")
self.conn_type_var.set("serial")
self.connect_serial(port)
def connect_ble_dialog(self):
"""Interactive BLE connect (also stores chosen device address in Settings)."""
if BLEInterface is None:
messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).")
return
self._status_update(conn="Scanning BLE", info="Scanning BLE for Meshtastic devices...")
try:
devices = BLEInterface.scan()
except Exception as e:
messagebox.showerror("BLE scan failed", str(e))
return
if not devices:
messagebox.showinfo("BLE", "No devices found.")
return
options = [
f"{i + 1}. {getattr(d, 'name', '') or '(unnamed)'} [{getattr(d, 'address', '?')}]"
for i, d in enumerate(devices)
]
choice = simpledialog.askinteger(
"Select BLE device",
"Enter number:\n" + "\n".join(options),
minvalue=1,
maxvalue=len(devices),
)
if not choice:
return
addr = getattr(devices[choice - 1], "address", None)
if not addr:
messagebox.showerror("BLE", "Selected device has no address.")
return
# Persist choice
self.ble_addr_var.set(str(addr))
self.conn_type_var.set("ble")
self.connect_ble(str(addr))
def disconnect(self):
try:
if self.iface:
self.iface.close()
except Exception:
pass
self.iface = None
self.connected_evt.clear()
self._status_update(conn="Disconnected", info="Disconnected")
# send ------------------------------------------------------------
def _reset_channel_choices(self):
"""Initialize channel selector with Public + To selected (DM)."""
self._channel_map = {}
options = []
label_dm = "To selected node (DM)"
self._channel_map[label_dm] = {"mode": "selected", "channelIndex": 0}
options.append(label_dm)
label_pub = "Public (broadcast)"
self._channel_map[label_pub] = {"mode": "broadcast", "channelIndex": 0}
options.append(label_pub)
self.cbo_channel["values"] = options
# Apply pending default channel choice if it exists now
if self._pending_channel_choice:
if self._pending_channel_choice in options:
self.channel_var.set(self._pending_channel_choice)
self._pending_channel_choice = None
self.channel_var.set(label_pub)
def _update_channels_from_iface(self):
"""Populate channel selector with channels from the connected device (broadcast channels)."""
iface = getattr(self, "iface", None)
if not iface:
return
local_node = getattr(iface, "localNode", None)
if not local_node:
return
chans = getattr(local_node, "channels", None)
try:
if (not chans) and hasattr(local_node, "requestChannels"):
local_node.requestChannels()
time.sleep(1.2)
chans = getattr(local_node, "channels", None)
except Exception:
chans = getattr(local_node, "channels", None)
try:
options = list(self.cbo_channel["values"])
except Exception:
return
# Keep DM option first, keep Public second, then channels 1..N
# Update "Public" label with ch0 name if available.
try:
if chans and len(chans) > 0:
ch0 = chans[0]
try:
ch0_name = (getattr(ch0, "settings", None).name or "").strip()
except Exception:
try:
ch0_name = (ch0.settings.name or "").strip()
except Exception:
ch0_name = ""
if ch0_name:
old_label = None
for lbl, meta in list(self._channel_map.items()):
if meta.get("mode") == "broadcast" and int(meta.get("channelIndex", 0) or 0) == 0:
old_label = lbl
break
if old_label:
new_label = f"Public (ch0: {ch0_name})"
if new_label != old_label:
self._channel_map[new_label] = self._channel_map.pop(old_label)
options = [new_label if v == old_label else v for v in options]
if self.channel_var.get() == old_label:
self.channel_var.set(new_label)
except Exception:
pass
for idx, ch in enumerate(chans or []):
if idx == 0:
continue
try:
name = (getattr(ch, "settings", None).name or "").strip()
except Exception:
try:
name = (ch.settings.name or "").strip()
except Exception:
name = ""
label = f"Ch {idx}: {name}" if name else f"Ch {idx}"
if label in self._channel_map:
continue
self._channel_map[label] = {"mode": "broadcast_channel", "channelIndex": idx}
options.append(label)
self.cbo_channel["values"] = options
def send_message(self):
msg = self.ent_message.get().strip()
if not msg:
return
if not self.iface:
messagebox.showwarning("Not connected", "Connect first.")
return
choice = self.channel_var.get() or ""
info = self._channel_map.get(choice) or {"mode": "broadcast", "channelIndex": 0}
mode = info.get("mode", "broadcast")
ch_index = int(info.get("channelIndex", 0) or 0)
try:
if mode == "selected":
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("No selection", "Select a node first.")
return
dest = self._resolve_node_dest_id(nid)
if not dest:
messagebox.showerror("Send failed", "Cannot resolve destination for selected node.")
return
self.iface.sendText(msg, destinationId=dest, wantAck=False, channelIndex=ch_index)
self._append(f"[ME -> {self._node_label(nid)}] {msg}")
self._append_to_node_chat(nid, "[ME] " + msg)
else:
self.iface.sendText(msg, wantAck=False, channelIndex=ch_index)
if mode == "broadcast_channel" and ch_index:
self._append(f"[ME ch{ch_index}] {msg}")
else:
self._append(f"[ME] {msg}")
self.ent_message.delete(0, "end")
except Exception as e:
messagebox.showerror("Send failed", str(e))
# nodes -----------------------------------------------------------
def _get_lastheard_epoch(self, node_id: str, node: Dict[str, Any]) -> Optional[float]:
raw = (node or {}).get("lastHeard")
ts_iface = None
if raw is not None:
try:
val = float(raw)
ts_iface = val / 1000.0 if val > 10000000000 else val
except Exception:
ts_iface = None
ts_local = self._last_seen_overrides.get(str(node_id))
if ts_iface and ts_local:
return max(ts_iface, ts_local)
return ts_iface or ts_local
def _extract_latlon(self, node: dict) -> Tuple[Optional[float], Optional[float]]:
pos = (node or {}).get("position") or {}
lat = pos.get("latitude") or pos.get("latitudeI") or pos.get("latitude_i")
lon = pos.get("longitude") or pos.get("longitudeI") or pos.get("longitude_i")
try:
if lat is not None:
lat = float(lat) * (1e-7 if abs(float(lat)) > 90 else 1.0)
if lon is not None:
lon = float(lon) * (1e-7 if abs(float(lon)) > 180 else 1.0)
except Exception:
lat = lon = None
return lat, lon
def _extract_speed_alt(self, node: dict) -> Tuple[Optional[float], Optional[float]]:
pos = (node or {}).get("position") or {}
speed = (
pos.get("groundSpeedKmh")
or pos.get("groundSpeedKmhI")
or pos.get("groundSpeed")
or pos.get("ground_speed")
)
alt = (
pos.get("altitude")
or pos.get("altitudeM")
or pos.get("altitude_i")
or pos.get("altitudeI")
)
try:
speed = float(speed) if speed is not None else None
alt = float(alt) if alt is not None else None
except Exception:
speed, alt = None, None
return speed, alt
def _get_local_latlon(self) -> Tuple[Optional[float], Optional[float]]:
if not self.iface:
return (None, None)
try:
mi = getattr(self.iface, "myInfo", None)
nbn = getattr(self.iface, "nodesByNum", None)
if mi is not None and hasattr(mi, "my_node_num") and nbn:
n = nbn.get(mi.my_node_num) or {}
lat, lon = self._extract_latlon(n)
if lat is not None and lon is not None:
return lat, lon
except Exception:
pass
return (None, None)
def _haversine_km(self, lat1, lon1, lat2, lon2) -> float:
import math
R = 6371.0088
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlmb = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return R * c
def refresh_nodes(self):
if not self.iface or not getattr(self.iface, "nodes", None):
return
q = self.ent_search.get().strip().lower()
for iid in self.tv_nodes.get_children(""):
self.tv_nodes.delete(iid)
try:
nodes_snapshot = dict(self.iface.nodes or {}) # type: ignore[attr-defined]
except Exception:
nodes_snapshot = {}
base_lat, base_lon = self._get_local_latlon()
for node_id, node in nodes_snapshot.items():
user = (node or {}).get("user") or {}
shortname = user.get("shortName") or ""
longname = user.get("longName") or ""
hwmodel = user.get("hwModel") or ""
role_raw = user.get("role") or ""
role = role_raw if str(role_raw).strip() else "CLIENT"
macaddr = user.get("macaddr") or ""
publickey = user.get("publicKey") or ""
unmsg = user.get("isUnmessagable") or user.get("isUnmessageable") or False
lastheard_epoch = self._get_lastheard_epoch(str(node_id), node)
since_str = _fmt_ago(lastheard_epoch)
hops = node.get("hopsAway")
lat, lon = self._extract_latlon(node)
dist = None
if base_lat is not None and base_lon is not None and lat is not None and lon is not None:
try:
dist = self._haversine_km(base_lat, base_lon, lat, lon)
except Exception:
dist = None
dist_str = f"{dist:.1f}" if isinstance(dist, (int, float)) else "-"
speed, alt = self._extract_speed_alt(node)
speed_str = f"{speed:.1f}" if isinstance(speed, (int, float)) else "-"
alt_str = f"{alt:.0f}" if isinstance(alt, (int, float)) else "-"
telem = self._telemetry.get(str(node_id), {})
bat = telem.get("battery")
volt = telem.get("voltage")
bat_str = f"{bat:.0f}" if isinstance(bat, (int, float)) else "-"
volt_str = f"{volt:.2f}" if isinstance(volt, (int, float)) else "-"
values = (
shortname,
longname,
since_str,
str(hops) if hops is not None else "-",
dist_str,
speed_str,
alt_str,
bat_str,
volt_str,
f"{(lastheard_epoch or 0):.0f}",
hwmodel,
role,
macaddr,
publickey,
str(bool(unmsg)),
str(node_id),
)
if not q or any(q in str(v).lower() for v in values):
try:
self.tv_nodes.insert("", "end", iid=str(node_id), values=values)
except Exception:
self.tv_nodes.insert("", "end", values=values)
if self._last_sort_col:
self.sort_by_column(self._last_sort_col, self._last_sort_reverse)
self.nodes_frame.config(text=f"Nodes ({len(self.tv_nodes.get_children())})")
def sort_by_column(self, col: str, reverse: bool = False):
self._last_sort_col = col
self._last_sort_reverse = reverse
col_to_sort = "lastheard" if col == "since" else col
numeric = {"lastheard", "distkm", "hops", "speed", "alt", "battery", "voltage"}
rows = []
for iid in self.tv_nodes.get_children(""):
val = self.tv_nodes.set(iid, col_to_sort)
if col_to_sort in numeric:
try:
val = float(val if val != "-" else 0.0)
except Exception:
val = 0.0
else:
val = str(val).casefold()
rows.append((val, iid))
rows.sort(key=lambda t: t[0], reverse=reverse)
for index, (_, iid) in enumerate(rows):
self.tv_nodes.move(iid, "", index)
self.tv_nodes.heading(col, command=lambda: self.sort_by_column(col, not reverse))
# THEME -----------------------------------------------------------
def apply_theme(self, mode: str = "light"):
self.current_theme = mode
is_dark = mode == "dark"
bg = "#1e1e1e" if is_dark else "#f5f5f5"
fg = "#ffffff" if is_dark else "#000000"
acc = "#2d2d2d" if is_dark else "#ffffff"
sel = "#555555" if is_dark else "#cce0ff"
try:
self.root.configure(bg=bg)
except Exception:
pass
style = ttk.Style(self.root)
try:
style.theme_use("clam")
except Exception:
pass
style.configure("TFrame", background=bg)
style.configure("TLabelframe", background=bg, foreground=fg)
style.configure("TLabelframe.Label", background=bg, foreground=fg)
style.configure("TLabel", background=bg, foreground=fg)
style.configure("TButton", background=acc, foreground=fg)
style.configure("TEntry", fieldbackground=acc, foreground=fg)
style.configure("TCombobox", fieldbackground=acc, background=acc, foreground=fg, arrowcolor=fg)
style.map(
"TCombobox",
fieldbackground=[("readonly", acc)],
foreground=[("readonly", fg)],
background=[("readonly", acc)],
)
style.configure("Treeview", background=acc, fieldbackground=acc, foreground=fg, borderwidth=0)
style.map("Treeview", background=[("selected", sel)], foreground=[("selected", fg)])
try:
if self.menubar is not None:
self.menubar.configure(
background=bg,
foreground=fg,
activebackground=sel,
activeforeground=fg,
borderwidth=0,
relief="flat",
)
except Exception:
pass
try:
self.txt_messages.configure(
bg=acc,
fg=fg,
insertbackground=fg,
selectbackground=sel,
selectforeground=fg,
)
except Exception:
pass
try:
self.root.option_add("*Menu*background", bg)
self.root.option_add("*Menu*foreground", fg)
self.root.option_add("*Menu*activeBackground", sel)
self.root.option_add("*Menu*activeForeground", fg)
self.root.option_add("*TCombobox*Listbox*background", acc)
self.root.option_add("*TCombobox*Listbox*foreground", fg)
except Exception:
pass
# Apply to open chat windows
for win in list(self._per_node_chats.values()):
try:
win.apply_theme()
except Exception:
pass
def _style_toplevel(self, win: tk.Toplevel):
is_dark = self.current_theme == "dark"
bg = "#1e1e1e" if is_dark else "#f5f5f5"
try:
win.configure(bg=bg)
except Exception:
pass
# UTILS / CONTEXT ------------------------------------------------
def _append(self, text: str):
def _do():
self.txt_messages.insert("end", text + "\n")
self.txt_messages.see("end")
self._ui(_do)
def _get_selected_node_id(self) -> Optional[str]:
sel = self.tv_nodes.selection()
return sel[0] if sel else None
def _popup_node_menu(self, event):
iid = self.tv_nodes.identify_row(event.y)
if iid:
self.tv_nodes.selection_set(iid)
self.node_menu.tk_popup(event.x_root, event.y_root)
self.node_menu.grab_release()
def _cm_show_node_details(self):
self.show_raw_node(friendly=True)
def _cm_open_map(self):
nid = self._get_selected_node_id()
if not nid or not self.iface or not getattr(self.iface, "nodes", None):
messagebox.showinfo("Map", "No node selected.")
return
node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined]
lat, lon = self._extract_latlon(node)
if lat is None or lon is None:
messagebox.showinfo("Map", "Selected node has no GPS position.")
return
url = f"https://www.google.com/maps/search/?api=1&query={lat},{lon}"
self._open_browser_url(url)
def show_raw_node(self, friendly: bool = False):
nid = self._get_selected_node_id()
if not nid or not self.iface or not getattr(self.iface, "nodes", None):
messagebox.showinfo("Node", "No node selected.")
return
node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined]
win = tk.Toplevel(self.root)
win.title(f"Node: {self._node_label(nid)}")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
txt = tk.Text(frm, wrap="word")
txt.pack(expand=True, fill="both")
is_dark = self.current_theme == "dark"
txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
if not friendly:
txt.insert("1.0", json.dumps(node, indent=2, default=str))
txt.configure(state="disabled")
return
def fmt_val(v, indent=0):
pad = " " * indent
if isinstance(v, dict):
lines = []
for k, vv in v.items():
if isinstance(vv, (dict, list)):
lines.append(f"{pad}{k}:")
lines.append(fmt_val(vv, indent + 1))
else:
lines.append(f"{pad}{k}: {vv}")
return "\n".join(lines)
if isinstance(v, list):
lines = []
for i, item in enumerate(v):
if isinstance(item, (dict, list)):
lines.append(f"{pad}- [{i}]")
lines.append(fmt_val(item, indent + 1))
else:
lines.append(f"{pad}- {item}")
return "\n".join(lines)
return f"{pad}{v}"
user = (node or {}).get("user") or {}
pos = (node or {}).get("position") or {}
caps = (node or {}).get("capabilities") or {}
config = (node or {}).get("config") or {}
node_id = user.get("id") or node.get("id") or nid
macaddr = user.get("macaddr") or node.get("macaddr") or ""
publickey = user.get("publicKey") or node.get("publicKey") or ""
hw = user.get("hwModel", "")
lines = [
f"Name: {user.get('shortName', '')} {user.get('longName', '')}".strip(),
f"ID: {node_id}",
f"MAC: {macaddr}",
f"HW: {hw}",
f"Public key: {publickey}",
"",
f"Last heard: {_fmt_ago(self._get_lastheard_epoch(nid, node))}",
"",
"Position:",
fmt_val(pos, 1),
]
if caps:
lines.append("Capabilities:")
lines.append(fmt_val(caps, 1))
if config:
lines.append("Config:")
lines.append(fmt_val(config, 1))
lines.append("RAW fields:")
skip = {"user", "position", "capabilities", "config"}
other = {k: v for k, v in (node or {}).items() if k not in skip}
lines.append(fmt_val(other, 1))
txt.insert("1.0", "\n".join(lines))
txt.configure(state="disabled")
def show_neighbors_window(self):
if not self.iface or not getattr(self.iface, "nodes", None):
messagebox.showinfo("Neighbors", "No interface or nodes available.")
return
win = tk.Toplevel(self.root)
win.title("Neighbor table")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
tree = ttk.Treeview(frm, columns=("from", "to", "snr", "lastheard"), show="headings")
for col, txt in (("from", "From node"), ("to", "To node"), ("snr", "SNR"), ("lastheard", "Last heard")):
tree.heading(col, text=txt)
tree.column(col, width=160 if col in ("from", "to") else 90, anchor="w")
tree.pack(side="left", expand=True, fill="both")
yscroll = ttk.Scrollbar(frm, orient="vertical", command=tree.yview)
tree.configure(yscrollcommand=yscroll.set)
yscroll.pack(side="right", fill="y")
try:
nodes_snapshot = dict(self.iface.nodes or {}) # type: ignore[attr-defined]
except Exception:
nodes_snapshot = {}
rows = 0
for node_id, node in nodes_snapshot.items():
from_label = self._node_label(str(node_id))
neighbors = (node or {}).get("neighbors") or []
if isinstance(neighbors, dict):
neighbors = neighbors.values()
for n in neighbors:
try:
to_id = (n or {}).get("nodeId") or (n or {}).get("id") or ""
to_label = self._node_label(str(to_id)) if to_id else str(to_id)
snr = (n or {}).get("snr") or (n or {}).get("snrDb")
last = (n or {}).get("lastHeard")
try:
last_epoch = float(last) if last is not None else None
except Exception:
last_epoch = None
last_str = _fmt_ago(last_epoch)
except Exception:
continue
tree.insert("", "end", values=(from_label, to_label, snr if snr is not None else "-", last_str))
rows += 1
if rows == 0:
messagebox.showinfo("Neighbors", "No neighbor information found on nodes.")
# admin / traceroute ---------------------------------------------
def _resolve_node_dest_id(self, nid: str) -> Optional[str]:
nid = str(nid or "").strip()
if not nid:
return None
if nid.startswith("!") or nid.isdigit():
return nid
try:
if self.iface and getattr(self.iface, "nodes", None):
node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined]
user = (node or {}).get("user") or {}
node_id = user.get("id") or ""
if node_id:
return str(node_id)
except Exception:
pass
return "!" + nid if not nid.startswith("!") else nid
def _cm_traceroute(self):
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("Traceroute", "Select a node first.")
return
if not self.iface:
messagebox.showwarning("Traceroute", "Connect first.")
return
dest = self._resolve_node_dest_id(nid)
if not dest:
messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.")
return
self._status_update(info=f"Traceroute -> {self._node_label(nid)} ({dest})")
threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start()
def _cm_delete_node(self):
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("Delete node", "Select a node first.")
return
if not self.iface or not getattr(self.iface, "localNode", None):
messagebox.showwarning("Delete node", "Connect first.")
return
dest = self._resolve_node_dest_id(nid)
if not dest:
messagebox.showerror("Delete node", "Cannot determine node ID.")
return
label = self._node_label(nid)
if not messagebox.askyesno(
"Delete node",
f"Remove node {label} ({dest}) from the NodeDB on the connected radio?\n\nThe device might reboot after this."
):
return
try:
self.iface.localNode.removeNode(dest) # type: ignore[attr-defined]
self._status_update(info=f"Admin: delete requested for {label} ({dest})")
except Exception as e:
messagebox.showerror("Delete node", f"Failed to delete node: {e}")
return
try:
self.tv_nodes.delete(nid)
self.nodes_frame.config(text=f"Nodes ({len(self.tv_nodes.get_children())})")
except Exception:
pass
def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0):
if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"):
self._do_traceroute_via_interface(dest, hop_limit, channel_index)
else:
self._do_traceroute_via_cli(dest)
def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int):
evt = threading.Event()
result: Dict[str, Any] = {}
def _num_to_label(num: int) -> str:
try:
nbn = getattr(self.iface, "nodesByNum", None)
if nbn and num in nbn:
n = nbn[num]
user = (n or {}).get("user") or {}
sid = user.get("id") or f"!{num:08x}"
sn = user.get("shortName") or ""
ln = user.get("longName") or ""
label = (sn or ln or sid).strip()
return f"{label} ({sid})" if sid else label
except Exception:
pass
return f"!{int(num):08x}"
def _on_response(p: dict):
try:
rd = mesh_pb2.RouteDiscovery()
rd.ParseFromString(p["decoded"]["payload"])
result["packet"] = p
result["data"] = _json_format.MessageToDict(rd)
except Exception as e:
result["error"] = str(e)
finally:
evt.set()
r = mesh_pb2.RouteDiscovery()
try:
# Some meshtastic-python versions don't expose hopLimit as a sendData kwarg.
self.iface.sendData(
r,
destinationId=dest,
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=_on_response,
channelIndex=channel_index,
hopLimit=hop_limit,
)
except TypeError:
self.iface.sendData(
r,
destinationId=dest,
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=_on_response,
channelIndex=channel_index,
)
except Exception as e:
self._ui(lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}"))
return
if not evt.wait(30.0):
self._ui(lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported)."))
return
if "error" in result:
self._ui(lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}"))
return
p = result.get("packet") or {}
data = result.get("data") or {}
UNK = -128
try:
origin_num = int(p.get("to"))
dest_num = int(p.get("from"))
except Exception:
origin_num = None
dest_num = None
def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]:
route_nums = []
for v in data.get(route_key, []):
try:
route_nums.append(int(v))
except Exception:
pass
snrs = []
for v in data.get(snr_key, []):
try:
snrs.append(int(v))
except Exception:
pass
if start_num is None or end_num is None:
return None
nodes = [start_num] + route_nums + [end_num]
if len(nodes) <= 1:
return None
parts = []
for idx, num in enumerate(nodes):
label = _num_to_label(num)
if idx == 0:
parts.append(label)
else:
snr_txt = "? dB"
if (idx - 1) < len(snrs):
v = snrs[idx - 1]
if v != UNK:
snr_txt = f"{v / 4.0:.2f} dB"
parts.append(f"{label} ({snr_txt})")
return title + "\n" + " -> ".join(parts)
lines = []
fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num)
if fwd:
lines.append(fwd)
back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num)
if back:
lines.append(back)
if not lines:
self._ui(lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available."))
return
text = "\n\n".join(lines)
self._ui(lambda: self._show_traceroute_window(text))
def _do_traceroute_via_cli(self, dest: str):
host = (self.host_var.get() or "").strip() or HOST_DEFAULT
cmd = ["meshtastic", "--host", host, "--traceroute", dest]
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40)
except Exception as e:
self._ui(lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}"))
return
out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "")
if not out.strip():
self._ui(lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute."))
return
self._ui(lambda: self._show_traceroute_window(out))
def _show_traceroute_window(self, text: str):
win = tk.Toplevel(self.root)
win.title("Traceroute")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
txt = tk.Text(frm, wrap="word")
txt.pack(expand=True, fill="both")
is_dark = self.current_theme == "dark"
txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
txt.insert("1.0", text.strip() or "No traceroute data.")
txt.configure(state="disabled")
# chat windows ----------------------------------------------------
def _cm_open_chat(self):
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("Chat", "Select a node first.")
return
self._open_node_chat(nid)
def _open_node_chat(self, nid: str):
key = str(nid)
if key in self._per_node_chats:
win = self._per_node_chats[key]
try:
win.top.deiconify()
win.top.lift()
except Exception:
pass
return
label = self._node_label(nid)
chat = NodeChatWindow(self, key, label)
self._per_node_chats[key] = chat
def _append_to_node_chat(self, node_id: str, line: str):
key = str(node_id)
win = self._per_node_chats.get(key)
if not win:
return
win.append(line)
def _send_text_to_node(self, node_key: str, msg: str) -> bool:
if not self.iface:
messagebox.showwarning("Send", "Connect first.")
return False
msg = (msg or "").strip()
if not msg:
return False
dest_id = self._resolve_node_dest_id(node_key)
if not dest_id:
messagebox.showerror("Send failed", "Cannot resolve destinationId for this node.")
return False
# Use currently selected channelIndex (so DM uses the same channel selection)
choice = self.channel_var.get() or ""
info = self._channel_map.get(choice) or {"mode": "broadcast", "channelIndex": 0}
ch_index = int(info.get("channelIndex", 0) or 0)
try:
self.iface.sendText(msg, destinationId=dest_id, wantAck=False, channelIndex=ch_index)
self._append(f"[ME -> {self._node_label(node_key)}] {msg}")
self._append_to_node_chat(node_key, "[ME] " + msg)
return True
except Exception as e:
messagebox.showerror("Send failed", str(e))
return False
# config viewers/editors -----------------------------------------
def show_radio_config_window(self):
if not self.iface or not getattr(self.iface, "localNode", None):
messagebox.showinfo("Radio config", "Connect to a device first.")
return
ln = self.iface.localNode
try:
if getattr(ln, "localConfig", None) is None and hasattr(ln, "waitForConfig"):
ln.waitForConfig("localConfig")
except Exception:
pass
cfg = getattr(ln, "localConfig", None)
mod = getattr(ln, "moduleConfig", None)
if cfg is None and mod is None:
messagebox.showinfo("Radio config", "No config available yet from device.")
return
win = tk.Toplevel(self.root)
win.title("Radio + module config (read-only)")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
txt = tk.Text(frm, wrap="none")
txt.pack(side="left", expand=True, fill="both")
yscroll = ttk.Scrollbar(frm, orient="vertical", command=txt.yview)
txt.configure(yscrollcommand=yscroll.set)
yscroll.pack(side="right", fill="y")
is_dark = self.current_theme == "dark"
txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
lines = []
if cfg is not None:
lines.append("localConfig:")
try:
lines.append(str(cfg))
except Exception:
lines.append(repr(cfg))
if mod is not None:
lines.append("\nmoduleConfig:")
try:
lines.append(str(mod))
except Exception:
lines.append(repr(mod))
txt.insert("1.0", "\n".join(lines))
txt.configure(state="disabled")
def show_channel_editor_window(self):
if not self.iface or not getattr(self.iface, "localNode", None):
messagebox.showinfo("Channels", "Connect to a device first.")
return
ln = self.iface.localNode
try:
if getattr(ln, "channels", None) in (None, {}) and hasattr(ln, "waitForConfig"):
ln.waitForConfig("channels")
except Exception:
pass
chans = getattr(ln, "channels", None)
if not chans:
messagebox.showinfo("Channels", "No channels available from device.")
return
win = tk.Toplevel(self.root)
win.title("Channel editor")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.grid(row=0, column=0, sticky="nsew")
win.rowconfigure(0, weight=1)
win.columnconfigure(0, weight=1)
listbox = tk.Listbox(frm, height=10)
listbox.grid(row=0, column=0, columnspan=2, sticky="nsew")
frm.rowconfigure(0, weight=1)
frm.columnconfigure(0, weight=1)
for idx, ch in enumerate(chans):
if ch is None:
label = f"Ch {idx} (empty)"
else:
try:
name = (getattr(ch, "settings", None).name or "").strip()
except Exception:
try:
name = (ch.settings.name or "").strip()
except Exception:
name = ""
label = f"{idx}: {name or '(no name)'}"
listbox.insert("end", label)
ttk.Label(frm, text="Channel name:").grid(row=1, column=0, sticky="w", pady=(6, 2))
name_var = tk.StringVar()
ent_name = ttk.Entry(frm, textvariable=name_var, width=40)
ent_name.grid(row=2, column=0, columnspan=2, sticky="ew")
def on_select(evt=None):
sel = listbox.curselection()
if not sel:
return
i = sel[0]
ch = chans[i]
if ch is None:
name_var.set("")
return
try:
nm = (getattr(ch, "settings", None).name or "").strip()
except Exception:
try:
nm = (ch.settings.name or "").strip()
except Exception:
nm = ""
name_var.set(nm)
listbox.bind("<<ListboxSelect>>", on_select)
def save_name():
sel = listbox.curselection()
if not sel:
messagebox.showinfo("Channel editor", "Select a channel first.")
return
i = sel[0]
ch = chans[i]
if ch is None:
messagebox.showerror("Channel editor", "Selected channel object is empty, cannot rename.")
return
new_name = name_var.get().strip()
try:
if hasattr(ch, "settings"):
ch.settings.name = new_name
elif hasattr(ch, "name"):
ch.name = new_name
ln.writeChannel(i)
self._status_update(info=f"Admin: renamed channel {i} to '{new_name}'")
self._update_channels_from_iface()
listbox.delete(i)
label = f"{i}: {new_name or '(no name)'}"
listbox.insert(i, label)
except Exception as e:
messagebox.showerror("Channel editor", f"Failed to write channel: {e}")
ttk.Button(frm, text="Save name to device", command=save_name).grid(row=3, column=0, sticky="w", pady=(6, 0))
# SETTINGS (persist to ./meshtastic_client_settings.json) -------------------
def _settings_path(self) -> pathlib.Path:
"""Return the settings file path in the current working directory ('.')."""
return pathlib.Path.cwd() / CONFIG_FILENAME
def load_settings(self, silent: bool = False) -> None:
"""Load settings from JSON if the file exists."""
path = self._settings_path()
if not path.exists():
# Apply defaults if no config file
if not self._startup_geometry_applied:
try:
self.root.geometry(DEFAULT_GEOMETRY)
self._startup_geometry_applied = True
except Exception:
pass
return
try:
data = json.loads(path.read_text(encoding="utf-8"))
except Exception as e:
if not silent:
messagebox.showerror("Settings", f"Failed to read settings file:\n{path}\n\n{e}")
return
# Host/port
try:
host = str(data.get("host", "")).strip()
if host:
self.host_var.set(host)
except Exception:
pass
try:
port = int(data.get("port", PORT_DEFAULT))
if 1 <= port <= 65535:
self.port_var.set(port)
except Exception:
pass
# Connection method and params
try:
ctype = str(data.get("connection_type", "")).strip().lower()
if ctype in ("tcp", "serial", "ble", "bluetooth"):
self.conn_type_var.set("ble" if ctype == "bluetooth" else ctype)
except Exception:
pass
try:
sp = str(data.get("serial_port", "")).strip()
self.serial_port_var.set(sp)
except Exception:
pass
try:
ba = str(data.get("ble_address", "")).strip()
self.ble_addr_var.set(ba)
except Exception:
pass
# Theme
try:
theme = str(data.get("theme", "")).strip().lower()
if theme in ("light", "dark"):
self.current_theme = theme
except Exception:
pass
# Default send target / channel choice (label string)
try:
ch_choice = str(data.get("channel_choice", "")).strip()
if ch_choice:
# only apply if exists now; otherwise keep pending until channels are loaded
try:
values = list(self.cbo_channel["values"])
except Exception:
values = []
if ch_choice in values:
self.channel_var.set(ch_choice)
else:
self._pending_channel_choice = ch_choice
except Exception:
pass
# Window geometry
try:
geom = str(data.get("window_geometry", "")).strip()
if geom:
self.root.geometry(geom)
self._startup_geometry_applied = True
except Exception:
pass
# Sash fraction
try:
frac = float(data.get("sash_fraction", DEFAULT_SASH_FRACTION))
if 0.10 <= frac <= 0.90:
self._startup_sash_fraction = frac
except Exception:
pass
self._update_title_with_host()
if not silent:
self._status_update(info=f"Settings loaded from {path}")
def save_settings(self, silent: bool = False) -> None:
"""Save settings to JSON in the current working directory ('.')."""
path = self._settings_path()
# Best-effort sash fraction
sash_fraction = DEFAULT_SASH_FRACTION
try:
w = self.paned.winfo_width() or self.paned.winfo_reqwidth() or 1
pos = self.paned.sashpos(0)
if w > 0:
sash_fraction = max(0.10, min(0.90, float(pos) / float(w)))
except Exception:
pass
payload = {
"version": 2,
"host": (self.host_var.get() or "").strip(),
"port": int(self.port_var.get() or PORT_DEFAULT),
"connection_type": (self.conn_type_var.get() or "tcp").strip().lower(),
"serial_port": (self.serial_port_var.get() or "").strip(),
"ble_address": (self.ble_addr_var.get() or "").strip(),
"theme": self.current_theme,
"channel_choice": (self.channel_var.get() or "").strip(),
"window_geometry": self.root.winfo_geometry(),
"sash_fraction": sash_fraction,
}
try:
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(payload, indent=2), encoding="utf-8")
tmp.replace(path)
if not silent:
messagebox.showinfo("Settings", f"Saved settings to:\n{path}")
except Exception as e:
if not silent:
messagebox.showerror("Settings", f"Failed to save settings to:\n{path}\n\n{e}")
def open_settings_dialog(self) -> None:
"""Open a Settings window (loads/saves JSON in the current working dir)."""
win = tk.Toplevel(self.root)
win.title("Settings")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=10)
frm.grid(row=0, column=0, sticky="nsew")
win.rowconfigure(0, weight=1)
win.columnconfigure(0, weight=1)
nb = ttk.Notebook(frm)
nb.grid(row=0, column=0, sticky="nsew")
frm.rowconfigure(0, weight=1)
frm.columnconfigure(0, weight=1)
tab_general = ttk.Frame(nb, padding=10)
tab_conn = ttk.Frame(nb, padding=10)
tab_file = ttk.Frame(nb, padding=10)
nb.add(tab_general, text="General")
nb.add(tab_conn, text="Connection")
nb.add(tab_file, text="File")
# --- General tab ------------------------------------------------
theme_var = tk.StringVar(value=self.current_theme)
ttk.Label(tab_general, text="Theme:").grid(row=0, column=0, sticky="w", pady=4)
cbo_theme = ttk.Combobox(tab_general, textvariable=theme_var, state="readonly", values=["dark", "light"], width=10)
cbo_theme.grid(row=0, column=1, sticky="w", pady=4, padx=6)
ttk.Label(tab_general, text="Default send target:").grid(row=1, column=0, sticky="w", pady=4)
choice_var = tk.StringVar(value=self.channel_var.get())
try:
choices = list(self.cbo_channel["values"])
except Exception:
choices = []
cbo_choice = ttk.Combobox(tab_general, textvariable=choice_var, state="readonly", values=choices, width=28)
cbo_choice.grid(row=1, column=1, sticky="w", pady=4, padx=6)
tab_general.columnconfigure(1, weight=1)
# --- Connection tab --------------------------------------------
# Pretty labels in UI, but we store keys: tcp|serial|ble
key_to_label = {"tcp": "TCP (WiFi)", "serial": "USB/Serial", "ble": "Bluetooth (BLE)"}
label_to_key = {v: k for k, v in key_to_label.items()}
method_label_var = tk.StringVar(value=key_to_label.get((self.conn_type_var.get() or "tcp").strip().lower(), "TCP (WiFi)"))
ttk.Label(tab_conn, text="Method:").grid(row=0, column=0, sticky="w", pady=4)
cbo_method = ttk.Combobox(tab_conn, textvariable=method_label_var, state="readonly",
values=[key_to_label["tcp"], key_to_label["serial"], key_to_label["ble"]],
width=18)
cbo_method.grid(row=0, column=1, sticky="w", pady=4, padx=6)
# TCP settings
tcp_box = ttk.Labelframe(tab_conn, text="TCP settings", padding=10)
ttk.Label(tcp_box, text="Host/IP:").grid(row=0, column=0, sticky="w", pady=4)
ent_host = ttk.Entry(tcp_box, textvariable=self.host_var, width=32)
ent_host.grid(row=0, column=1, sticky="ew", pady=4, padx=6)
ttk.Label(tcp_box, text="Port:").grid(row=1, column=0, sticky="w", pady=4)
ent_port = ttk.Entry(tcp_box, textvariable=self.port_var, width=10)
ent_port.grid(row=1, column=1, sticky="w", pady=4, padx=6)
tcp_box.columnconfigure(1, weight=1)
# Serial settings
serial_box = ttk.Labelframe(tab_conn, text="USB/Serial settings", padding=10)
serial_port_ui = tk.StringVar(value=(self.serial_port_var.get() or "").strip() or "(auto)")
ttk.Label(serial_box, text="Serial port:").grid(row=0, column=0, sticky="w", pady=4)
cbo_serial = ttk.Combobox(serial_box, textvariable=serial_port_ui, state="readonly", width=30)
cbo_serial.grid(row=0, column=1, sticky="w", pady=4, padx=6)
def refresh_serial_ports():
ports = []
if list_ports:
try:
ports = [p.device for p in list_ports.comports()]
except Exception:
ports = []
vals = ["(auto)"] + ports
# Ensure saved/manual value is visible
cur = (serial_port_ui.get() or "").strip()
if cur and cur not in vals:
vals.append(cur)
cbo_serial["values"] = vals
if not cur:
serial_port_ui.set("(auto)")
ttk.Button(serial_box, text="Refresh", command=refresh_serial_ports).grid(row=0, column=2, padx=(6, 0), sticky="w")
refresh_serial_ports()
# BLE settings
ble_box = ttk.Labelframe(tab_conn, text="Bluetooth (BLE) settings", padding=10)
ble_addr_ui = tk.StringVar(value=(self.ble_addr_var.get() or "").strip())
ttk.Label(ble_box, text="Device (name or address):").grid(row=0, column=0, sticky="w", pady=4)
ent_ble = ttk.Entry(ble_box, textvariable=ble_addr_ui, width=36)
ent_ble.grid(row=0, column=1, sticky="ew", pady=4, padx=6)
def scan_ble_into_settings():
if BLEInterface is None:
messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).")
return
try:
devices = BLEInterface.scan()
except Exception as e:
messagebox.showerror("BLE scan failed", str(e))
return
if not devices:
messagebox.showinfo("BLE", "No devices found.")
return
options = [
f"{i + 1}. {getattr(d, 'name', '') or '(unnamed)'} [{getattr(d, 'address', '?')}]"
for i, d in enumerate(devices)
]
choice = simpledialog.askinteger(
"Select BLE device",
"Enter number:\n" + "\n".join(options),
minvalue=1,
maxvalue=len(devices),
)
if not choice:
return
addr = getattr(devices[choice - 1], "address", None)
if not addr:
messagebox.showerror("BLE", "Selected device has no address.")
return
ble_addr_ui.set(str(addr))
ttk.Button(ble_box, text="Scan...", command=scan_ble_into_settings).grid(row=0, column=2, padx=(6, 0), sticky="w")
ble_box.columnconfigure(1, weight=1)
# place boxes (we show/hide depending on method)
tcp_box.grid(row=1, column=0, columnspan=3, sticky="ew", pady=(10, 6))
serial_box.grid(row=2, column=0, columnspan=3, sticky="ew", pady=6)
ble_box.grid(row=3, column=0, columnspan=3, sticky="ew", pady=6)
tab_conn.columnconfigure(1, weight=1)
def _show_only(key: str):
key = key.strip().lower()
if key == "serial":
tcp_box.grid_remove()
ble_box.grid_remove()
serial_box.grid()
elif key == "ble":
tcp_box.grid_remove()
serial_box.grid_remove()
ble_box.grid()
else:
serial_box.grid_remove()
ble_box.grid_remove()
tcp_box.grid()
def _on_method_change(*_):
key = label_to_key.get(method_label_var.get(), "tcp")
_show_only(key)
cbo_method.bind("<<ComboboxSelected>>", _on_method_change)
_show_only(label_to_key.get(method_label_var.get(), "tcp"))
# --- File tab ---------------------------------------------------
path = self._settings_path()
ttk.Label(tab_file, text="Settings file:").grid(row=0, column=0, sticky="w")
lbl_path = ttk.Entry(tab_file, width=60)
lbl_path.grid(row=1, column=0, sticky="ew", pady=(4, 10))
lbl_path.insert(0, str(path))
lbl_path.configure(state="readonly")
tab_file.columnconfigure(0, weight=1)
def open_folder():
folder = str(path.parent)
try:
if os.name == "nt":
os.startfile(folder) # type: ignore[attr-defined]
else:
subprocess.Popen(["xdg-open", folder])
except Exception as e:
messagebox.showerror("Open folder", str(e))
ttk.Button(tab_file, text="Open folder", command=open_folder).grid(row=2, column=0, sticky="w")
# --- Buttons ----------------------------------------------------
btnbar = ttk.Frame(frm)
btnbar.grid(row=1, column=0, sticky="e", pady=(10, 0))
def apply_and_save():
# Validate port if TCP
method_key = label_to_key.get(method_label_var.get(), "tcp")
if method_key == "tcp":
h = (self.host_var.get() or "").strip()
if not h:
messagebox.showerror("Settings", "Host/IP cannot be empty for TCP.")
return
try:
p = int(self.port_var.get())
if not (1 <= p <= 65535):
raise ValueError
except Exception:
messagebox.showerror("Settings", "Port must be 1-65535")
return
# Persist connection settings
self.conn_type_var.set(method_key)
sp = (serial_port_ui.get() or "").strip()
self.serial_port_var.set("" if sp.lower() == "(auto)" else sp)
self.ble_addr_var.set((ble_addr_ui.get() or "").strip())
# Theme
self.current_theme = theme_var.get()
self.apply_theme(self.current_theme)
# Default send target
selected_choice = (choice_var.get() or "").strip()
if selected_choice:
try:
values_now = list(self.cbo_channel["values"])
except Exception:
values_now = []
if selected_choice in values_now:
self.channel_var.set(selected_choice)
else:
self._pending_channel_choice = selected_choice
self.save_settings(silent=False)
def do_load():
self.load_settings(silent=False)
theme_var.set(self.current_theme)
choice_var.set(self.channel_var.get())
# Refresh method + params
method_label_var.set(key_to_label.get((self.conn_type_var.get() or "tcp").strip().lower(), "TCP (WiFi)"))
serial_port_ui.set((self.serial_port_var.get() or "").strip() or "(auto)")
ble_addr_ui.set((self.ble_addr_var.get() or "").strip())
refresh_serial_ports()
_on_method_change()
ttk.Button(btnbar, text="Load", command=do_load).grid(row=0, column=0, padx=4)
ttk.Button(btnbar, text="Save", command=apply_and_save).grid(row=0, column=1, padx=4)
ttk.Button(btnbar, text="Close", command=win.destroy).grid(row=0, column=2, padx=4)
def close_app(self) -> None:
"""Graceful close: save settings, disconnect, then destroy."""
try:
self.save_settings(silent=True)
except Exception:
pass
try:
self.disconnect()
finally:
try:
self.root.destroy()
except Exception:
pass
class NodeChatWindow:
def __init__(self, app: MeshtasticGUI, node_key: str, label: str):
self.app = app
self.node_key = node_key
self.label = label
self.top = tk.Toplevel(app.root)
self.top.title(f"Chat: {label}")
app._style_toplevel(self.top)
self.top.protocol("WM_DELETE_WINDOW", self._on_close)
frm = ttk.Frame(self.top, padding=8)
frm.pack(expand=True, fill="both")
frm.rowconfigure(0, weight=1)
frm.columnconfigure(0, weight=1)
self.txt = tk.Text(frm, wrap="word")
self.txt.grid(row=0, column=0, columnspan=2, sticky="nsew")
yscroll = ttk.Scrollbar(frm, orient="vertical", command=self.txt.yview)
self.txt.configure(yscrollcommand=yscroll.set)
yscroll.grid(row=0, column=2, sticky="ns")
self.entry = ttk.Entry(frm)
self.entry.grid(row=1, column=0, sticky="nsew", pady=(6, 0))
self.entry.bind("<Return>", self._on_send)
ttk.Button(frm, text="Send", command=self._on_send).grid(row=1, column=1, sticky="e", padx=(4, 0), pady=(6, 0))
self.apply_theme()
def apply_theme(self):
is_dark = self.app.current_theme == "dark"
self.txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
def append(self, line: str):
try:
self.txt.insert("end", line + "\n")
self.txt.see("end")
except Exception:
pass
def _on_send(self, *_):
msg = self.entry.get().strip()
if not msg:
return
ok = self.app._send_text_to_node(self.node_key, msg)
if ok:
self.entry.delete(0, "end")
def _on_close(self):
try:
self.app._per_node_chats.pop(str(self.node_key), None)
except Exception:
pass
self.top.destroy()
def main():
app = MeshtasticGUI()
app.root.protocol("WM_DELETE_WINDOW", app.close_app)
app.root.mainloop()
if __name__ == "__main__":
main()