diff --git a/Debian/meshtastic_client.py b/Debian/meshtastic_client.py index e5f5f6a..699fe9e 100644 --- a/Debian/meshtastic_client.py +++ b/Debian/meshtastic_client.py @@ -1,13 +1,32 @@ # -*- coding: utf-8 -*- #!/usr/bin/env python3 -from __future__ import annotations -import json, time, datetime, threading, pathlib, tkinter as tk -from tkinter import ttk, messagebox, simpledialog -from typing import Any, Dict, Optional -import os -import subprocess -import webbrowser +""" +Meshtastic Client (Tkinter) +Improvements vs previous version: +- Thread-safe UI updates (no Tk calls from background threads) +- De-duplicated receive handler (only processes each packet once) +- Added "To selected node (DM)" send target option +- Per-node chat now resolves destinationId correctly before sending +- Connection "connected" fallback when pubsub isn't available +- Theme changes apply to open chat windows too +""" +from __future__ import annotations + +import datetime +import json +import logging +import os +import pathlib +import subprocess +import threading +import time +import tkinter as tk +import webbrowser +from tkinter import messagebox, simpledialog, ttk +from typing import Any, Dict, Optional, Tuple + +# Optional deps ------------------------------------------------------- try: from pubsub import pub except Exception: @@ -17,10 +36,12 @@ try: from meshtastic.tcp_interface import TCPInterface except Exception: TCPInterface = None + try: from meshtastic.serial_interface import SerialInterface except Exception: SerialInterface = None + try: from meshtastic.ble_interface import BLEInterface except Exception: @@ -34,19 +55,29 @@ except Exception: portnums_pb2 = None telemetry_pb2 = None _json_format = None + try: from serial.tools import list_ports except Exception: list_ports = None +# Defaults ------------------------------------------------------------ HOST_DEFAULT = "192.168.0.156" PORT_DEFAULT = 4403 -PROJECT_PATH = pathlib.Path(__file__).parent + +PROJECT_PATH = pathlib.Path(__file__).resolve().parent ICON_PATH = PROJECT_PATH / "meshtastic.ico" +CONFIG_FILENAME = "meshtastic_client_settings.json" +DEFAULT_GEOMETRY = "1500x820" +DEFAULT_SASH_FRACTION = 0.40 -def _prefer_chrome(url: str): - # try Chrome first +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +log = logging.getLogger("meshtastic-client") + + +def _prefer_chrome(url: str) -> None: + """Open URL (prefer Chrome on Windows, otherwise default browser).""" chrome_paths = [ r"C:\Program Files\Google\Chrome\Application\chrome.exe", r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe", @@ -55,7 +86,6 @@ def _prefer_chrome(url: str): if os.path.exists(p): subprocess.Popen([p, url]) return - # fallback webbrowser.open(url) @@ -72,13 +102,13 @@ def _fmt_ago(epoch_seconds: Optional[float]) -> str: hours = int(delta // 3600) days = int(delta // 86400) if delta < 60: - return "%ds" % int(delta) + return f"{int(delta)}s" if mins < 60: - return "%dm" % mins + return f"{mins}m" if hours < 24: - return "%dh" % hours + return f"{hours}h" if days < 7: - return "%dd" % days + return f"{days}d" dt = datetime.datetime.fromtimestamp(epoch_seconds) return dt.strftime("%Y-%m-%d %H:%M") @@ -95,6 +125,9 @@ class MeshtasticGUI: pass self.current_theme = "dark" + self._pending_channel_choice: Optional[str] = None + self._startup_sash_fraction: float = DEFAULT_SASH_FRACTION + self._startup_geometry_applied: bool = False self.root.rowconfigure(0, weight=1) self.root.columnconfigure(0, weight=1) @@ -102,8 +135,27 @@ class MeshtasticGUI: self.host_var = tk.StringVar(value=HOST_DEFAULT) self.port_var = tk.IntVar(value=PORT_DEFAULT) + # Connection settings (persisted) + # conn_type: "tcp" | "serial" | "ble" + self.conn_type_var = tk.StringVar(value="tcp") + self.serial_port_var = tk.StringVar(value="") # empty = auto + self.ble_addr_var = tk.StringVar(value="") + + # State + self.iface: Optional[object] = None + self.connected_evt = threading.Event() + self._last_seen_overrides: Dict[str, float] = {} + self._last_sort_col: Optional[str] = "since" + self._last_sort_reverse: bool = True + self._telemetry: Dict[str, Dict[str, Any]] = {} + self._per_node_chats: Dict[str, "NodeChatWindow"] = {} + self._ui_thread_id = threading.get_ident() + + # Menu -------------------------------------------------------- self.menubar = tk.Menu(self.root) + m_conn = tk.Menu(self.menubar, tearoff=False) + m_conn.add_command(label="Connect (Saved settings)", command=self.connect_saved) m_conn.add_command(label="Connect (TCP)", command=self.connect_tcp) m_conn.add_command(label="Connect via USB/Serial...", command=self.connect_serial_dialog) m_conn.add_command(label="Connect via Bluetooth...", command=self.connect_ble_dialog) @@ -113,7 +165,9 @@ class MeshtasticGUI: self.menubar.add_cascade(label="Connection", menu=m_conn) m_tools = tk.Menu(self.menubar, tearoff=False) - m_tools.add_command(label="Clear messages", command=lambda: self.txt_messages.delete("1.0", "end")) + m_tools.add_command(label="Clear messages", command=self.clear_messages) + m_tools.add_separator() + m_tools.add_command(label="Settings...", command=self.open_settings_dialog) self.menubar.add_cascade(label="Tools", menu=m_tools) m_view = tk.Menu(self.menubar, tearoff=False) @@ -138,15 +192,32 @@ class MeshtasticGUI: self.root.config(menu=self.menubar) + # Layout ------------------------------------------------------ self.rootframe = ttk.Frame(self.root) self.rootframe.grid(row=0, column=0, sticky="nsew") self.rootframe.rowconfigure(0, weight=1) + self.rootframe.rowconfigure(1, weight=0) self.rootframe.columnconfigure(0, weight=1) self.paned = ttk.Panedwindow(self.rootframe, orient="horizontal") self.paned.grid(row=0, column=0, sticky="nsew") - # messages + + # Status bar (keeps non-chat info out of the Messages area) + self.status_conn_var = tk.StringVar(value="Conn: Disconnected") + self.status_pos_var = tk.StringVar(value="POS: -") + self.status_tel_var = tk.StringVar(value="TEL: -") + self.status_info_var = tk.StringVar(value="") + + self.statusbar = ttk.Frame(self.rootframe, padding=(6, 2)) + self.statusbar.grid(row=1, column=0, sticky="ew") + self.statusbar.columnconfigure(3, weight=1) + + ttk.Label(self.statusbar, textvariable=self.status_conn_var).grid(row=0, column=0, sticky="w", padx=(0, 12)) + ttk.Label(self.statusbar, textvariable=self.status_pos_var).grid(row=0, column=1, sticky="w", padx=(0, 12)) + ttk.Label(self.statusbar, textvariable=self.status_tel_var).grid(row=0, column=2, sticky="w", padx=(0, 12)) + ttk.Label(self.statusbar, textvariable=self.status_info_var).grid(row=0, column=3, sticky="ew") + # Messages pane self.msg_frame = ttk.Frame(self.paned) self.msg_frame.rowconfigure(1, weight=1) self.msg_frame.columnconfigure(0, weight=1) @@ -162,20 +233,22 @@ class MeshtasticGUI: self.send_frame = ttk.Frame(self.msg_frame) self.send_frame.grid(row=2, column=0, columnspan=2, sticky="nsew", pady=(0, 4)) self.send_frame.columnconfigure(0, weight=1) + self.ent_message = ttk.Entry(self.send_frame) self.ent_message.grid(row=0, column=0, sticky="nsew") self.ent_message.bind("", lambda e: self.send_message()) + self.btn_send = ttk.Button(self.send_frame, text="Send", command=self.send_message) self.btn_send.grid(row=0, column=1, padx=4, sticky="nsew") - # channel selector (public / selected / private channels) + # Channel selector self.channel_var = tk.StringVar() - self._channel_map = {} - self.cbo_channel = ttk.Combobox(self.send_frame, textvariable=self.channel_var, state="readonly", width=22) + self._channel_map: Dict[str, Dict[str, Any]] = {} + self.cbo_channel = ttk.Combobox(self.send_frame, textvariable=self.channel_var, state="readonly", width=26) self._reset_channel_choices() self.cbo_channel.grid(row=0, column=2, padx=4, sticky="w") - # nodes + # Nodes pane self.nodes_frame = ttk.Labelframe(self.paned, text="Nodes (0)") self.nodes_frame.rowconfigure(1, weight=1) self.nodes_frame.columnconfigure(0, weight=1) @@ -222,6 +295,7 @@ class MeshtasticGUI: } for key, text in headings.items(): self.tv_nodes.heading(key, text=text, command=lambda c=key: self.sort_by_column(c, False)) + widths = { "shortname": 90, "longname": 200, @@ -237,23 +311,27 @@ class MeshtasticGUI: } for key, w in widths.items(): try: - self.tv_nodes.column(key, width=w, anchor="w", stretch=(key not in ("since", "hops", "distkm", "speed", "alt", "battery", "voltage"))) + self.tv_nodes.column( + key, + width=w, + anchor="w", + stretch=(key not in ("since", "hops", "distkm", "speed", "alt", "battery", "voltage")), + ) except Exception: pass - # hide technical columns + # Hide technical columns for col in ("lastheard", "macaddr", "publickey", "isunmessagable", "id"): try: self.tv_nodes.column(col, width=0, minwidth=0, stretch=False) except Exception: pass - # scrollbar for node list yscroll_nodes = ttk.Scrollbar(self.nodes_frame, orient="vertical", command=self.tv_nodes.yview) self.tv_nodes.configure(yscrollcommand=yscroll_nodes.set) yscroll_nodes.grid(row=1, column=1, sticky="ns") - # right-click: + # Node context menu self.node_menu = tk.Menu(self.nodes_frame, tearoff=False) self.node_menu.add_command(label="Node info", command=self._cm_show_node_details) self.node_menu.add_command(label="Map", command=self._cm_open_map) @@ -267,16 +345,9 @@ class MeshtasticGUI: self.paned.add(self.msg_frame, weight=3) self.paned.add(self.nodes_frame, weight=4) - self.root.after(100, lambda: self._safe_set_sash(0.40)) - - self.iface: Optional[object] = None - self.connected_evt = threading.Event() - self._last_seen_overrides: Dict[str, float] = {} - self._last_sort_col: Optional[str] = "since" - self._last_sort_reverse: bool = True - self._telemetry: Dict[str, Dict[str, Any]] = {} - self._per_node_chats: Dict[str, "NodeChatWindow"] = {} + self.root.after(100, lambda: self._safe_set_sash(self._startup_sash_fraction)) + # PubSub subscriptions --------------------------------------- if pub is not None: try: pub.subscribe(self.on_connection_established, "meshtastic.connection.established") @@ -284,18 +355,55 @@ class MeshtasticGUI: pub.subscribe(self.on_receive, "meshtastic.receive") pub.subscribe(self.on_node_updated, "meshtastic.node.updated") except Exception as e: - print("pubsub subscribe failed:", e) + log.warning("pubsub subscribe failed: %s", e) - self.apply_theme("dark") - self._append("Ready. Connection -> Connect (TCP/USB/BLE)") + # Load persisted settings (from ./meshtastic_client_settings.json) if present + self.load_settings(silent=True) + self.apply_theme(self.current_theme) + self._status_update(info="Ready. Connection -> Connect (TCP/USB/BLE)") self._update_title_with_host() + # UI helpers ------------------------------------------------------ + def _ui(self, fn, *args, **kwargs): + """Run fn in Tk main thread.""" + if threading.get_ident() == self._ui_thread_id: + fn(*args, **kwargs) + else: + self.root.after(0, lambda: fn(*args, **kwargs)) + + def _status_update(self, *, conn: Optional[str] = None, pos: Optional[str] = None, tel: Optional[str] = None, info: Optional[str] = None): + """Update the bottom status bar (UI thread safe).""" + def _do(): + ts = time.strftime("%H:%M:%S") + if conn is not None: + self.status_conn_var.set(f"Conn: {conn}") + if pos is not None: + self.status_pos_var.set(f"POS: {pos} @{ts}" if pos != "-" else "POS: -") + if tel is not None: + self.status_tel_var.set(f"TEL: {tel} @{ts}" if tel != "-" else "TEL: -") + if info is not None: + self.status_info_var.set(f"{info} @{ts}" if info else "") + self._ui(_do) + + def _status_event(self, kind: str, label: str): + """Convenience for updating POS/TEL and other packet status.""" + kind = (kind or "").upper().strip() + if kind == "POS": + self._status_update(pos=label) + elif kind == "TEL": + self._status_update(tel=label) + else: + self._status_update(info=f"{kind}: {label}" if kind else label) + + def clear_messages(self): + self._ui(lambda: self.txt_messages.delete("1.0", "end")) + # helpers --------------------------------------------------------- def _open_browser_url(self, url: str): _prefer_chrome(url) def _update_title_with_host(self): - self.root.title("Meshtastic Client - %s:%s" % (self.host_var.get(), self.port_var.get())) + self.root.title(f"Meshtastic Client - {self.host_var.get()}:{self.port_var.get()}") def _safe_set_sash(self, fraction: float = 0.5): try: @@ -311,69 +419,57 @@ class MeshtasticGUI: user = (node or {}).get("user") or {} shortname = user.get("shortName") or "" longname = user.get("longName") or "" - label = ("%s %s" % (shortname, longname)).strip() - if label: - return label - return node_id + label = (f"{shortname} {longname}").strip() + return label or node_id # pubsub callbacks ------------------------------------------------ - def on_connection_established(self, interface=None, **kwargs): + def _mark_connected(self): + if self.connected_evt.is_set(): + return self.connected_evt.set() - self._append("[+] Connected") - # refresh nodes and channel list when we connect + self._status_update(conn="Connected", info="Connected") self.refresh_nodes() try: self._update_channels_from_iface() except Exception: pass + + def on_connection_established(self, interface=None, **kwargs): + self._ui(self._mark_connected) + def on_connection_lost(self, interface=None, **kwargs): - self.connected_evt.clear() - self._append("[-] Connection lost") + def _lost(): + self.connected_evt.clear() + self._status_update(conn="Disconnected", info="Connection lost") + self._ui(_lost) def on_node_updated(self, node=None, interface=None, **kwargs): - self.root.after(0, self.refresh_nodes) + self._ui(self.refresh_nodes) def on_receive(self, packet=None, interface=None, **kwargs): - self.root.after(0, lambda: self._handle_receive(packet or {})) + self._ui(lambda: self._handle_receive(packet or {})) # receive --------------------------------------------------------- def _handle_receive(self, packet: dict): - decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {} - app_name = decoded.get("app", "") - portnum = decoded.get("portnum") or app_name - from_id = packet.get("fromId") or packet.get("from") or "UNKNOWN" - label = self._node_label(from_id) if hasattr(self, "_node_label") else from_id + if not isinstance(packet, dict): + return - tag_map = { - "TEXT_MESSAGE_APP": "MSG", - "TEXT_MESSAGE_COMPRESSED_APP": "MSG", - "POSITION_APP": "POS", - "TELEMETRY_APP": "TEL", - "NODEINFO_APP": "INFO", - "ROUTING_APP": "ROUT", - "MAP_REPORT_APP": "MAP", - "ADMIN_APP": "ADM", - "NEIGHBORINFO_APP": "NEI", - "STORE_FORWARD_APP": "SFWD", - "REMOTE_HARDWARE_APP": "RHW", - "PRIVATE_APP": "PRIV", - } - tag = tag_map.get(app_name or portnum, "INFO") + decoded = packet.get("decoded") or {} + if not isinstance(decoded, dict): + decoded = {} - if app_name in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP"): - text = decoded.get("text") or decoded.get("payload", {}).get("text", "") - if text: - self._append(f"[MSG] {label}: {text}") - else: - self._append(f"[MSG] {label}") + app_name = decoded.get("app") or "" + portnum = decoded.get("portnum") or app_name or "" + sender = packet.get("fromId") or packet.get("from") or packet.get("fromIdShort") or "UNKNOWN" + sender = str(sender) - if hasattr(self, "refresh_nodes"): - self.refresh_nodes() + # Mark last seen (for more accurate "Since") + self._last_seen_overrides[sender] = time.time() - decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {} - portnum = decoded.get("portnum") - sender = packet.get("fromId") or packet.get("from") or packet.get("fromIdShort") - if portnum == "TELEMETRY_APP" and telemetry_pb2 is not None and sender: + label = self._node_label(sender) + + # Telemetry parsing (device metrics) + if portnum == "TELEMETRY_APP" and telemetry_pb2 is not None: try: payload = decoded.get("payload") if isinstance(payload, (bytes, bytearray)): @@ -381,49 +477,60 @@ class MeshtasticGUI: tmsg.ParseFromString(payload) if tmsg.HasField("device_metrics"): dm = tmsg.device_metrics - entry = self._telemetry.get(str(sender), {}) - if dm.battery_level is not None: + entry = self._telemetry.get(sender, {}) + if getattr(dm, "battery_level", None) is not None: entry["battery"] = dm.battery_level - if dm.voltage is not None: + if getattr(dm, "voltage", None) is not None: entry["voltage"] = dm.voltage - self._telemetry[str(sender)] = entry + self._telemetry[sender] = entry except Exception: - # ignore telemetry parse errors, they are non-fatal - pass + pass # telemetry failures are non-fatal - if sender: - self._last_seen_overrides[str(sender)] = time.time() + # Text message + if portnum in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP"): + text = decoded.get("text") + if isinstance(text, bytes): + text = text.decode("utf-8", errors="ignore") + if not isinstance(text, str): + # Some packets provide bytes in decoded.payload + p = decoded.get("payload") + if isinstance(p, (bytes, bytearray)): + text = p.decode("utf-8", errors="ignore") + elif isinstance(p, dict): + t2 = p.get("text") + if isinstance(t2, str): + text = t2 + elif isinstance(t2, (bytes, bytearray)): + text = t2.decode("utf-8", errors="ignore") + else: + text = "" - user = {} - if self.iface and getattr(self.iface, "nodes", None) and sender: - user = (self.iface.nodes.get(sender) or {}).get("user", {}) # type: ignore[attr-defined] - shortname = user.get("shortName") or "" - longname = user.get("longName") or "" - label = str(shortname or longname or sender or "Unknown").strip() + rssi = packet.get("rxRssi") + if text: + self._append(f"[MSG] {label}: {text} (RSSI={rssi})") + self._append_to_node_chat(sender, text) + else: + self._append(f"[MSG] {label} (RSSI={rssi})") - text = "" - p = decoded.get("payload", "") - if isinstance(p, (bytes, bytearray)): - try: - text = p.decode("utf-8", errors="ignore") - except Exception: - text = repr(p) - elif isinstance(p, str): - text = p - else: - t = decoded.get("text") - if isinstance(t, bytes): - text = t.decode("utf-8", errors="ignore") - elif isinstance(t, str): - text = t - - rssi = packet.get("rxRssi") - - if portnum == "TEXT_MESSAGE_APP": - self._append("[MSG] %s: %s (RSSI=%s)" % (label, text, rssi)) - self._append_to_node_chat(str(sender), "%s" % (text,)) - if isinstance(text, str) and text.strip().lower() == "ping": + if text.strip().lower() == "ping": self._send_pong(sender, label) + else: + # Non-text: keep lightweight log + tag_map = { + "POSITION_APP": "POS", + "TELEMETRY_APP": "TEL", + "NODEINFO_APP": "INFO", + "ROUTING_APP": "ROUT", + "MAP_REPORT_APP": "MAP", + "ADMIN_APP": "ADM", + "NEIGHBORINFO_APP": "NEI", + "STORE_FORWARD_APP": "SFWD", + "REMOTE_HARDWARE_APP": "RHW", + "PRIVATE_APP": "PRIV", + } + tag = tag_map.get(str(app_name or portnum), "INFO") + # Avoid spamming for every non-text packet; show only brief line + self._status_event(tag, label) self.refresh_nodes() @@ -432,16 +539,17 @@ class MeshtasticGUI: return try: self.iface.sendText("pong", destinationId=dest_id, wantAck=False) - self._append("[auto] pong -> %s" % label) + self._status_update(info=f"auto pong -> {label}") self._append_to_node_chat(str(dest_id), "[auto] pong") except Exception as e: - self._append("[auto] pong failed: %s" % e) + self._status_update(info=f"auto pong failed: {e}") # connection actions ---------------------------------------------- def set_ip_port(self): win = tk.Toplevel(self.root) win.title("Set IP/Port") self._style_toplevel(win) + frm = ttk.Frame(win, padding=8) frm.grid(row=0, column=0, sticky="nsew") win.columnconfigure(0, weight=1) @@ -480,6 +588,67 @@ class MeshtasticGUI: ttk.Button(btnbar, text="Cancel", command=win.destroy).grid(row=0, column=0, padx=4) ttk.Button(btnbar, text="Save", command=save).grid(row=0, column=1, padx=4) + def connect_saved(self): + """Connect using the method selected in Settings.""" + if self.iface: + return + method = (self.conn_type_var.get() or "tcp").strip().lower() + if method == "serial": + port = (self.serial_port_var.get() or "").strip() + self.connect_serial(port if port else None) + elif method in ("ble", "bluetooth"): + addr = (self.ble_addr_var.get() or "").strip() + if not addr: + # If no saved address, fall back to interactive scan + self.connect_ble_dialog() + else: + self.connect_ble(addr) + else: + self.connect_tcp() + + def connect_serial(self, port: Optional[str] = None): + """Connect via USB/Serial. If port is None, auto-detect.""" + if self.iface: + return + if SerialInterface is None: + self._ui(lambda: messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed.")) + return + self._status_update(conn="Connecting (Serial)", info=f"Connecting Serial {port or '(auto)'}...") + + def run(): + try: + self.iface = SerialInterface(devPath=port) if port else SerialInterface() + if pub is None: + self._ui(self._mark_connected) + else: + self.connected_evt.wait(timeout=6) + except Exception as e: + self._ui(lambda: messagebox.showerror("Serial connect failed", str(e))) + + threading.Thread(target=run, daemon=True).start() + + def connect_ble(self, address: str): + """Connect via Bluetooth (BLE) using a saved address or name filter.""" + if self.iface: + return + if BLEInterface is None: + self._ui(lambda: messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).")) + return + self._status_update(conn="Connecting (BLE)", info=f"Connecting BLE {address}...") + + def run(): + try: + # BLEInterface accepts an address or name filter and will scan for matches. + self.iface = BLEInterface(address=address) + if pub is None: + self._ui(self._mark_connected) + else: + self.connected_evt.wait(timeout=10) + except Exception as e: + self._ui(lambda: messagebox.showerror("BLE connect failed", str(e))) + + threading.Thread(target=run, daemon=True).start() + def connect_tcp(self): if self.iface: return @@ -489,19 +658,25 @@ class MeshtasticGUI: except Exception: messagebox.showerror("Port", "Invalid port") return - self._append("Connecting TCP %s:%s ..." % (host, port)) + self._status_update(conn="Connecting (TCP)", info=f"Connecting TCP {host}:{port}...") def run(): try: if TCPInterface is None: raise RuntimeError("meshtastic.tcp_interface not installed") self.iface = TCPInterface(hostname=host, portNumber=port) - self.connected_evt.wait(timeout=5) + # If pubsub isn't available, we might not get the established event. + if pub is None: + self._ui(self._mark_connected) + else: + self.connected_evt.wait(timeout=6) except Exception as e: - self.root.after(0, lambda: messagebox.showerror("TCP connect failed", str(e))) + self._ui(lambda: messagebox.showerror("TCP connect failed", str(e))) + threading.Thread(target=run, daemon=True).start() def connect_serial_dialog(self): + """Interactive serial connect (also stores chosen port in Settings).""" if SerialInterface is None: messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed.") return @@ -512,28 +687,35 @@ class MeshtasticGUI: ports = [p.device for p in list_ports.comports()] except Exception: ports = [] + presets = ["(auto)"] + ports + ["COM4", "/dev/ttyUSB0"] - port = simpledialog.askstring("Serial", "Serial port (or leave '(auto)'):", initialvalue=presets[0]) + initial = "(auto)" + try: + saved = (self.serial_port_var.get() or "").strip() + if saved: + initial = saved + except Exception: + pass + + port = simpledialog.askstring("Serial", "Serial port (or leave '(auto)'):", initialvalue=initial) if port is None: return port = port.strip() if port.lower() == "(auto)" or port == "": port = None - self._append("Connecting Serial %s ..." % (port or "(auto)")) - def run(): - try: - self.iface = SerialInterface(devPath=port) if port else SerialInterface() - self.connected_evt.wait(timeout=5) - except Exception as e: - self.root.after(0, lambda: messagebox.showerror("Serial connect failed", str(e))) - threading.Thread(target=run, daemon=True).start() + # Persist choice + self.serial_port_var.set(port or "") + self.conn_type_var.set("serial") + + self.connect_serial(port) def connect_ble_dialog(self): + """Interactive BLE connect (also stores chosen device address in Settings).""" if BLEInterface is None: messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).") return - self._append("Scanning BLE for Meshtastic devices ...") + self._status_update(conn="Scanning BLE", info="Scanning BLE for Meshtastic devices...") try: devices = BLEInterface.scan() except Exception as e: @@ -542,28 +724,30 @@ class MeshtasticGUI: if not devices: messagebox.showinfo("BLE", "No devices found.") return - options = ["%d. %s [%s]" % (i + 1, getattr(d, "name", "") or "(unnamed)", getattr(d, "address", "?")) for i, d in enumerate(devices)] + + options = [ + f"{i + 1}. {getattr(d, 'name', '') or '(unnamed)'} [{getattr(d, 'address', '?')}]" + for i, d in enumerate(devices) + ] choice = simpledialog.askinteger( "Select BLE device", - "Enter number:\n" + "\n".join(options), + "Enter number:\n" + "\n".join(options), minvalue=1, maxvalue=len(devices), ) if not choice: return + addr = getattr(devices[choice - 1], "address", None) if not addr: messagebox.showerror("BLE", "Selected device has no address.") return - self._append("Connecting BLE %s ..." % addr) - def run(): - try: - self.iface = BLEInterface(address=addr) - self.connected_evt.wait(timeout=8) - except Exception as e: - self.root.after(0, lambda: messagebox.showerror("BLE connect failed", str(e))) - threading.Thread(target=run, daemon=True).start() + # Persist choice + self.ble_addr_var.set(str(addr)) + self.conn_type_var.set("ble") + + self.connect_ble(str(addr)) def disconnect(self): try: @@ -573,45 +757,34 @@ class MeshtasticGUI: pass self.iface = None self.connected_evt.clear() - self._append("[*] Disconnected") + self._status_update(conn="Disconnected", info="Disconnected") # send ------------------------------------------------------------ - - # channel selector helpers ---------------------------------------- def _reset_channel_choices(self): - """Initialize channel selector with Public + To selected.""" + """Initialize channel selector with Public + To selected (DM).""" self._channel_map = {} options = [] + label_dm = "To selected node (DM)" + self._channel_map[label_dm] = {"mode": "selected", "channelIndex": 0} + options.append(label_dm) + label_pub = "Public (broadcast)" self._channel_map[label_pub] = {"mode": "broadcast", "channelIndex": 0} options.append(label_pub) - if hasattr(self, "cbo_channel"): - self.cbo_channel["values"] = options - if hasattr(self, "channel_var"): - self.channel_var.set(label_pub) + self.cbo_channel["values"] = options - def _set_channel_choice(self, label: str): - """Safely set the current channel choice, if it exists.""" - try: - values = list(self.cbo_channel["values"]) - except Exception: - return - if label not in values: - return - self.channel_var.set(label) + # Apply pending default channel choice if it exists now + if self._pending_channel_choice: + if self._pending_channel_choice in options: + self.channel_var.set(self._pending_channel_choice) + self._pending_channel_choice = None + self.channel_var.set(label_pub) def _update_channels_from_iface(self): - """ - Populate channel selector with channels from the connected device. - - We keep: - * "Public (broadcast)" -> broadcast on channel 0 - And then append additional channels (1..N) from the radio as: - * "Ch : " -> broadcast on that channel. - """ + """Populate channel selector with channels from the connected device (broadcast channels).""" iface = getattr(self, "iface", None) if not iface: return @@ -619,12 +792,11 @@ class MeshtasticGUI: if not local_node: return - # Try to request channels if we don't have them yet. chans = getattr(local_node, "channels", None) try: if (not chans) and hasattr(local_node, "requestChannels"): local_node.requestChannels() - time.sleep(1.5) + time.sleep(1.2) chans = getattr(local_node, "channels", None) except Exception: chans = getattr(local_node, "channels", None) @@ -634,7 +806,8 @@ class MeshtasticGUI: except Exception: return - # If channel 0 has a name, update the "Public" label to show it. + # Keep DM option first, keep Public second, then channels 1..N + # Update "Public" label with ch0 name if available. try: if chans and len(chans) > 0: ch0 = chans[0] @@ -646,7 +819,6 @@ class MeshtasticGUI: except Exception: ch0_name = "" if ch0_name: - # Find the existing public entry (mode=broadcast, channelIndex=0) old_label = None for lbl, meta in list(self._channel_map.items()): if meta.get("mode") == "broadcast" and int(meta.get("channelIndex", 0) or 0) == 0: @@ -655,44 +827,30 @@ class MeshtasticGUI: if old_label: new_label = f"Public (ch0: {ch0_name})" if new_label != old_label: - # Update mapping self._channel_map[new_label] = self._channel_map.pop(old_label) - # Update combobox options options = [new_label if v == old_label else v for v in options] - # Keep current selection if it was pointing to the old label if self.channel_var.get() == old_label: self.channel_var.set(new_label) except Exception: - # Failing to pretty-print channel 0 is not fatal; just continue. pass - # Add remaining channels (1..N) as broadcast options. for idx, ch in enumerate(chans or []): if idx == 0: - # Skip channel 0 here – it's already represented by "Public". continue try: name = (getattr(ch, "settings", None).name or "").strip() except Exception: - # older protobufs might expose fields differently try: name = (ch.settings.name or "").strip() except Exception: name = "" - if not name: - label = f"Ch {idx}" - else: - label = f"Ch {idx}: {name}" + label = f"Ch {idx}: {name}" if name else f"Ch {idx}" if label in self._channel_map: continue self._channel_map[label] = {"mode": "broadcast_channel", "channelIndex": idx} options.append(label) - try: - self.cbo_channel["values"] = options - except Exception: - pass - + self.cbo_channel["values"] = options def send_message(self): msg = self.ent_message.get().strip() @@ -702,17 +860,13 @@ class MeshtasticGUI: messagebox.showwarning("Not connected", "Connect first.") return - try: - choice = self.channel_var.get() if hasattr(self, "channel_var") else "" - info = (self._channel_map.get(choice) if hasattr(self, "_channel_map") else None) or { - "mode": "broadcast", - "channelIndex": 0, - } - mode = info.get("mode", "broadcast") - ch_index = int(info.get("channelIndex", 0) or 0) + choice = self.channel_var.get() or "" + info = self._channel_map.get(choice) or {"mode": "broadcast", "channelIndex": 0} + mode = info.get("mode", "broadcast") + ch_index = int(info.get("channelIndex", 0) or 0) + try: if mode == "selected": - # Direct message to the currently selected node nid = self._get_selected_node_id() if not nid: messagebox.showinfo("No selection", "Select a node first.") @@ -722,15 +876,14 @@ class MeshtasticGUI: messagebox.showerror("Send failed", "Cannot resolve destination for selected node.") return self.iface.sendText(msg, destinationId=dest, wantAck=False, channelIndex=ch_index) - self._append("[ME -> %s] %s" % (self._node_label(nid), msg)) + self._append(f"[ME -> {self._node_label(nid)}] {msg}") + self._append_to_node_chat(nid, "[ME] " + msg) else: - # Broadcast on chosen channel (public or private) self.iface.sendText(msg, wantAck=False, channelIndex=ch_index) - label = self.channel_var.get() if hasattr(self, "channel_var") else "" if mode == "broadcast_channel" and ch_index: - self._append("[ME ch%d] %s" % (ch_index, msg)) + self._append(f"[ME ch{ch_index}] {msg}") else: - self._append("[ME] %s" % msg) + self._append(f"[ME] {msg}") self.ent_message.delete(0, "end") except Exception as e: @@ -751,7 +904,7 @@ class MeshtasticGUI: return max(ts_iface, ts_local) return ts_iface or ts_local - def _extract_latlon(self, node: dict) -> tuple[float | None, float | None]: + def _extract_latlon(self, node: dict) -> Tuple[Optional[float], Optional[float]]: pos = (node or {}).get("position") or {} lat = pos.get("latitude") or pos.get("latitudeI") or pos.get("latitude_i") lon = pos.get("longitude") or pos.get("longitudeI") or pos.get("longitude_i") @@ -764,8 +917,7 @@ class MeshtasticGUI: lat = lon = None return lat, lon - def _extract_speed_alt(self, node: dict) -> tuple[float | None, float | None]: - """Return (speed_kmh, alt_m) if present in node.position, else (None, None).""" + def _extract_speed_alt(self, node: dict) -> Tuple[Optional[float], Optional[float]]: pos = (node or {}).get("position") or {} speed = ( pos.get("groundSpeedKmh") @@ -780,15 +932,13 @@ class MeshtasticGUI: or pos.get("altitudeI") ) try: - if speed is not None: - speed = float(speed) - if alt is not None: - alt = float(alt) + speed = float(speed) if speed is not None else None + alt = float(alt) if alt is not None else None except Exception: speed, alt = None, None return speed, alt - def _get_local_latlon(self) -> tuple[float | None, float | None]: + def _get_local_latlon(self) -> Tuple[Optional[float], Optional[float]]: if not self.iface: return (None, None) try: @@ -818,6 +968,7 @@ class MeshtasticGUI: if not self.iface or not getattr(self.iface, "nodes", None): return q = self.ent_search.get().strip().lower() + for iid in self.tv_nodes.get_children(""): self.tv_nodes.delete(iid) @@ -834,34 +985,33 @@ class MeshtasticGUI: longname = user.get("longName") or "" hwmodel = user.get("hwModel") or "" role_raw = user.get("role") or "" - role = role_raw if role_raw.strip() else "CLIENT" + role = role_raw if str(role_raw).strip() else "CLIENT" macaddr = user.get("macaddr") or "" publickey = user.get("publicKey") or "" unmsg = user.get("isUnmessagable") or user.get("isUnmessageable") or False - lastheard_epoch = self._get_lastheard_epoch(node_id, node) + lastheard_epoch = self._get_lastheard_epoch(str(node_id), node) since_str = _fmt_ago(lastheard_epoch) hops = node.get("hopsAway") lat, lon = self._extract_latlon(node) + dist = None if base_lat is not None and base_lon is not None and lat is not None and lon is not None: try: dist = self._haversine_km(base_lat, base_lon, lat, lon) except Exception: dist = None - else: - dist = None - dist_str = "%.1f" % dist if isinstance(dist, (int, float)) else "-" + dist_str = f"{dist:.1f}" if isinstance(dist, (int, float)) else "-" speed, alt = self._extract_speed_alt(node) - speed_str = "%.1f" % speed if isinstance(speed, (int, float)) else "-" - alt_str = "%.0f" % alt if isinstance(alt, (int, float)) else "-" + speed_str = f"{speed:.1f}" if isinstance(speed, (int, float)) else "-" + alt_str = f"{alt:.0f}" if isinstance(alt, (int, float)) else "-" - telem = self._telemetry.get(node_id, {}) if hasattr(self, "_telemetry") else {} + telem = self._telemetry.get(str(node_id), {}) bat = telem.get("battery") volt = telem.get("voltage") - bat_str = "%.0f" % bat if isinstance(bat, (int, float)) else "-" - volt_str = "%.2f" % volt if isinstance(volt, (int, float)) else "-" + bat_str = f"{bat:.0f}" if isinstance(bat, (int, float)) else "-" + volt_str = f"{volt:.2f}" if isinstance(volt, (int, float)) else "-" values = ( shortname, @@ -873,30 +1023,33 @@ class MeshtasticGUI: alt_str, bat_str, volt_str, - "%.0f" % (lastheard_epoch or 0), + f"{(lastheard_epoch or 0):.0f}", hwmodel, role, macaddr, publickey, str(bool(unmsg)), - node_id, + str(node_id), ) if not q or any(q in str(v).lower() for v in values): try: - self.tv_nodes.insert("", "end", iid=node_id, values=values) + self.tv_nodes.insert("", "end", iid=str(node_id), values=values) except Exception: self.tv_nodes.insert("", "end", values=values) + if self._last_sort_col: self.sort_by_column(self._last_sort_col, self._last_sort_reverse) - self.nodes_frame.config(text="Nodes (%d)" % len(self.tv_nodes.get_children())) + self.nodes_frame.config(text=f"Nodes ({len(self.tv_nodes.get_children())})") def sort_by_column(self, col: str, reverse: bool = False): self._last_sort_col = col self._last_sort_reverse = reverse + col_to_sort = "lastheard" if col == "since" else col numeric = {"lastheard", "distkm", "hops", "speed", "alt", "battery", "voltage"} + rows = [] for iid in self.tv_nodes.get_children(""): val = self.tv_nodes.set(iid, col_to_sort) @@ -906,11 +1059,13 @@ class MeshtasticGUI: except Exception: val = 0.0 else: - val = val.casefold() + val = str(val).casefold() rows.append((val, iid)) + rows.sort(key=lambda t: t[0], reverse=reverse) for index, (_, iid) in enumerate(rows): self.tv_nodes.move(iid, "", index) + self.tv_nodes.heading(col, command=lambda: self.sort_by_column(col, not reverse)) # THEME ----------------------------------------------------------- @@ -922,7 +1077,6 @@ class MeshtasticGUI: acc = "#2d2d2d" if is_dark else "#ffffff" sel = "#555555" if is_dark else "#cce0ff" - # Root window background (client area) try: self.root.configure(bg=bg) except Exception: @@ -933,6 +1087,7 @@ class MeshtasticGUI: style.theme_use("clam") except Exception: pass + style.configure("TFrame", background=bg) style.configure("TLabelframe", background=bg, foreground=fg) style.configure("TLabelframe.Label", background=bg, foreground=fg) @@ -949,9 +1104,8 @@ class MeshtasticGUI: style.configure("Treeview", background=acc, fieldbackground=acc, foreground=fg, borderwidth=0) style.map("Treeview", background=[("selected", sel)], foreground=[("selected", fg)]) - # Menubar itself (stored as self.menubar when created) try: - if hasattr(self, "menubar") and self.menubar is not None: + if self.menubar is not None: self.menubar.configure( background=bg, foreground=fg, @@ -973,259 +1127,43 @@ class MeshtasticGUI: ) except Exception: pass + try: self.root.option_add("*Menu*background", bg) self.root.option_add("*Menu*foreground", fg) self.root.option_add("*Menu*activeBackground", sel) self.root.option_add("*Menu*activeForeground", fg) - # Dark background for ttk.Combobox dropdown list - try: - self.root.option_add("*TCombobox*Listbox*background", acc) - self.root.option_add("*TCombobox*Listbox*foreground", fg) - except Exception: - pass + self.root.option_add("*TCombobox*Listbox*background", acc) + self.root.option_add("*TCombobox*Listbox*foreground", fg) except Exception: pass + # Apply to open chat windows + for win in list(self._per_node_chats.values()): + try: + win.apply_theme() + except Exception: + pass + def _style_toplevel(self, win: tk.Toplevel): is_dark = self.current_theme == "dark" bg = "#1e1e1e" if is_dark else "#f5f5f5" - win.configure(bg=bg) + try: + win.configure(bg=bg) + except Exception: + pass # UTILS / CONTEXT ------------------------------------------------ def _append(self, text: str): - self.txt_messages.insert("end", text + "\n") - self.txt_messages.see("end") + def _do(): + self.txt_messages.insert("end", text + "\n") + self.txt_messages.see("end") + self._ui(_do) def _get_selected_node_id(self) -> Optional[str]: sel = self.tv_nodes.selection() - if not sel: - return None - return sel[0] + return sel[0] if sel else None - - def _cm_traceroute(self): - nid = self._get_selected_node_id() - if not nid: - messagebox.showinfo("Traceroute", "Select a node first.") - return - if not self.iface: - messagebox.showwarning("Traceroute", "Connect first.") - return - dest = self._resolve_node_dest_id(nid) - if not dest: - messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.") - return - self._append(f"[trace] Requesting traceroute to {self._node_label(nid)} ({dest})") - threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start() - - def _cm_delete_node(self): - nid = self._get_selected_node_id() - if not nid: - messagebox.showinfo("Delete node", "Select a node first.") - return - if not self.iface or not getattr(self.iface, "localNode", None): - messagebox.showwarning("Delete node", "Connect first.") - return - dest = self._resolve_node_dest_id(nid) - if not dest: - messagebox.showerror("Delete node", "Cannot determine node ID.") - return - label = self._node_label(nid) - if not messagebox.askyesno( - "Delete node", - f"Remove node {label} ({dest}) from the NodeDB on the connected radio?\n\n" - "The device might reboot after this." - ): - return - try: - # Use python-meshtastic Node.removeNode API to remove from NodeDB - # https://python.meshtastic.org/node.html - self.iface.localNode.removeNode(dest) # type: ignore[attr-defined] - self._append(f"[admin] Requested delete of node {label} ({dest})") - except Exception as e: - messagebox.showerror("Delete node", f"Failed to delete node: {e}") - return - - # Also remove from UI for this session - try: - self.tv_nodes.delete(nid) - self.nodes_frame.config(text="Nodes (%d)" % len(self.tv_nodes.get_children())) - except Exception: - pass - - - def _resolve_node_dest_id(self, nid: str) -> Optional[str]: - # `nid` is the Treeview item id; in this client it normally equals the user.id (!xxxx) - if nid.startswith("!") or nid.isdigit(): - return nid - try: - if self.iface and getattr(self.iface, "nodes", None): - node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined] - user = (node or {}).get("user") or {} - node_id = user.get("id") or "" - if node_id: - return node_id - except Exception: - pass - if nid: - return "!" + nid if not nid.startswith("!") else nid - return None - - def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0): - # Prefer native python-meshtastic traceroute if dependencies are available; otherwise fall back to CLI. - if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"): - self._do_traceroute_via_interface(dest, hop_limit, channel_index) - else: - self._do_traceroute_via_cli(dest) - - def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int): - evt = threading.Event() - result: Dict[str, Any] = {} - - def _num_to_label(num: int) -> str: - try: - nbn = getattr(self.iface, "nodesByNum", None) - if nbn and num in nbn: - n = nbn[num] - user = (n or {}).get("user") or {} - sid = user.get("id") or f"!{num:08x}" - sn = user.get("shortName") or "" - ln = user.get("longName") or "" - label = (sn or ln or sid).strip() - return f"{label} ({sid})" if sid else label - except Exception: - pass - return f"!{int(num):08x}" - - def _on_response(p: dict): - try: - rd = mesh_pb2.RouteDiscovery() - rd.ParseFromString(p["decoded"]["payload"]) - as_dict = _json_format.MessageToDict(rd) - result["packet"] = p - result["data"] = as_dict - except Exception as e: # pragma: no cover - defensive - result["error"] = str(e) - finally: - evt.set() - - try: - r = mesh_pb2.RouteDiscovery() - # Use the same TRACEROUTE_APP mechanism as the official Meshtastic clients - self.iface.sendData( - r, - destinationId=dest, - portNum=portnums_pb2.PortNum.TRACEROUTE_APP, - wantResponse=True, - onResponse=_on_response, - channelIndex=channel_index, - hopLimit=hop_limit, - ) - except Exception as e: - self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}")) - return - - if not evt.wait(30.0): - self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported).")) - return - - if "error" in result: - self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}")) - return - - p = result.get("packet") or {} - data = result.get("data") or {} - UNK = -128 - - try: - origin_num = int(p.get("to")) - dest_num = int(p.get("from")) - except Exception: - origin_num = None - dest_num = None - - def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]: - route_nums = [] - for v in data.get(route_key, []): - try: - route_nums.append(int(v)) - except Exception: - pass - snrs = [] - for v in data.get(snr_key, []): - try: - snrs.append(int(v)) - except Exception: - pass - if not start_num or not end_num: - return None - nodes = [start_num] + route_nums + [end_num] - if len(nodes) <= 1: - return None - parts = [] - for idx, num in enumerate(nodes): - label = _num_to_label(num) - if idx == 0: - parts.append(label) - else: - snr_txt = "? dB" - if (idx - 1) < len(snrs): - v = snrs[idx - 1] - if v != UNK: - snr_txt = f"{v / 4.0:.2f} dB" - parts.append(f"{label} ({snr_txt})") - return title + "\n" + " -> ".join(parts) - - lines = [] - fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num) - if fwd: - lines.append(fwd) - back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num) - if back: - lines.append(back) - - if not lines: - self.root.after(0, lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available.")) - return - - text = "\n\n".join(lines) - self.root.after(0, lambda: self._show_traceroute_window(text)) - - def _do_traceroute_via_cli(self, dest: str): - host = (self.host_var.get() or "").strip() or HOST_DEFAULT - cmd = ["meshtastic", "--host", host, "--traceroute", dest] - try: - proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40) - except Exception as e: # pragma: no cover - environment specific - self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}")) - return - out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "") - if not out.strip(): - self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute.")) - return - self.root.after(0, lambda: self._show_traceroute_window(out)) - - def _show_traceroute_window(self, text: str): - win = tk.Toplevel(self.root) - win.title("Traceroute") - self._style_toplevel(win) - frm = ttk.Frame(win, padding=8) - frm.pack(expand=True, fill="both") - txt = tk.Text(frm, wrap="word") - txt.pack(expand=True, fill="both") - is_dark = self.current_theme == "dark" - txt.configure( - bg=("#2d2d2d" if is_dark else "#ffffff"), - fg=("#ffffff" if is_dark else "#000000"), - insertbackground=("#ffffff" if is_dark else "#000000"), - ) - txt.insert("1.0", text.strip() or "No traceroute data.") - txt.configure(state="disabled") - def _toggle_send_target(self): - # Legacy helper – kept for compatibility, but no longer used. - # Double-clicking a node now opens the per-node chat window directly. - return def _popup_node_menu(self, event): iid = self.tv_nodes.identify_row(event.y) if iid: @@ -1256,12 +1194,13 @@ class MeshtasticGUI: return node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined] win = tk.Toplevel(self.root) - win.title("Node: %s" % self._node_label(nid)) + win.title(f"Node: {self._node_label(nid)}") self._style_toplevel(win) frm = ttk.Frame(win, padding=8) frm.pack(expand=True, fill="both") txt = tk.Text(frm, wrap="word") txt.pack(expand=True, fill="both") + is_dark = self.current_theme == "dark" txt.configure( bg=("#2d2d2d" if is_dark else "#ffffff"), @@ -1269,93 +1208,98 @@ class MeshtasticGUI: insertbackground=("#ffffff" if is_dark else "#000000"), ) - if friendly: - def fmt_val(v, indent=0): - pad = " " * indent - if isinstance(v, dict): - lines = [] - for k, vv in v.items(): - if isinstance(vv, (dict, list)): - lines.append(f"{pad}{k}:") - lines.append(fmt_val(vv, indent + 1)) - else: - lines.append(f"{pad}{k}: {vv}") - return "\n".join(lines) - elif isinstance(v, list): - lines = [] - for i, item in enumerate(v): - if isinstance(item, (dict, list)): - lines.append(f"{pad}- [{i}]") - lines.append(fmt_val(item, indent + 1)) - else: - lines.append(f"{pad}- {item}") - return "\n".join(lines) - else: - return f"{pad}{v}" - - user = (node or {}).get("user") or {} - pos = (node or {}).get("position") or {} - caps = (node or {}).get("capabilities") or {} - config = (node or {}).get("config") or {} - - node_id = user.get("id") or node.get("id") or nid - macaddr = user.get("macaddr") or node.get("macaddr") or "" - publickey = user.get("publicKey") or node.get("publicKey") or "" - hw = user.get("hwModel", "") - - lines = [ - f"Name: {user.get('shortName', '')} {user.get('longName', '')}".strip(), - f"ID: {node_id}", - f"MAC: {macaddr}", - f"HW: {hw}", - f"Public key: {publickey}", - "", - f"Last heard: {_fmt_ago(self._get_lastheard_epoch(nid, node))}", - "", - "Position:", - fmt_val(pos, 1), - ] - if caps: - lines.append("Capabilities:") - lines.append(fmt_val(caps, 1)) - if config: - lines.append("Config:") - lines.append(fmt_val(config, 1)) - lines.append("RAW fields:") - skip = {"user", "position", "capabilities", "config"} - other = {k: v for k, v in (node or {}).items() if k not in skip} - lines.append(fmt_val(other, 1)) - txt.insert("1.0", "\n".join(lines)) - else: + if not friendly: txt.insert("1.0", json.dumps(node, indent=2, default=str)) + txt.configure(state="disabled") + return + def fmt_val(v, indent=0): + pad = " " * indent + if isinstance(v, dict): + lines = [] + for k, vv in v.items(): + if isinstance(vv, (dict, list)): + lines.append(f"{pad}{k}:") + lines.append(fmt_val(vv, indent + 1)) + else: + lines.append(f"{pad}{k}: {vv}") + return "\n".join(lines) + if isinstance(v, list): + lines = [] + for i, item in enumerate(v): + if isinstance(item, (dict, list)): + lines.append(f"{pad}- [{i}]") + lines.append(fmt_val(item, indent + 1)) + else: + lines.append(f"{pad}- {item}") + return "\n".join(lines) + return f"{pad}{v}" + + user = (node or {}).get("user") or {} + pos = (node or {}).get("position") or {} + caps = (node or {}).get("capabilities") or {} + config = (node or {}).get("config") or {} + + node_id = user.get("id") or node.get("id") or nid + macaddr = user.get("macaddr") or node.get("macaddr") or "" + publickey = user.get("publicKey") or node.get("publicKey") or "" + hw = user.get("hwModel", "") + + lines = [ + f"Name: {user.get('shortName', '')} {user.get('longName', '')}".strip(), + f"ID: {node_id}", + f"MAC: {macaddr}", + f"HW: {hw}", + f"Public key: {publickey}", + "", + f"Last heard: {_fmt_ago(self._get_lastheard_epoch(nid, node))}", + "", + "Position:", + fmt_val(pos, 1), + ] + if caps: + lines.append("Capabilities:") + lines.append(fmt_val(caps, 1)) + if config: + lines.append("Config:") + lines.append(fmt_val(config, 1)) + lines.append("RAW fields:") + skip = {"user", "position", "capabilities", "config"} + other = {k: v for k, v in (node or {}).items() if k not in skip} + lines.append(fmt_val(other, 1)) + + txt.insert("1.0", "\n".join(lines)) txt.configure(state="disabled") def show_neighbors_window(self): if not self.iface or not getattr(self.iface, "nodes", None): messagebox.showinfo("Neighbors", "No interface or nodes available.") return + win = tk.Toplevel(self.root) win.title("Neighbor table") self._style_toplevel(win) frm = ttk.Frame(win, padding=8) frm.pack(expand=True, fill="both") + tree = ttk.Treeview(frm, columns=("from", "to", "snr", "lastheard"), show="headings") for col, txt in (("from", "From node"), ("to", "To node"), ("snr", "SNR"), ("lastheard", "Last heard")): tree.heading(col, text=txt) tree.column(col, width=160 if col in ("from", "to") else 90, anchor="w") tree.pack(side="left", expand=True, fill="both") + yscroll = ttk.Scrollbar(frm, orient="vertical", command=tree.yview) tree.configure(yscrollcommand=yscroll.set) yscroll.pack(side="right", fill="y") try: - nodes_snapshot = dict(self.iface.nodes or {}) + nodes_snapshot = dict(self.iface.nodes or {}) # type: ignore[attr-defined] except Exception: nodes_snapshot = {} + rows = 0 for node_id, node in nodes_snapshot.items(): - from_label = self._node_label(node_id) + from_label = self._node_label(str(node_id)) neighbors = (node or {}).get("neighbors") or [] if isinstance(neighbors, dict): neighbors = neighbors.values() @@ -1378,6 +1322,290 @@ class MeshtasticGUI: if rows == 0: messagebox.showinfo("Neighbors", "No neighbor information found on nodes.") + # admin / traceroute --------------------------------------------- + def _resolve_node_dest_id(self, nid: str) -> Optional[str]: + nid = str(nid or "").strip() + if not nid: + return None + if nid.startswith("!") or nid.isdigit(): + return nid + try: + if self.iface and getattr(self.iface, "nodes", None): + node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined] + user = (node or {}).get("user") or {} + node_id = user.get("id") or "" + if node_id: + return str(node_id) + except Exception: + pass + return "!" + nid if not nid.startswith("!") else nid + + def _cm_traceroute(self): + nid = self._get_selected_node_id() + if not nid: + messagebox.showinfo("Traceroute", "Select a node first.") + return + if not self.iface: + messagebox.showwarning("Traceroute", "Connect first.") + return + dest = self._resolve_node_dest_id(nid) + if not dest: + messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.") + return + self._status_update(info=f"Traceroute -> {self._node_label(nid)} ({dest})") + threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start() + + def _cm_delete_node(self): + nid = self._get_selected_node_id() + if not nid: + messagebox.showinfo("Delete node", "Select a node first.") + return + if not self.iface or not getattr(self.iface, "localNode", None): + messagebox.showwarning("Delete node", "Connect first.") + return + + dest = self._resolve_node_dest_id(nid) + if not dest: + messagebox.showerror("Delete node", "Cannot determine node ID.") + return + + label = self._node_label(nid) + if not messagebox.askyesno( + "Delete node", + f"Remove node {label} ({dest}) from the NodeDB on the connected radio?\n\nThe device might reboot after this." + ): + return + + try: + self.iface.localNode.removeNode(dest) # type: ignore[attr-defined] + self._status_update(info=f"Admin: delete requested for {label} ({dest})") + except Exception as e: + messagebox.showerror("Delete node", f"Failed to delete node: {e}") + return + + try: + self.tv_nodes.delete(nid) + self.nodes_frame.config(text=f"Nodes ({len(self.tv_nodes.get_children())})") + except Exception: + pass + + def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0): + if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"): + self._do_traceroute_via_interface(dest, hop_limit, channel_index) + else: + self._do_traceroute_via_cli(dest) + + def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int): + evt = threading.Event() + result: Dict[str, Any] = {} + + def _num_to_label(num: int) -> str: + try: + nbn = getattr(self.iface, "nodesByNum", None) + if nbn and num in nbn: + n = nbn[num] + user = (n or {}).get("user") or {} + sid = user.get("id") or f"!{num:08x}" + sn = user.get("shortName") or "" + ln = user.get("longName") or "" + label = (sn or ln or sid).strip() + return f"{label} ({sid})" if sid else label + except Exception: + pass + return f"!{int(num):08x}" + + def _on_response(p: dict): + try: + rd = mesh_pb2.RouteDiscovery() + rd.ParseFromString(p["decoded"]["payload"]) + result["packet"] = p + result["data"] = _json_format.MessageToDict(rd) + except Exception as e: + result["error"] = str(e) + finally: + evt.set() + + r = mesh_pb2.RouteDiscovery() + try: + # Some meshtastic-python versions don't expose hopLimit as a sendData kwarg. + self.iface.sendData( + r, + destinationId=dest, + portNum=portnums_pb2.PortNum.TRACEROUTE_APP, + wantResponse=True, + onResponse=_on_response, + channelIndex=channel_index, + hopLimit=hop_limit, + ) + except TypeError: + self.iface.sendData( + r, + destinationId=dest, + portNum=portnums_pb2.PortNum.TRACEROUTE_APP, + wantResponse=True, + onResponse=_on_response, + channelIndex=channel_index, + ) + except Exception as e: + self._ui(lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}")) + return + + if not evt.wait(30.0): + self._ui(lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported).")) + return + + if "error" in result: + self._ui(lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}")) + return + + p = result.get("packet") or {} + data = result.get("data") or {} + UNK = -128 + + try: + origin_num = int(p.get("to")) + dest_num = int(p.get("from")) + except Exception: + origin_num = None + dest_num = None + + def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]: + route_nums = [] + for v in data.get(route_key, []): + try: + route_nums.append(int(v)) + except Exception: + pass + snrs = [] + for v in data.get(snr_key, []): + try: + snrs.append(int(v)) + except Exception: + pass + + if start_num is None or end_num is None: + return None + nodes = [start_num] + route_nums + [end_num] + if len(nodes) <= 1: + return None + + parts = [] + for idx, num in enumerate(nodes): + label = _num_to_label(num) + if idx == 0: + parts.append(label) + else: + snr_txt = "? dB" + if (idx - 1) < len(snrs): + v = snrs[idx - 1] + if v != UNK: + snr_txt = f"{v / 4.0:.2f} dB" + parts.append(f"{label} ({snr_txt})") + return title + "\n" + " -> ".join(parts) + + lines = [] + fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num) + if fwd: + lines.append(fwd) + back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num) + if back: + lines.append(back) + + if not lines: + self._ui(lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available.")) + return + + text = "\n\n".join(lines) + self._ui(lambda: self._show_traceroute_window(text)) + + def _do_traceroute_via_cli(self, dest: str): + host = (self.host_var.get() or "").strip() or HOST_DEFAULT + cmd = ["meshtastic", "--host", host, "--traceroute", dest] + try: + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40) + except Exception as e: + self._ui(lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}")) + return + out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "") + if not out.strip(): + self._ui(lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute.")) + return + self._ui(lambda: self._show_traceroute_window(out)) + + def _show_traceroute_window(self, text: str): + win = tk.Toplevel(self.root) + win.title("Traceroute") + self._style_toplevel(win) + frm = ttk.Frame(win, padding=8) + frm.pack(expand=True, fill="both") + txt = tk.Text(frm, wrap="word") + txt.pack(expand=True, fill="both") + is_dark = self.current_theme == "dark" + txt.configure( + bg=("#2d2d2d" if is_dark else "#ffffff"), + fg=("#ffffff" if is_dark else "#000000"), + insertbackground=("#ffffff" if is_dark else "#000000"), + ) + txt.insert("1.0", text.strip() or "No traceroute data.") + txt.configure(state="disabled") + + # chat windows ---------------------------------------------------- + def _cm_open_chat(self): + nid = self._get_selected_node_id() + if not nid: + messagebox.showinfo("Chat", "Select a node first.") + return + self._open_node_chat(nid) + + def _open_node_chat(self, nid: str): + key = str(nid) + if key in self._per_node_chats: + win = self._per_node_chats[key] + try: + win.top.deiconify() + win.top.lift() + except Exception: + pass + return + label = self._node_label(nid) + chat = NodeChatWindow(self, key, label) + self._per_node_chats[key] = chat + + def _append_to_node_chat(self, node_id: str, line: str): + key = str(node_id) + win = self._per_node_chats.get(key) + if not win: + return + win.append(line) + + def _send_text_to_node(self, node_key: str, msg: str) -> bool: + if not self.iface: + messagebox.showwarning("Send", "Connect first.") + return False + msg = (msg or "").strip() + if not msg: + return False + + dest_id = self._resolve_node_dest_id(node_key) + if not dest_id: + messagebox.showerror("Send failed", "Cannot resolve destinationId for this node.") + return False + + # Use currently selected channelIndex (so DM uses the same channel selection) + choice = self.channel_var.get() or "" + info = self._channel_map.get(choice) or {"mode": "broadcast", "channelIndex": 0} + ch_index = int(info.get("channelIndex", 0) or 0) + + try: + self.iface.sendText(msg, destinationId=dest_id, wantAck=False, channelIndex=ch_index) + self._append(f"[ME -> {self._node_label(node_key)}] {msg}") + self._append_to_node_chat(node_key, "[ME] " + msg) + return True + except Exception as e: + messagebox.showerror("Send failed", str(e)) + return False + + # config viewers/editors ----------------------------------------- def show_radio_config_window(self): if not self.iface or not getattr(self.iface, "localNode", None): messagebox.showinfo("Radio config", "Connect to a device first.") @@ -1393,8 +1621,9 @@ class MeshtasticGUI: if cfg is None and mod is None: messagebox.showinfo("Radio config", "No config available yet from device.") return + win = tk.Toplevel(self.root) - win.title("Radio + module config (read‑only)") + win.title("Radio + module config (read-only)") self._style_toplevel(win) frm = ttk.Frame(win, padding=8) frm.pack(expand=True, fill="both") @@ -1403,12 +1632,14 @@ class MeshtasticGUI: yscroll = ttk.Scrollbar(frm, orient="vertical", command=txt.yview) txt.configure(yscrollcommand=yscroll.set) yscroll.pack(side="right", fill="y") + is_dark = self.current_theme == "dark" txt.configure( bg=("#2d2d2d" if is_dark else "#ffffff"), fg=("#ffffff" if is_dark else "#000000"), insertbackground=("#ffffff" if is_dark else "#000000"), ) + lines = [] if cfg is not None: lines.append("localConfig:") @@ -1509,7 +1740,7 @@ class MeshtasticGUI: elif hasattr(ch, "name"): ch.name = new_name ln.writeChannel(i) - self._append(f"[admin] Renamed channel {i} to '{new_name}'") + self._status_update(info=f"Admin: renamed channel {i} to '{new_name}'") self._update_channels_from_iface() listbox.delete(i) label = f"{i}: {new_name or '(no name)'}" @@ -1517,70 +1748,411 @@ class MeshtasticGUI: except Exception as e: messagebox.showerror("Channel editor", f"Failed to write channel: {e}") - btn_save = ttk.Button(frm, text="Save name to device", command=save_name) - btn_save.grid(row=3, column=0, sticky="w", pady=(6, 0)) + ttk.Button(frm, text="Save name to device", command=save_name).grid(row=3, column=0, sticky="w", pady=(6, 0)) - def _cm_open_chat(self): - nid = self._get_selected_node_id() - if not nid: - messagebox.showinfo("Chat", "Select a node first.") + + # SETTINGS (persist to ./meshtastic_client_settings.json) ------------------- + def _settings_path(self) -> pathlib.Path: + """Return the settings file path in the current working directory ('.').""" + return pathlib.Path.cwd() / CONFIG_FILENAME + + def load_settings(self, silent: bool = False) -> None: + """Load settings from JSON if the file exists.""" + path = self._settings_path() + if not path.exists(): + # Apply defaults if no config file + if not self._startup_geometry_applied: + try: + self.root.geometry(DEFAULT_GEOMETRY) + self._startup_geometry_applied = True + except Exception: + pass return - self._open_node_chat(nid) - def _open_node_chat(self, nid: str): - key = str(nid) - if key in self._per_node_chats: - win = self._per_node_chats[key] + try: + data = json.loads(path.read_text(encoding="utf-8")) + except Exception as e: + if not silent: + messagebox.showerror("Settings", f"Failed to read settings file:\n{path}\n\n{e}") + return + + # Host/port + try: + host = str(data.get("host", "")).strip() + if host: + self.host_var.set(host) + except Exception: + pass + try: + port = int(data.get("port", PORT_DEFAULT)) + if 1 <= port <= 65535: + self.port_var.set(port) + except Exception: + pass + + # Connection method and params + try: + ctype = str(data.get("connection_type", "")).strip().lower() + if ctype in ("tcp", "serial", "ble", "bluetooth"): + self.conn_type_var.set("ble" if ctype == "bluetooth" else ctype) + except Exception: + pass + try: + sp = str(data.get("serial_port", "")).strip() + self.serial_port_var.set(sp) + except Exception: + pass + try: + ba = str(data.get("ble_address", "")).strip() + self.ble_addr_var.set(ba) + except Exception: + pass + + # Theme + try: + theme = str(data.get("theme", "")).strip().lower() + if theme in ("light", "dark"): + self.current_theme = theme + except Exception: + pass + + # Default send target / channel choice (label string) + try: + ch_choice = str(data.get("channel_choice", "")).strip() + if ch_choice: + # only apply if exists now; otherwise keep pending until channels are loaded + try: + values = list(self.cbo_channel["values"]) + except Exception: + values = [] + if ch_choice in values: + self.channel_var.set(ch_choice) + else: + self._pending_channel_choice = ch_choice + except Exception: + pass + + # Window geometry + try: + geom = str(data.get("window_geometry", "")).strip() + if geom: + self.root.geometry(geom) + self._startup_geometry_applied = True + except Exception: + pass + + # Sash fraction + try: + frac = float(data.get("sash_fraction", DEFAULT_SASH_FRACTION)) + if 0.10 <= frac <= 0.90: + self._startup_sash_fraction = frac + except Exception: + pass + + self._update_title_with_host() + if not silent: + self._status_update(info=f"Settings loaded from {path}") + + def save_settings(self, silent: bool = False) -> None: + """Save settings to JSON in the current working directory ('.').""" + path = self._settings_path() + + # Best-effort sash fraction + sash_fraction = DEFAULT_SASH_FRACTION + try: + w = self.paned.winfo_width() or self.paned.winfo_reqwidth() or 1 + pos = self.paned.sashpos(0) + if w > 0: + sash_fraction = max(0.10, min(0.90, float(pos) / float(w))) + except Exception: + pass + + payload = { + "version": 2, + "host": (self.host_var.get() or "").strip(), + "port": int(self.port_var.get() or PORT_DEFAULT), + "connection_type": (self.conn_type_var.get() or "tcp").strip().lower(), + "serial_port": (self.serial_port_var.get() or "").strip(), + "ble_address": (self.ble_addr_var.get() or "").strip(), + "theme": self.current_theme, + "channel_choice": (self.channel_var.get() or "").strip(), + "window_geometry": self.root.winfo_geometry(), + "sash_fraction": sash_fraction, + } + + try: + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text(json.dumps(payload, indent=2), encoding="utf-8") + tmp.replace(path) + if not silent: + messagebox.showinfo("Settings", f"Saved settings to:\n{path}") + except Exception as e: + if not silent: + messagebox.showerror("Settings", f"Failed to save settings to:\n{path}\n\n{e}") + + def open_settings_dialog(self) -> None: + """Open a Settings window (loads/saves JSON in the current working dir).""" + win = tk.Toplevel(self.root) + win.title("Settings") + self._style_toplevel(win) + + frm = ttk.Frame(win, padding=10) + frm.grid(row=0, column=0, sticky="nsew") + win.rowconfigure(0, weight=1) + win.columnconfigure(0, weight=1) + + nb = ttk.Notebook(frm) + nb.grid(row=0, column=0, sticky="nsew") + frm.rowconfigure(0, weight=1) + frm.columnconfigure(0, weight=1) + + tab_general = ttk.Frame(nb, padding=10) + tab_conn = ttk.Frame(nb, padding=10) + tab_file = ttk.Frame(nb, padding=10) + nb.add(tab_general, text="General") + nb.add(tab_conn, text="Connection") + nb.add(tab_file, text="File") + + # --- General tab ------------------------------------------------ + theme_var = tk.StringVar(value=self.current_theme) + + ttk.Label(tab_general, text="Theme:").grid(row=0, column=0, sticky="w", pady=4) + cbo_theme = ttk.Combobox(tab_general, textvariable=theme_var, state="readonly", values=["dark", "light"], width=10) + cbo_theme.grid(row=0, column=1, sticky="w", pady=4, padx=6) + + ttk.Label(tab_general, text="Default send target:").grid(row=1, column=0, sticky="w", pady=4) + choice_var = tk.StringVar(value=self.channel_var.get()) + try: + choices = list(self.cbo_channel["values"]) + except Exception: + choices = [] + cbo_choice = ttk.Combobox(tab_general, textvariable=choice_var, state="readonly", values=choices, width=28) + cbo_choice.grid(row=1, column=1, sticky="w", pady=4, padx=6) + + tab_general.columnconfigure(1, weight=1) + + # --- Connection tab -------------------------------------------- + # Pretty labels in UI, but we store keys: tcp|serial|ble + key_to_label = {"tcp": "TCP (WiFi)", "serial": "USB/Serial", "ble": "Bluetooth (BLE)"} + label_to_key = {v: k for k, v in key_to_label.items()} + + method_label_var = tk.StringVar(value=key_to_label.get((self.conn_type_var.get() or "tcp").strip().lower(), "TCP (WiFi)")) + + ttk.Label(tab_conn, text="Method:").grid(row=0, column=0, sticky="w", pady=4) + cbo_method = ttk.Combobox(tab_conn, textvariable=method_label_var, state="readonly", + values=[key_to_label["tcp"], key_to_label["serial"], key_to_label["ble"]], + width=18) + cbo_method.grid(row=0, column=1, sticky="w", pady=4, padx=6) + + # TCP settings + tcp_box = ttk.Labelframe(tab_conn, text="TCP settings", padding=10) + ttk.Label(tcp_box, text="Host/IP:").grid(row=0, column=0, sticky="w", pady=4) + ent_host = ttk.Entry(tcp_box, textvariable=self.host_var, width=32) + ent_host.grid(row=0, column=1, sticky="ew", pady=4, padx=6) + + ttk.Label(tcp_box, text="Port:").grid(row=1, column=0, sticky="w", pady=4) + ent_port = ttk.Entry(tcp_box, textvariable=self.port_var, width=10) + ent_port.grid(row=1, column=1, sticky="w", pady=4, padx=6) + + tcp_box.columnconfigure(1, weight=1) + + # Serial settings + serial_box = ttk.Labelframe(tab_conn, text="USB/Serial settings", padding=10) + serial_port_ui = tk.StringVar(value=(self.serial_port_var.get() or "").strip() or "(auto)") + + ttk.Label(serial_box, text="Serial port:").grid(row=0, column=0, sticky="w", pady=4) + + cbo_serial = ttk.Combobox(serial_box, textvariable=serial_port_ui, state="readonly", width=30) + cbo_serial.grid(row=0, column=1, sticky="w", pady=4, padx=6) + + def refresh_serial_ports(): + ports = [] + if list_ports: + try: + ports = [p.device for p in list_ports.comports()] + except Exception: + ports = [] + vals = ["(auto)"] + ports + # Ensure saved/manual value is visible + cur = (serial_port_ui.get() or "").strip() + if cur and cur not in vals: + vals.append(cur) + cbo_serial["values"] = vals + if not cur: + serial_port_ui.set("(auto)") + + ttk.Button(serial_box, text="Refresh", command=refresh_serial_ports).grid(row=0, column=2, padx=(6, 0), sticky="w") + refresh_serial_ports() + + # BLE settings + ble_box = ttk.Labelframe(tab_conn, text="Bluetooth (BLE) settings", padding=10) + ble_addr_ui = tk.StringVar(value=(self.ble_addr_var.get() or "").strip()) + + ttk.Label(ble_box, text="Device (name or address):").grid(row=0, column=0, sticky="w", pady=4) + ent_ble = ttk.Entry(ble_box, textvariable=ble_addr_ui, width=36) + ent_ble.grid(row=0, column=1, sticky="ew", pady=4, padx=6) + + def scan_ble_into_settings(): + if BLEInterface is None: + messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).") + return try: - win.top.deiconify() - win.top.lift() + devices = BLEInterface.scan() + except Exception as e: + messagebox.showerror("BLE scan failed", str(e)) + return + if not devices: + messagebox.showinfo("BLE", "No devices found.") + return + options = [ + f"{i + 1}. {getattr(d, 'name', '') or '(unnamed)'} [{getattr(d, 'address', '?')}]" + for i, d in enumerate(devices) + ] + choice = simpledialog.askinteger( + "Select BLE device", + "Enter number:\n" + "\n".join(options), + minvalue=1, + maxvalue=len(devices), + ) + if not choice: + return + addr = getattr(devices[choice - 1], "address", None) + if not addr: + messagebox.showerror("BLE", "Selected device has no address.") + return + ble_addr_ui.set(str(addr)) + + ttk.Button(ble_box, text="Scan...", command=scan_ble_into_settings).grid(row=0, column=2, padx=(6, 0), sticky="w") + ble_box.columnconfigure(1, weight=1) + + # place boxes (we show/hide depending on method) + tcp_box.grid(row=1, column=0, columnspan=3, sticky="ew", pady=(10, 6)) + serial_box.grid(row=2, column=0, columnspan=3, sticky="ew", pady=6) + ble_box.grid(row=3, column=0, columnspan=3, sticky="ew", pady=6) + + tab_conn.columnconfigure(1, weight=1) + + def _show_only(key: str): + key = key.strip().lower() + if key == "serial": + tcp_box.grid_remove() + ble_box.grid_remove() + serial_box.grid() + elif key == "ble": + tcp_box.grid_remove() + serial_box.grid_remove() + ble_box.grid() + else: + serial_box.grid_remove() + ble_box.grid_remove() + tcp_box.grid() + + def _on_method_change(*_): + key = label_to_key.get(method_label_var.get(), "tcp") + _show_only(key) + + cbo_method.bind("<>", _on_method_change) + _show_only(label_to_key.get(method_label_var.get(), "tcp")) + + # --- File tab --------------------------------------------------- + path = self._settings_path() + ttk.Label(tab_file, text="Settings file:").grid(row=0, column=0, sticky="w") + lbl_path = ttk.Entry(tab_file, width=60) + lbl_path.grid(row=1, column=0, sticky="ew", pady=(4, 10)) + lbl_path.insert(0, str(path)) + lbl_path.configure(state="readonly") + tab_file.columnconfigure(0, weight=1) + + def open_folder(): + folder = str(path.parent) + try: + if os.name == "nt": + os.startfile(folder) # type: ignore[attr-defined] + else: + subprocess.Popen(["xdg-open", folder]) + except Exception as e: + messagebox.showerror("Open folder", str(e)) + + ttk.Button(tab_file, text="Open folder", command=open_folder).grid(row=2, column=0, sticky="w") + + # --- Buttons ---------------------------------------------------- + btnbar = ttk.Frame(frm) + btnbar.grid(row=1, column=0, sticky="e", pady=(10, 0)) + + def apply_and_save(): + # Validate port if TCP + method_key = label_to_key.get(method_label_var.get(), "tcp") + if method_key == "tcp": + h = (self.host_var.get() or "").strip() + if not h: + messagebox.showerror("Settings", "Host/IP cannot be empty for TCP.") + return + try: + p = int(self.port_var.get()) + if not (1 <= p <= 65535): + raise ValueError + except Exception: + messagebox.showerror("Settings", "Port must be 1-65535") + return + + # Persist connection settings + self.conn_type_var.set(method_key) + sp = (serial_port_ui.get() or "").strip() + self.serial_port_var.set("" if sp.lower() == "(auto)" else sp) + self.ble_addr_var.set((ble_addr_ui.get() or "").strip()) + + # Theme + self.current_theme = theme_var.get() + self.apply_theme(self.current_theme) + + # Default send target + selected_choice = (choice_var.get() or "").strip() + if selected_choice: + try: + values_now = list(self.cbo_channel["values"]) + except Exception: + values_now = [] + if selected_choice in values_now: + self.channel_var.set(selected_choice) + else: + self._pending_channel_choice = selected_choice + + self.save_settings(silent=False) + + def do_load(): + self.load_settings(silent=False) + theme_var.set(self.current_theme) + choice_var.set(self.channel_var.get()) + # Refresh method + params + method_label_var.set(key_to_label.get((self.conn_type_var.get() or "tcp").strip().lower(), "TCP (WiFi)")) + serial_port_ui.set((self.serial_port_var.get() or "").strip() or "(auto)") + ble_addr_ui.set((self.ble_addr_var.get() or "").strip()) + refresh_serial_ports() + _on_method_change() + + ttk.Button(btnbar, text="Load", command=do_load).grid(row=0, column=0, padx=4) + ttk.Button(btnbar, text="Save", command=apply_and_save).grid(row=0, column=1, padx=4) + ttk.Button(btnbar, text="Close", command=win.destroy).grid(row=0, column=2, padx=4) + + def close_app(self) -> None: + """Graceful close: save settings, disconnect, then destroy.""" + try: + self.save_settings(silent=True) + except Exception: + pass + try: + self.disconnect() + finally: + try: + self.root.destroy() except Exception: pass - return - label = self._node_label(nid) - chat = NodeChatWindow(self, key, label) - self._per_node_chats[key] = chat - - def _append_to_node_chat(self, node_id: str, line: str): - key = str(node_id) - if not key: - return - win = self._per_node_chats.get(key) - if not win: - return - win.append(line) - - def _send_text_to_node(self, dest_id: str, msg: str) -> bool: - if not self.iface: - messagebox.showwarning("Send", "Connect first.") - return False - msg = (msg or "").strip() - if not msg: - return False - try: - choice = self.channel_var.get() if hasattr(self, "channel_var") else "" - info = (self._channel_map.get(choice) if hasattr(self, "_channel_map") else None) or { - "mode": "broadcast", - "channelIndex": 0, - } - ch_index = int(info.get("channelIndex", 0) or 0) - except Exception: - ch_index = 0 - try: - self.iface.sendText(msg, destinationId=dest_id, wantAck=False, channelIndex=ch_index) - self._append("[ME -> %s] %s" % (self._node_label(dest_id), msg)) - self._append_to_node_chat(dest_id, "[ME] " + msg) - return True - except Exception as e: - messagebox.showerror("Send failed", str(e)) - return False - class NodeChatWindow: - def __init__(self, app: "MeshtasticGUI", node_id: str, label: str): + def __init__(self, app: MeshtasticGUI, node_key: str, label: str): self.app = app - self.node_id = node_id + self.node_key = node_key self.label = label self.top = tk.Toplevel(app.root) self.top.title(f"Chat: {label}") @@ -1598,19 +2170,21 @@ class NodeChatWindow: self.txt.configure(yscrollcommand=yscroll.set) yscroll.grid(row=0, column=2, sticky="ns") - is_dark = app.current_theme == "dark" + self.entry = ttk.Entry(frm) + self.entry.grid(row=1, column=0, sticky="nsew", pady=(6, 0)) + self.entry.bind("", self._on_send) + ttk.Button(frm, text="Send", command=self._on_send).grid(row=1, column=1, sticky="e", padx=(4, 0), pady=(6, 0)) + + self.apply_theme() + + def apply_theme(self): + is_dark = self.app.current_theme == "dark" self.txt.configure( bg=("#2d2d2d" if is_dark else "#ffffff"), fg=("#ffffff" if is_dark else "#000000"), insertbackground=("#ffffff" if is_dark else "#000000"), ) - self.entry = ttk.Entry(frm) - self.entry.grid(row=1, column=0, sticky="nsew", pady=(6, 0)) - self.entry.bind("", self._on_send) - btn = ttk.Button(frm, text="Send", command=self._on_send) - btn.grid(row=1, column=1, sticky="e", padx=(4, 0), pady=(6, 0)) - def append(self, line: str): try: self.txt.insert("end", line + "\n") @@ -1622,22 +2196,21 @@ class NodeChatWindow: msg = self.entry.get().strip() if not msg: return - ok = self.app._send_text_to_node(self.node_id, msg) + ok = self.app._send_text_to_node(self.node_key, msg) if ok: self.entry.delete(0, "end") def _on_close(self): - key = str(self.node_id) try: - if key in self.app._per_node_chats: - del self.app._per_node_chats[key] + self.app._per_node_chats.pop(str(self.node_key), None) except Exception: pass self.top.destroy() + + def main(): app = MeshtasticGUI() - app.root.geometry("1500x820") - app.root.protocol("WM_DELETE_WINDOW", lambda: (app.disconnect(), app.root.destroy())) + app.root.protocol("WM_DELETE_WINDOW", app.close_app) app.root.mainloop()