# -*- coding: utf-8 -*- #!/usr/bin/env python3 """ Meshtastic Client (Tkinter) Improvements vs previous version: - Thread-safe UI updates (no Tk calls from background threads) - De-duplicated receive handler (only processes each packet once) - Added "To selected node (DM)" send target option - Per-node chat now resolves destinationId correctly before sending - Connection "connected" fallback when pubsub isn't available - Theme changes apply to open chat windows too """ from __future__ import annotations import datetime import json import logging import os import pathlib import subprocess import threading import time import tkinter as tk import webbrowser from tkinter import messagebox, simpledialog, ttk from typing import Any, Dict, Optional, Tuple # Optional deps ------------------------------------------------------- try: from pubsub import pub except Exception: pub = None try: from meshtastic.tcp_interface import TCPInterface except Exception: TCPInterface = None try: from meshtastic.serial_interface import SerialInterface except Exception: SerialInterface = None try: from meshtastic.ble_interface import BLEInterface except Exception: BLEInterface = None try: from meshtastic.protobuf import mesh_pb2, portnums_pb2, telemetry_pb2 import google.protobuf.json_format as _json_format except Exception: mesh_pb2 = None portnums_pb2 = None telemetry_pb2 = None _json_format = None try: from serial.tools import list_ports except Exception: list_ports = None # Defaults ------------------------------------------------------------ HOST_DEFAULT = "192.168.0.156" PORT_DEFAULT = 4403 PROJECT_PATH = pathlib.Path(__file__).resolve().parent ICON_PATH = PROJECT_PATH / "meshtastic.ico" CONFIG_FILENAME = "meshtastic_client_settings.json" DEFAULT_GEOMETRY = "1500x820" DEFAULT_SASH_FRACTION = 0.40 logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger("meshtastic-client") def _prefer_chrome(url: str) -> None: """Open URL (prefer Chrome on Windows, otherwise default browser).""" chrome_paths = [ r"C:\Program Files\Google\Chrome\Application\chrome.exe", r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe", ] for p in chrome_paths: if os.path.exists(p): subprocess.Popen([p, url]) return webbrowser.open(url) def _fmt_ago(epoch_seconds: Optional[float]) -> str: if not epoch_seconds: return "N/A" try: delta = time.time() - float(epoch_seconds) except Exception: return "N/A" if delta < 0: delta = 0 mins = int(delta // 60) hours = int(delta // 3600) days = int(delta // 86400) if delta < 60: return f"{int(delta)}s" if mins < 60: return f"{mins}m" if hours < 24: return f"{hours}h" if days < 7: return f"{days}d" dt = datetime.datetime.fromtimestamp(epoch_seconds) return dt.strftime("%Y-%m-%d %H:%M") class MeshtasticGUI: def __init__(self, master: Optional[tk.Tk] = None): self.root = master or tk.Tk() self.root.title("Meshtastic Client") try: if ICON_PATH.exists(): self.root.iconbitmap(default=str(ICON_PATH)) except Exception: pass self.current_theme = "dark" self._pending_channel_choice: Optional[str] = None self._startup_sash_fraction: float = DEFAULT_SASH_FRACTION self._startup_geometry_applied: bool = False self.root.rowconfigure(0, weight=1) self.root.columnconfigure(0, weight=1) self.host_var = tk.StringVar(value=HOST_DEFAULT) self.port_var = tk.IntVar(value=PORT_DEFAULT) # Connection settings (persisted) # conn_type: "tcp" | "serial" | "ble" self.conn_type_var = tk.StringVar(value="tcp") self.serial_port_var = tk.StringVar(value="") # empty = auto self.ble_addr_var = tk.StringVar(value="") # State self.iface: Optional[object] = None self.connected_evt = threading.Event() self._last_seen_overrides: Dict[str, float] = {} self._last_sort_col: Optional[str] = "since" self._last_sort_reverse: bool = True self._telemetry: Dict[str, Dict[str, Any]] = {} self._per_node_chats: Dict[str, "NodeChatWindow"] = {} self._ui_thread_id = threading.get_ident() # Menu -------------------------------------------------------- self.menubar = tk.Menu(self.root) m_conn = tk.Menu(self.menubar, tearoff=False) m_conn.add_command(label="Connect (Saved settings)", command=self.connect_saved) m_conn.add_command(label="Connect (TCP)", command=self.connect_tcp) m_conn.add_command(label="Connect via USB/Serial...", command=self.connect_serial_dialog) m_conn.add_command(label="Connect via Bluetooth...", command=self.connect_ble_dialog) m_conn.add_command(label="Disconnect", command=self.disconnect) m_conn.add_separator() m_conn.add_command(label="Set IP/Port...", command=self.set_ip_port) self.menubar.add_cascade(label="Connection", menu=m_conn) m_tools = tk.Menu(self.menubar, tearoff=False) m_tools.add_command(label="Clear messages", command=self.clear_messages) m_tools.add_separator() m_tools.add_command(label="Settings...", command=self.open_settings_dialog) self.menubar.add_cascade(label="Tools", menu=m_tools) m_view = tk.Menu(self.menubar, tearoff=False) m_view.add_command(label="Light theme", command=lambda: self.apply_theme("light")) m_view.add_command(label="Dark theme", command=lambda: self.apply_theme("dark")) m_view.add_separator() m_view.add_command(label="Neighbor table", command=self.show_neighbors_window) m_view.add_command(label="Radio config (view)", command=self.show_radio_config_window) m_view.add_command(label="Channel editor", command=self.show_channel_editor_window) self.menubar.add_cascade(label="View", menu=m_view) m_links = tk.Menu(self.menubar, tearoff=False) m_links.add_command(label="Meshtastic client", command=lambda: self._open_browser_url("https://github.com/dk98174003/Meshtastic-Client")) m_links.add_command(label="Meshtastic org", command=lambda: self._open_browser_url("https://meshtastic.org/")) m_links.add_command(label="Meshtastic flasher (Chrome)", command=lambda: self._open_browser_url("https://flasher.meshtastic.org/")) m_links.add_command(label="Meshtastic Web Client", command=lambda: self._open_browser_url("https://client.meshtastic.org")) m_links.add_command(label="Meshtastic docker client", command=lambda: self._open_browser_url("https://meshtastic.org/docs/software/linux/usage/#usage-with-docker")) m_links.add_separator() m_links.add_command(label="Meshtastic Facebook Danmark", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1553839535376876/")) m_links.add_command(label="Meshtastic Facebook Nordjylland", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1265866668302201/")) self.menubar.add_cascade(label="Links", menu=m_links) self.root.config(menu=self.menubar) # Layout ------------------------------------------------------ self.rootframe = ttk.Frame(self.root) self.rootframe.grid(row=0, column=0, sticky="nsew") self.rootframe.rowconfigure(0, weight=1) self.rootframe.rowconfigure(1, weight=0) self.rootframe.columnconfigure(0, weight=1) self.paned = ttk.Panedwindow(self.rootframe, orient="horizontal") self.paned.grid(row=0, column=0, sticky="nsew") # Status bar (keeps non-chat info out of the Messages area) self.status_conn_var = tk.StringVar(value="Conn: Disconnected") self.status_pos_var = tk.StringVar(value="POS: -") self.status_tel_var = tk.StringVar(value="TEL: -") self.status_info_var = tk.StringVar(value="") self.statusbar = ttk.Frame(self.rootframe, padding=(6, 2)) self.statusbar.grid(row=1, column=0, sticky="ew") self.statusbar.columnconfigure(3, weight=1) ttk.Label(self.statusbar, textvariable=self.status_conn_var).grid(row=0, column=0, sticky="w", padx=(0, 12)) ttk.Label(self.statusbar, textvariable=self.status_pos_var).grid(row=0, column=1, sticky="w", padx=(0, 12)) ttk.Label(self.statusbar, textvariable=self.status_tel_var).grid(row=0, column=2, sticky="w", padx=(0, 12)) ttk.Label(self.statusbar, textvariable=self.status_info_var).grid(row=0, column=3, sticky="ew") # Messages pane self.msg_frame = ttk.Frame(self.paned) self.msg_frame.rowconfigure(1, weight=1) self.msg_frame.columnconfigure(0, weight=1) ttk.Label(self.msg_frame, text="Messages").grid(row=0, column=0, sticky="w", pady=(2, 0)) self.txt_messages = tk.Text(self.msg_frame, wrap="word") self.txt_messages.grid(row=1, column=0, sticky="nsew", padx=(0, 4), pady=(2, 2)) yscroll_left = ttk.Scrollbar(self.msg_frame, orient="vertical", command=self.txt_messages.yview) self.txt_messages.configure(yscrollcommand=yscroll_left.set) yscroll_left.grid(row=1, column=1, sticky="ns") self.send_frame = ttk.Frame(self.msg_frame) self.send_frame.grid(row=2, column=0, columnspan=2, sticky="nsew", pady=(0, 4)) self.send_frame.columnconfigure(0, weight=1) self.ent_message = ttk.Entry(self.send_frame) self.ent_message.grid(row=0, column=0, sticky="nsew") self.ent_message.bind("", lambda e: self.send_message()) self.btn_send = ttk.Button(self.send_frame, text="Send", command=self.send_message) self.btn_send.grid(row=0, column=1, padx=4, sticky="nsew") # Channel selector self.channel_var = tk.StringVar() self._channel_map: Dict[str, Dict[str, Any]] = {} self.cbo_channel = ttk.Combobox(self.send_frame, textvariable=self.channel_var, state="readonly", width=26) self._reset_channel_choices() self.cbo_channel.grid(row=0, column=2, padx=4, sticky="w") # Nodes pane self.nodes_frame = ttk.Labelframe(self.paned, text="Nodes (0)") self.nodes_frame.rowconfigure(1, weight=1) self.nodes_frame.columnconfigure(0, weight=1) self.ent_search = ttk.Entry(self.nodes_frame) self.ent_search.grid(row=0, column=0, sticky="nsew", padx=2, pady=2) self.ent_search.bind("", lambda e: self.refresh_nodes()) self.cols_all = ( "shortname", "longname", "since", "hops", "distkm", "speed", "alt", "battery", "voltage", "lastheard", "hwmodel", "role", "macaddr", "publickey", "isunmessagable", "id" ) self.cols_visible = ( "shortname", "longname", "since", "hops", "distkm", "speed", "alt", "battery", "voltage", "hwmodel", "role" ) self.tv_nodes = ttk.Treeview( self.nodes_frame, columns=self.cols_all, show="headings", displaycolumns=self.cols_visible ) self.tv_nodes.grid(row=1, column=0, sticky="nsew", padx=(2, 0), pady=(0, 2)) headings = { "shortname": "Short", "longname": "Long", "since": "Since", "hops": "Hops", "distkm": "Dist (km)", "speed": "Speed", "alt": "Alt (m)", "battery": "Batt %", "voltage": "Volt", "hwmodel": "HW", "role": "Role", "lastheard": "", "macaddr": "MAC", "publickey": "Public key", "isunmessagable": "Unmsg?", "id": "ID", } for key, text in headings.items(): self.tv_nodes.heading(key, text=text, command=lambda c=key: self.sort_by_column(c, False)) widths = { "shortname": 90, "longname": 200, "since": 90, "hops": 50, "distkm": 70, "speed": 70, "alt": 70, "battery": 70, "voltage": 80, "hwmodel": 90, "role": 110, } for key, w in widths.items(): try: self.tv_nodes.column( key, width=w, anchor="w", stretch=(key not in ("since", "hops", "distkm", "speed", "alt", "battery", "voltage")), ) except Exception: pass # Hide technical columns for col in ("lastheard", "macaddr", "publickey", "isunmessagable", "id"): try: self.tv_nodes.column(col, width=0, minwidth=0, stretch=False) except Exception: pass yscroll_nodes = ttk.Scrollbar(self.nodes_frame, orient="vertical", command=self.tv_nodes.yview) self.tv_nodes.configure(yscrollcommand=yscroll_nodes.set) yscroll_nodes.grid(row=1, column=1, sticky="ns") # Node context menu self.node_menu = tk.Menu(self.nodes_frame, tearoff=False) self.node_menu.add_command(label="Node info", command=self._cm_show_node_details) self.node_menu.add_command(label="Map", command=self._cm_open_map) self.node_menu.add_command(label="Traceroute", command=self._cm_traceroute) self.node_menu.add_command(label="Chat with node", command=self._cm_open_chat) self.node_menu.add_separator() self.node_menu.add_command(label="Delete node", command=self._cm_delete_node) self.tv_nodes.bind("", self._popup_node_menu) self.tv_nodes.bind("", lambda e: self._cm_open_chat()) self.paned.add(self.msg_frame, weight=3) self.paned.add(self.nodes_frame, weight=4) self.root.after(100, lambda: self._safe_set_sash(self._startup_sash_fraction)) # PubSub subscriptions --------------------------------------- if pub is not None: try: pub.subscribe(self.on_connection_established, "meshtastic.connection.established") pub.subscribe(self.on_connection_lost, "meshtastic.connection.lost") pub.subscribe(self.on_receive, "meshtastic.receive") pub.subscribe(self.on_node_updated, "meshtastic.node.updated") except Exception as e: log.warning("pubsub subscribe failed: %s", e) # Load persisted settings (from ./meshtastic_client_settings.json) if present self.load_settings(silent=True) self.apply_theme(self.current_theme) self._status_update(info="Ready. Connection -> Connect (TCP/USB/BLE)") self._update_title_with_host() # UI helpers ------------------------------------------------------ def _ui(self, fn, *args, **kwargs): """Run fn in Tk main thread.""" if threading.get_ident() == self._ui_thread_id: fn(*args, **kwargs) else: self.root.after(0, lambda: fn(*args, **kwargs)) def _status_update(self, *, conn: Optional[str] = None, pos: Optional[str] = None, tel: Optional[str] = None, info: Optional[str] = None): """Update the bottom status bar (UI thread safe).""" def _do(): ts = time.strftime("%H:%M:%S") if conn is not None: self.status_conn_var.set(f"Conn: {conn}") if pos is not None: self.status_pos_var.set(f"POS: {pos} @{ts}" if pos != "-" else "POS: -") if tel is not None: self.status_tel_var.set(f"TEL: {tel} @{ts}" if tel != "-" else "TEL: -") if info is not None: self.status_info_var.set(f"{info} @{ts}" if info else "") self._ui(_do) def _status_event(self, kind: str, label: str): """Convenience for updating POS/TEL and other packet status.""" kind = (kind or "").upper().strip() if kind == "POS": self._status_update(pos=label) elif kind == "TEL": self._status_update(tel=label) else: self._status_update(info=f"{kind}: {label}" if kind else label) def clear_messages(self): self._ui(lambda: self.txt_messages.delete("1.0", "end")) # helpers --------------------------------------------------------- def _open_browser_url(self, url: str): _prefer_chrome(url) def _update_title_with_host(self): self.root.title(f"Meshtastic Client - {self.host_var.get()}:{self.port_var.get()}") def _safe_set_sash(self, fraction: float = 0.5): try: w = self.paned.winfo_width() or self.paned.winfo_reqwidth() self.paned.sashpos(0, int(w * fraction)) except Exception: pass def _node_label(self, node_id: str) -> str: if not self.iface or not getattr(self.iface, "nodes", None): return node_id node = self.iface.nodes.get(node_id, {}) # type: ignore[attr-defined] user = (node or {}).get("user") or {} shortname = user.get("shortName") or "" longname = user.get("longName") or "" label = (f"{shortname} {longname}").strip() return label or node_id # pubsub callbacks ------------------------------------------------ def _mark_connected(self): if self.connected_evt.is_set(): return self.connected_evt.set() self._status_update(conn="Connected", info="Connected") self.refresh_nodes() try: self._update_channels_from_iface() except Exception: pass def on_connection_established(self, interface=None, **kwargs): self._ui(self._mark_connected) def on_connection_lost(self, interface=None, **kwargs): def _lost(): self.connected_evt.clear() self._status_update(conn="Disconnected", info="Connection lost") self._ui(_lost) def on_node_updated(self, node=None, interface=None, **kwargs): self._ui(self.refresh_nodes) def on_receive(self, packet=None, interface=None, **kwargs): self._ui(lambda: self._handle_receive(packet or {})) # receive --------------------------------------------------------- def _handle_receive(self, packet: dict): if not isinstance(packet, dict): return decoded = packet.get("decoded") or {} if not isinstance(decoded, dict): decoded = {} app_name = decoded.get("app") or "" portnum = decoded.get("portnum") or app_name or "" sender = packet.get("fromId") or packet.get("from") or packet.get("fromIdShort") or "UNKNOWN" sender = str(sender) # Mark last seen (for more accurate "Since") self._last_seen_overrides[sender] = time.time() label = self._node_label(sender) # Telemetry parsing (device metrics) if portnum == "TELEMETRY_APP" and telemetry_pb2 is not None: try: payload = decoded.get("payload") if isinstance(payload, (bytes, bytearray)): tmsg = telemetry_pb2.Telemetry() tmsg.ParseFromString(payload) if tmsg.HasField("device_metrics"): dm = tmsg.device_metrics entry = self._telemetry.get(sender, {}) if getattr(dm, "battery_level", None) is not None: entry["battery"] = dm.battery_level if getattr(dm, "voltage", None) is not None: entry["voltage"] = dm.voltage self._telemetry[sender] = entry except Exception: pass # telemetry failures are non-fatal # Text message if portnum in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP"): text = decoded.get("text") if isinstance(text, bytes): text = text.decode("utf-8", errors="ignore") if not isinstance(text, str): # Some packets provide bytes in decoded.payload p = decoded.get("payload") if isinstance(p, (bytes, bytearray)): text = p.decode("utf-8", errors="ignore") elif isinstance(p, dict): t2 = p.get("text") if isinstance(t2, str): text = t2 elif isinstance(t2, (bytes, bytearray)): text = t2.decode("utf-8", errors="ignore") else: text = "" rssi = packet.get("rxRssi") if text: self._append(f"[MSG] {label}: {text} (RSSI={rssi})") self._append_to_node_chat(sender, text) else: self._append(f"[MSG] {label} (RSSI={rssi})") if text.strip().lower() == "ping": self._send_pong(sender, label) else: # Non-text: keep lightweight log tag_map = { "POSITION_APP": "POS", "TELEMETRY_APP": "TEL", "NODEINFO_APP": "INFO", "ROUTING_APP": "ROUT", "MAP_REPORT_APP": "MAP", "ADMIN_APP": "ADM", "NEIGHBORINFO_APP": "NEI", "STORE_FORWARD_APP": "SFWD", "REMOTE_HARDWARE_APP": "RHW", "PRIVATE_APP": "PRIV", } tag = tag_map.get(str(app_name or portnum), "INFO") # Avoid spamming for every non-text packet; show only brief line self._status_event(tag, label) self.refresh_nodes() def _send_pong(self, dest_id: Optional[str], label: str): if not self.iface or not dest_id: return try: self.iface.sendText("pong", destinationId=dest_id, wantAck=False) self._status_update(info=f"auto pong -> {label}") self._append_to_node_chat(str(dest_id), "[auto] pong") except Exception as e: self._status_update(info=f"auto pong failed: {e}") # connection actions ---------------------------------------------- def set_ip_port(self): win = tk.Toplevel(self.root) win.title("Set IP/Port") self._style_toplevel(win) frm = ttk.Frame(win, padding=8) frm.grid(row=0, column=0, sticky="nsew") win.columnconfigure(0, weight=1) win.rowconfigure(0, weight=1) ttk.Label(frm, text="Host/IP:").grid(row=0, column=0, sticky="w", pady=4) ent_host = ttk.Entry(frm, textvariable=self.host_var, width=28) ent_host.grid(row=0, column=1, sticky="ew", pady=4, padx=4) ttk.Label(frm, text="Port:").grid(row=1, column=0, sticky="w", pady=4) ent_port = ttk.Entry(frm, textvariable=self.port_var, width=10) ent_port.grid(row=1, column=1, sticky="w", pady=4, padx=4) frm.columnconfigure(1, weight=1) def save(): h = self.host_var.get().strip() try: p = int(self.port_var.get()) except Exception: messagebox.showerror("Port", "Port must be 1-65535") return if not h: messagebox.showerror("Host", "Host cannot be empty") return if not (1 <= p <= 65535): messagebox.showerror("Port", "Port must be 1-65535") return self.host_var.set(h) self.port_var.set(p) self._update_title_with_host() win.destroy() btnbar = ttk.Frame(frm) btnbar.grid(row=2, column=0, columnspan=2, sticky="e") ttk.Button(btnbar, text="Cancel", command=win.destroy).grid(row=0, column=0, padx=4) ttk.Button(btnbar, text="Save", command=save).grid(row=0, column=1, padx=4) def connect_saved(self): """Connect using the method selected in Settings.""" if self.iface: return method = (self.conn_type_var.get() or "tcp").strip().lower() if method == "serial": port = (self.serial_port_var.get() or "").strip() self.connect_serial(port if port else None) elif method in ("ble", "bluetooth"): addr = (self.ble_addr_var.get() or "").strip() if not addr: # If no saved address, fall back to interactive scan self.connect_ble_dialog() else: self.connect_ble(addr) else: self.connect_tcp() def connect_serial(self, port: Optional[str] = None): """Connect via USB/Serial. If port is None, auto-detect.""" if self.iface: return if SerialInterface is None: self._ui(lambda: messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed.")) return self._status_update(conn="Connecting (Serial)", info=f"Connecting Serial {port or '(auto)'}...") def run(): try: self.iface = SerialInterface(devPath=port) if port else SerialInterface() if pub is None: self._ui(self._mark_connected) else: self.connected_evt.wait(timeout=6) except Exception as e: self._ui(lambda: messagebox.showerror("Serial connect failed", str(e))) threading.Thread(target=run, daemon=True).start() def connect_ble(self, address: str): """Connect via Bluetooth (BLE) using a saved address or name filter.""" if self.iface: return if BLEInterface is None: self._ui(lambda: messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).")) return self._status_update(conn="Connecting (BLE)", info=f"Connecting BLE {address}...") def run(): try: # BLEInterface accepts an address or name filter and will scan for matches. self.iface = BLEInterface(address=address) if pub is None: self._ui(self._mark_connected) else: self.connected_evt.wait(timeout=10) except Exception as e: self._ui(lambda: messagebox.showerror("BLE connect failed", str(e))) threading.Thread(target=run, daemon=True).start() def connect_tcp(self): if self.iface: return host = self.host_var.get().strip() try: port = int(self.port_var.get()) except Exception: messagebox.showerror("Port", "Invalid port") return self._status_update(conn="Connecting (TCP)", info=f"Connecting TCP {host}:{port}...") def run(): try: if TCPInterface is None: raise RuntimeError("meshtastic.tcp_interface not installed") self.iface = TCPInterface(hostname=host, portNumber=port) # If pubsub isn't available, we might not get the established event. if pub is None: self._ui(self._mark_connected) else: self.connected_evt.wait(timeout=6) except Exception as e: self._ui(lambda: messagebox.showerror("TCP connect failed", str(e))) threading.Thread(target=run, daemon=True).start() def connect_serial_dialog(self): """Interactive serial connect (also stores chosen port in Settings).""" if SerialInterface is None: messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed.") return ports = [] if list_ports: try: ports = [p.device for p in list_ports.comports()] except Exception: ports = [] presets = ["(auto)"] + ports + ["COM4", "/dev/ttyUSB0"] initial = "(auto)" try: saved = (self.serial_port_var.get() or "").strip() if saved: initial = saved except Exception: pass port = simpledialog.askstring("Serial", "Serial port (or leave '(auto)'):", initialvalue=initial) if port is None: return port = port.strip() if port.lower() == "(auto)" or port == "": port = None # Persist choice self.serial_port_var.set(port or "") self.conn_type_var.set("serial") self.connect_serial(port) def connect_ble_dialog(self): """Interactive BLE connect (also stores chosen device address in Settings).""" if BLEInterface is None: messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).") return self._status_update(conn="Scanning BLE", info="Scanning BLE for Meshtastic devices...") try: devices = BLEInterface.scan() except Exception as e: messagebox.showerror("BLE scan failed", str(e)) return if not devices: messagebox.showinfo("BLE", "No devices found.") return options = [ f"{i + 1}. {getattr(d, 'name', '') or '(unnamed)'} [{getattr(d, 'address', '?')}]" for i, d in enumerate(devices) ] choice = simpledialog.askinteger( "Select BLE device", "Enter number:\n" + "\n".join(options), minvalue=1, maxvalue=len(devices), ) if not choice: return addr = getattr(devices[choice - 1], "address", None) if not addr: messagebox.showerror("BLE", "Selected device has no address.") return # Persist choice self.ble_addr_var.set(str(addr)) self.conn_type_var.set("ble") self.connect_ble(str(addr)) def disconnect(self): try: if self.iface: self.iface.close() except Exception: pass self.iface = None self.connected_evt.clear() self._status_update(conn="Disconnected", info="Disconnected") # send ------------------------------------------------------------ def _reset_channel_choices(self): """Initialize channel selector with Public + To selected (DM).""" self._channel_map = {} options = [] label_dm = "To selected node (DM)" self._channel_map[label_dm] = {"mode": "selected", "channelIndex": 0} options.append(label_dm) label_pub = "Public (broadcast)" self._channel_map[label_pub] = {"mode": "broadcast", "channelIndex": 0} options.append(label_pub) self.cbo_channel["values"] = options # Apply pending default channel choice if it exists now if self._pending_channel_choice: if self._pending_channel_choice in options: self.channel_var.set(self._pending_channel_choice) self._pending_channel_choice = None self.channel_var.set(label_pub) def _update_channels_from_iface(self): """Populate channel selector with channels from the connected device (broadcast channels).""" iface = getattr(self, "iface", None) if not iface: return local_node = getattr(iface, "localNode", None) if not local_node: return chans = getattr(local_node, "channels", None) try: if (not chans) and hasattr(local_node, "requestChannels"): local_node.requestChannels() time.sleep(1.2) chans = getattr(local_node, "channels", None) except Exception: chans = getattr(local_node, "channels", None) try: options = list(self.cbo_channel["values"]) except Exception: return # Keep DM option first, keep Public second, then channels 1..N # Update "Public" label with ch0 name if available. try: if chans and len(chans) > 0: ch0 = chans[0] try: ch0_name = (getattr(ch0, "settings", None).name or "").strip() except Exception: try: ch0_name = (ch0.settings.name or "").strip() except Exception: ch0_name = "" if ch0_name: old_label = None for lbl, meta in list(self._channel_map.items()): if meta.get("mode") == "broadcast" and int(meta.get("channelIndex", 0) or 0) == 0: old_label = lbl break if old_label: new_label = f"Public (ch0: {ch0_name})" if new_label != old_label: self._channel_map[new_label] = self._channel_map.pop(old_label) options = [new_label if v == old_label else v for v in options] if self.channel_var.get() == old_label: self.channel_var.set(new_label) except Exception: pass for idx, ch in enumerate(chans or []): if idx == 0: continue try: name = (getattr(ch, "settings", None).name or "").strip() except Exception: try: name = (ch.settings.name or "").strip() except Exception: name = "" label = f"Ch {idx}: {name}" if name else f"Ch {idx}" if label in self._channel_map: continue self._channel_map[label] = {"mode": "broadcast_channel", "channelIndex": idx} options.append(label) self.cbo_channel["values"] = options def send_message(self): msg = self.ent_message.get().strip() if not msg: return if not self.iface: messagebox.showwarning("Not connected", "Connect first.") return choice = self.channel_var.get() or "" info = self._channel_map.get(choice) or {"mode": "broadcast", "channelIndex": 0} mode = info.get("mode", "broadcast") ch_index = int(info.get("channelIndex", 0) or 0) try: if mode == "selected": nid = self._get_selected_node_id() if not nid: messagebox.showinfo("No selection", "Select a node first.") return dest = self._resolve_node_dest_id(nid) if not dest: messagebox.showerror("Send failed", "Cannot resolve destination for selected node.") return self.iface.sendText(msg, destinationId=dest, wantAck=False, channelIndex=ch_index) self._append(f"[ME -> {self._node_label(nid)}] {msg}") self._append_to_node_chat(nid, "[ME] " + msg) else: self.iface.sendText(msg, wantAck=False, channelIndex=ch_index) if mode == "broadcast_channel" and ch_index: self._append(f"[ME ch{ch_index}] {msg}") else: self._append(f"[ME] {msg}") self.ent_message.delete(0, "end") except Exception as e: messagebox.showerror("Send failed", str(e)) # nodes ----------------------------------------------------------- def _get_lastheard_epoch(self, node_id: str, node: Dict[str, Any]) -> Optional[float]: raw = (node or {}).get("lastHeard") ts_iface = None if raw is not None: try: val = float(raw) ts_iface = val / 1000.0 if val > 10000000000 else val except Exception: ts_iface = None ts_local = self._last_seen_overrides.get(str(node_id)) if ts_iface and ts_local: return max(ts_iface, ts_local) return ts_iface or ts_local def _extract_latlon(self, node: dict) -> Tuple[Optional[float], Optional[float]]: pos = (node or {}).get("position") or {} lat = pos.get("latitude") or pos.get("latitudeI") or pos.get("latitude_i") lon = pos.get("longitude") or pos.get("longitudeI") or pos.get("longitude_i") try: if lat is not None: lat = float(lat) * (1e-7 if abs(float(lat)) > 90 else 1.0) if lon is not None: lon = float(lon) * (1e-7 if abs(float(lon)) > 180 else 1.0) except Exception: lat = lon = None return lat, lon def _extract_speed_alt(self, node: dict) -> Tuple[Optional[float], Optional[float]]: pos = (node or {}).get("position") or {} speed = ( pos.get("groundSpeedKmh") or pos.get("groundSpeedKmhI") or pos.get("groundSpeed") or pos.get("ground_speed") ) alt = ( pos.get("altitude") or pos.get("altitudeM") or pos.get("altitude_i") or pos.get("altitudeI") ) try: speed = float(speed) if speed is not None else None alt = float(alt) if alt is not None else None except Exception: speed, alt = None, None return speed, alt def _get_local_latlon(self) -> Tuple[Optional[float], Optional[float]]: if not self.iface: return (None, None) try: mi = getattr(self.iface, "myInfo", None) nbn = getattr(self.iface, "nodesByNum", None) if mi is not None and hasattr(mi, "my_node_num") and nbn: n = nbn.get(mi.my_node_num) or {} lat, lon = self._extract_latlon(n) if lat is not None and lon is not None: return lat, lon except Exception: pass return (None, None) def _haversine_km(self, lat1, lon1, lat2, lon2) -> float: import math R = 6371.0088 phi1 = math.radians(lat1) phi2 = math.radians(lat2) dphi = math.radians(lat2 - lat1) dlmb = math.radians(lon2 - lon1) a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2 c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) return R * c def refresh_nodes(self): if not self.iface or not getattr(self.iface, "nodes", None): return q = self.ent_search.get().strip().lower() for iid in self.tv_nodes.get_children(""): self.tv_nodes.delete(iid) try: nodes_snapshot = dict(self.iface.nodes or {}) # type: ignore[attr-defined] except Exception: nodes_snapshot = {} base_lat, base_lon = self._get_local_latlon() for node_id, node in nodes_snapshot.items(): user = (node or {}).get("user") or {} shortname = user.get("shortName") or "" longname = user.get("longName") or "" hwmodel = user.get("hwModel") or "" role_raw = user.get("role") or "" role = role_raw if str(role_raw).strip() else "CLIENT" macaddr = user.get("macaddr") or "" publickey = user.get("publicKey") or "" unmsg = user.get("isUnmessagable") or user.get("isUnmessageable") or False lastheard_epoch = self._get_lastheard_epoch(str(node_id), node) since_str = _fmt_ago(lastheard_epoch) hops = node.get("hopsAway") lat, lon = self._extract_latlon(node) dist = None if base_lat is not None and base_lon is not None and lat is not None and lon is not None: try: dist = self._haversine_km(base_lat, base_lon, lat, lon) except Exception: dist = None dist_str = f"{dist:.1f}" if isinstance(dist, (int, float)) else "-" speed, alt = self._extract_speed_alt(node) speed_str = f"{speed:.1f}" if isinstance(speed, (int, float)) else "-" alt_str = f"{alt:.0f}" if isinstance(alt, (int, float)) else "-" telem = self._telemetry.get(str(node_id), {}) bat = telem.get("battery") volt = telem.get("voltage") bat_str = f"{bat:.0f}" if isinstance(bat, (int, float)) else "-" volt_str = f"{volt:.2f}" if isinstance(volt, (int, float)) else "-" values = ( shortname, longname, since_str, str(hops) if hops is not None else "-", dist_str, speed_str, alt_str, bat_str, volt_str, f"{(lastheard_epoch or 0):.0f}", hwmodel, role, macaddr, publickey, str(bool(unmsg)), str(node_id), ) if not q or any(q in str(v).lower() for v in values): try: self.tv_nodes.insert("", "end", iid=str(node_id), values=values) except Exception: self.tv_nodes.insert("", "end", values=values) if self._last_sort_col: self.sort_by_column(self._last_sort_col, self._last_sort_reverse) self.nodes_frame.config(text=f"Nodes ({len(self.tv_nodes.get_children())})") def sort_by_column(self, col: str, reverse: bool = False): self._last_sort_col = col self._last_sort_reverse = reverse col_to_sort = "lastheard" if col == "since" else col numeric = {"lastheard", "distkm", "hops", "speed", "alt", "battery", "voltage"} rows = [] for iid in self.tv_nodes.get_children(""): val = self.tv_nodes.set(iid, col_to_sort) if col_to_sort in numeric: try: val = float(val if val != "-" else 0.0) except Exception: val = 0.0 else: val = str(val).casefold() rows.append((val, iid)) rows.sort(key=lambda t: t[0], reverse=reverse) for index, (_, iid) in enumerate(rows): self.tv_nodes.move(iid, "", index) self.tv_nodes.heading(col, command=lambda: self.sort_by_column(col, not reverse)) # THEME ----------------------------------------------------------- def apply_theme(self, mode: str = "light"): self.current_theme = mode is_dark = mode == "dark" bg = "#1e1e1e" if is_dark else "#f5f5f5" fg = "#ffffff" if is_dark else "#000000" acc = "#2d2d2d" if is_dark else "#ffffff" sel = "#555555" if is_dark else "#cce0ff" try: self.root.configure(bg=bg) except Exception: pass style = ttk.Style(self.root) try: style.theme_use("clam") except Exception: pass style.configure("TFrame", background=bg) style.configure("TLabelframe", background=bg, foreground=fg) style.configure("TLabelframe.Label", background=bg, foreground=fg) style.configure("TLabel", background=bg, foreground=fg) style.configure("TButton", background=acc, foreground=fg) style.configure("TEntry", fieldbackground=acc, foreground=fg) style.configure("TCombobox", fieldbackground=acc, background=acc, foreground=fg, arrowcolor=fg) style.map( "TCombobox", fieldbackground=[("readonly", acc)], foreground=[("readonly", fg)], background=[("readonly", acc)], ) style.configure("Treeview", background=acc, fieldbackground=acc, foreground=fg, borderwidth=0) style.map("Treeview", background=[("selected", sel)], foreground=[("selected", fg)]) try: if self.menubar is not None: self.menubar.configure( background=bg, foreground=fg, activebackground=sel, activeforeground=fg, borderwidth=0, relief="flat", ) except Exception: pass try: self.txt_messages.configure( bg=acc, fg=fg, insertbackground=fg, selectbackground=sel, selectforeground=fg, ) except Exception: pass try: self.root.option_add("*Menu*background", bg) self.root.option_add("*Menu*foreground", fg) self.root.option_add("*Menu*activeBackground", sel) self.root.option_add("*Menu*activeForeground", fg) self.root.option_add("*TCombobox*Listbox*background", acc) self.root.option_add("*TCombobox*Listbox*foreground", fg) except Exception: pass # Apply to open chat windows for win in list(self._per_node_chats.values()): try: win.apply_theme() except Exception: pass def _style_toplevel(self, win: tk.Toplevel): is_dark = self.current_theme == "dark" bg = "#1e1e1e" if is_dark else "#f5f5f5" try: win.configure(bg=bg) except Exception: pass # UTILS / CONTEXT ------------------------------------------------ def _append(self, text: str): def _do(): self.txt_messages.insert("end", text + "\n") self.txt_messages.see("end") self._ui(_do) def _get_selected_node_id(self) -> Optional[str]: sel = self.tv_nodes.selection() return sel[0] if sel else None def _popup_node_menu(self, event): iid = self.tv_nodes.identify_row(event.y) if iid: self.tv_nodes.selection_set(iid) self.node_menu.tk_popup(event.x_root, event.y_root) self.node_menu.grab_release() def _cm_show_node_details(self): self.show_raw_node(friendly=True) def _cm_open_map(self): nid = self._get_selected_node_id() if not nid or not self.iface or not getattr(self.iface, "nodes", None): messagebox.showinfo("Map", "No node selected.") return node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined] lat, lon = self._extract_latlon(node) if lat is None or lon is None: messagebox.showinfo("Map", "Selected node has no GPS position.") return url = f"https://www.google.com/maps/search/?api=1&query={lat},{lon}" self._open_browser_url(url) def show_raw_node(self, friendly: bool = False): nid = self._get_selected_node_id() if not nid or not self.iface or not getattr(self.iface, "nodes", None): messagebox.showinfo("Node", "No node selected.") return node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined] win = tk.Toplevel(self.root) win.title(f"Node: {self._node_label(nid)}") self._style_toplevel(win) frm = ttk.Frame(win, padding=8) frm.pack(expand=True, fill="both") txt = tk.Text(frm, wrap="word") txt.pack(expand=True, fill="both") is_dark = self.current_theme == "dark" txt.configure( bg=("#2d2d2d" if is_dark else "#ffffff"), fg=("#ffffff" if is_dark else "#000000"), insertbackground=("#ffffff" if is_dark else "#000000"), ) if not friendly: txt.insert("1.0", json.dumps(node, indent=2, default=str)) txt.configure(state="disabled") return def fmt_val(v, indent=0): pad = " " * indent if isinstance(v, dict): lines = [] for k, vv in v.items(): if isinstance(vv, (dict, list)): lines.append(f"{pad}{k}:") lines.append(fmt_val(vv, indent + 1)) else: lines.append(f"{pad}{k}: {vv}") return "\n".join(lines) if isinstance(v, list): lines = [] for i, item in enumerate(v): if isinstance(item, (dict, list)): lines.append(f"{pad}- [{i}]") lines.append(fmt_val(item, indent + 1)) else: lines.append(f"{pad}- {item}") return "\n".join(lines) return f"{pad}{v}" user = (node or {}).get("user") or {} pos = (node or {}).get("position") or {} caps = (node or {}).get("capabilities") or {} config = (node or {}).get("config") or {} node_id = user.get("id") or node.get("id") or nid macaddr = user.get("macaddr") or node.get("macaddr") or "" publickey = user.get("publicKey") or node.get("publicKey") or "" hw = user.get("hwModel", "") lines = [ f"Name: {user.get('shortName', '')} {user.get('longName', '')}".strip(), f"ID: {node_id}", f"MAC: {macaddr}", f"HW: {hw}", f"Public key: {publickey}", "", f"Last heard: {_fmt_ago(self._get_lastheard_epoch(nid, node))}", "", "Position:", fmt_val(pos, 1), ] if caps: lines.append("Capabilities:") lines.append(fmt_val(caps, 1)) if config: lines.append("Config:") lines.append(fmt_val(config, 1)) lines.append("RAW fields:") skip = {"user", "position", "capabilities", "config"} other = {k: v for k, v in (node or {}).items() if k not in skip} lines.append(fmt_val(other, 1)) txt.insert("1.0", "\n".join(lines)) txt.configure(state="disabled") def show_neighbors_window(self): if not self.iface or not getattr(self.iface, "nodes", None): messagebox.showinfo("Neighbors", "No interface or nodes available.") return win = tk.Toplevel(self.root) win.title("Neighbor table") self._style_toplevel(win) frm = ttk.Frame(win, padding=8) frm.pack(expand=True, fill="both") tree = ttk.Treeview(frm, columns=("from", "to", "snr", "lastheard"), show="headings") for col, txt in (("from", "From node"), ("to", "To node"), ("snr", "SNR"), ("lastheard", "Last heard")): tree.heading(col, text=txt) tree.column(col, width=160 if col in ("from", "to") else 90, anchor="w") tree.pack(side="left", expand=True, fill="both") yscroll = ttk.Scrollbar(frm, orient="vertical", command=tree.yview) tree.configure(yscrollcommand=yscroll.set) yscroll.pack(side="right", fill="y") try: nodes_snapshot = dict(self.iface.nodes or {}) # type: ignore[attr-defined] except Exception: nodes_snapshot = {} rows = 0 for node_id, node in nodes_snapshot.items(): from_label = self._node_label(str(node_id)) neighbors = (node or {}).get("neighbors") or [] if isinstance(neighbors, dict): neighbors = neighbors.values() for n in neighbors: try: to_id = (n or {}).get("nodeId") or (n or {}).get("id") or "" to_label = self._node_label(str(to_id)) if to_id else str(to_id) snr = (n or {}).get("snr") or (n or {}).get("snrDb") last = (n or {}).get("lastHeard") try: last_epoch = float(last) if last is not None else None except Exception: last_epoch = None last_str = _fmt_ago(last_epoch) except Exception: continue tree.insert("", "end", values=(from_label, to_label, snr if snr is not None else "-", last_str)) rows += 1 if rows == 0: messagebox.showinfo("Neighbors", "No neighbor information found on nodes.") # admin / traceroute --------------------------------------------- def _resolve_node_dest_id(self, nid: str) -> Optional[str]: nid = str(nid or "").strip() if not nid: return None if nid.startswith("!") or nid.isdigit(): return nid try: if self.iface and getattr(self.iface, "nodes", None): node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined] user = (node or {}).get("user") or {} node_id = user.get("id") or "" if node_id: return str(node_id) except Exception: pass return "!" + nid if not nid.startswith("!") else nid def _cm_traceroute(self): nid = self._get_selected_node_id() if not nid: messagebox.showinfo("Traceroute", "Select a node first.") return if not self.iface: messagebox.showwarning("Traceroute", "Connect first.") return dest = self._resolve_node_dest_id(nid) if not dest: messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.") return self._status_update(info=f"Traceroute -> {self._node_label(nid)} ({dest})") threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start() def _cm_delete_node(self): nid = self._get_selected_node_id() if not nid: messagebox.showinfo("Delete node", "Select a node first.") return if not self.iface or not getattr(self.iface, "localNode", None): messagebox.showwarning("Delete node", "Connect first.") return dest = self._resolve_node_dest_id(nid) if not dest: messagebox.showerror("Delete node", "Cannot determine node ID.") return label = self._node_label(nid) if not messagebox.askyesno( "Delete node", f"Remove node {label} ({dest}) from the NodeDB on the connected radio?\n\nThe device might reboot after this." ): return try: self.iface.localNode.removeNode(dest) # type: ignore[attr-defined] self._status_update(info=f"Admin: delete requested for {label} ({dest})") except Exception as e: messagebox.showerror("Delete node", f"Failed to delete node: {e}") return try: self.tv_nodes.delete(nid) self.nodes_frame.config(text=f"Nodes ({len(self.tv_nodes.get_children())})") except Exception: pass def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0): if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"): self._do_traceroute_via_interface(dest, hop_limit, channel_index) else: self._do_traceroute_via_cli(dest) def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int): evt = threading.Event() result: Dict[str, Any] = {} def _num_to_label(num: int) -> str: try: nbn = getattr(self.iface, "nodesByNum", None) if nbn and num in nbn: n = nbn[num] user = (n or {}).get("user") or {} sid = user.get("id") or f"!{num:08x}" sn = user.get("shortName") or "" ln = user.get("longName") or "" label = (sn or ln or sid).strip() return f"{label} ({sid})" if sid else label except Exception: pass return f"!{int(num):08x}" def _on_response(p: dict): try: rd = mesh_pb2.RouteDiscovery() rd.ParseFromString(p["decoded"]["payload"]) result["packet"] = p result["data"] = _json_format.MessageToDict(rd) except Exception as e: result["error"] = str(e) finally: evt.set() r = mesh_pb2.RouteDiscovery() try: # Some meshtastic-python versions don't expose hopLimit as a sendData kwarg. self.iface.sendData( r, destinationId=dest, portNum=portnums_pb2.PortNum.TRACEROUTE_APP, wantResponse=True, onResponse=_on_response, channelIndex=channel_index, hopLimit=hop_limit, ) except TypeError: self.iface.sendData( r, destinationId=dest, portNum=portnums_pb2.PortNum.TRACEROUTE_APP, wantResponse=True, onResponse=_on_response, channelIndex=channel_index, ) except Exception as e: self._ui(lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}")) return if not evt.wait(30.0): self._ui(lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported).")) return if "error" in result: self._ui(lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}")) return p = result.get("packet") or {} data = result.get("data") or {} UNK = -128 try: origin_num = int(p.get("to")) dest_num = int(p.get("from")) except Exception: origin_num = None dest_num = None def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]: route_nums = [] for v in data.get(route_key, []): try: route_nums.append(int(v)) except Exception: pass snrs = [] for v in data.get(snr_key, []): try: snrs.append(int(v)) except Exception: pass if start_num is None or end_num is None: return None nodes = [start_num] + route_nums + [end_num] if len(nodes) <= 1: return None parts = [] for idx, num in enumerate(nodes): label = _num_to_label(num) if idx == 0: parts.append(label) else: snr_txt = "? dB" if (idx - 1) < len(snrs): v = snrs[idx - 1] if v != UNK: snr_txt = f"{v / 4.0:.2f} dB" parts.append(f"{label} ({snr_txt})") return title + "\n" + " -> ".join(parts) lines = [] fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num) if fwd: lines.append(fwd) back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num) if back: lines.append(back) if not lines: self._ui(lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available.")) return text = "\n\n".join(lines) self._ui(lambda: self._show_traceroute_window(text)) def _do_traceroute_via_cli(self, dest: str): host = (self.host_var.get() or "").strip() or HOST_DEFAULT cmd = ["meshtastic", "--host", host, "--traceroute", dest] try: proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40) except Exception as e: self._ui(lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}")) return out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "") if not out.strip(): self._ui(lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute.")) return self._ui(lambda: self._show_traceroute_window(out)) def _show_traceroute_window(self, text: str): win = tk.Toplevel(self.root) win.title("Traceroute") self._style_toplevel(win) frm = ttk.Frame(win, padding=8) frm.pack(expand=True, fill="both") txt = tk.Text(frm, wrap="word") txt.pack(expand=True, fill="both") is_dark = self.current_theme == "dark" txt.configure( bg=("#2d2d2d" if is_dark else "#ffffff"), fg=("#ffffff" if is_dark else "#000000"), insertbackground=("#ffffff" if is_dark else "#000000"), ) txt.insert("1.0", text.strip() or "No traceroute data.") txt.configure(state="disabled") # chat windows ---------------------------------------------------- def _cm_open_chat(self): nid = self._get_selected_node_id() if not nid: messagebox.showinfo("Chat", "Select a node first.") return self._open_node_chat(nid) def _open_node_chat(self, nid: str): key = str(nid) if key in self._per_node_chats: win = self._per_node_chats[key] try: win.top.deiconify() win.top.lift() except Exception: pass return label = self._node_label(nid) chat = NodeChatWindow(self, key, label) self._per_node_chats[key] = chat def _append_to_node_chat(self, node_id: str, line: str): key = str(node_id) win = self._per_node_chats.get(key) if not win: return win.append(line) def _send_text_to_node(self, node_key: str, msg: str) -> bool: if not self.iface: messagebox.showwarning("Send", "Connect first.") return False msg = (msg or "").strip() if not msg: return False dest_id = self._resolve_node_dest_id(node_key) if not dest_id: messagebox.showerror("Send failed", "Cannot resolve destinationId for this node.") return False # Use currently selected channelIndex (so DM uses the same channel selection) choice = self.channel_var.get() or "" info = self._channel_map.get(choice) or {"mode": "broadcast", "channelIndex": 0} ch_index = int(info.get("channelIndex", 0) or 0) try: self.iface.sendText(msg, destinationId=dest_id, wantAck=False, channelIndex=ch_index) self._append(f"[ME -> {self._node_label(node_key)}] {msg}") self._append_to_node_chat(node_key, "[ME] " + msg) return True except Exception as e: messagebox.showerror("Send failed", str(e)) return False # config viewers/editors ----------------------------------------- def show_radio_config_window(self): if not self.iface or not getattr(self.iface, "localNode", None): messagebox.showinfo("Radio config", "Connect to a device first.") return ln = self.iface.localNode try: if getattr(ln, "localConfig", None) is None and hasattr(ln, "waitForConfig"): ln.waitForConfig("localConfig") except Exception: pass cfg = getattr(ln, "localConfig", None) mod = getattr(ln, "moduleConfig", None) if cfg is None and mod is None: messagebox.showinfo("Radio config", "No config available yet from device.") return win = tk.Toplevel(self.root) win.title("Radio + module config (read-only)") self._style_toplevel(win) frm = ttk.Frame(win, padding=8) frm.pack(expand=True, fill="both") txt = tk.Text(frm, wrap="none") txt.pack(side="left", expand=True, fill="both") yscroll = ttk.Scrollbar(frm, orient="vertical", command=txt.yview) txt.configure(yscrollcommand=yscroll.set) yscroll.pack(side="right", fill="y") is_dark = self.current_theme == "dark" txt.configure( bg=("#2d2d2d" if is_dark else "#ffffff"), fg=("#ffffff" if is_dark else "#000000"), insertbackground=("#ffffff" if is_dark else "#000000"), ) lines = [] if cfg is not None: lines.append("localConfig:") try: lines.append(str(cfg)) except Exception: lines.append(repr(cfg)) if mod is not None: lines.append("\nmoduleConfig:") try: lines.append(str(mod)) except Exception: lines.append(repr(mod)) txt.insert("1.0", "\n".join(lines)) txt.configure(state="disabled") def show_channel_editor_window(self): if not self.iface or not getattr(self.iface, "localNode", None): messagebox.showinfo("Channels", "Connect to a device first.") return ln = self.iface.localNode try: if getattr(ln, "channels", None) in (None, {}) and hasattr(ln, "waitForConfig"): ln.waitForConfig("channels") except Exception: pass chans = getattr(ln, "channels", None) if not chans: messagebox.showinfo("Channels", "No channels available from device.") return win = tk.Toplevel(self.root) win.title("Channel editor") self._style_toplevel(win) frm = ttk.Frame(win, padding=8) frm.grid(row=0, column=0, sticky="nsew") win.rowconfigure(0, weight=1) win.columnconfigure(0, weight=1) listbox = tk.Listbox(frm, height=10) listbox.grid(row=0, column=0, columnspan=2, sticky="nsew") frm.rowconfigure(0, weight=1) frm.columnconfigure(0, weight=1) for idx, ch in enumerate(chans): if ch is None: label = f"Ch {idx} (empty)" else: try: name = (getattr(ch, "settings", None).name or "").strip() except Exception: try: name = (ch.settings.name or "").strip() except Exception: name = "" label = f"{idx}: {name or '(no name)'}" listbox.insert("end", label) ttk.Label(frm, text="Channel name:").grid(row=1, column=0, sticky="w", pady=(6, 2)) name_var = tk.StringVar() ent_name = ttk.Entry(frm, textvariable=name_var, width=40) ent_name.grid(row=2, column=0, columnspan=2, sticky="ew") def on_select(evt=None): sel = listbox.curselection() if not sel: return i = sel[0] ch = chans[i] if ch is None: name_var.set("") return try: nm = (getattr(ch, "settings", None).name or "").strip() except Exception: try: nm = (ch.settings.name or "").strip() except Exception: nm = "" name_var.set(nm) listbox.bind("<>", on_select) def save_name(): sel = listbox.curselection() if not sel: messagebox.showinfo("Channel editor", "Select a channel first.") return i = sel[0] ch = chans[i] if ch is None: messagebox.showerror("Channel editor", "Selected channel object is empty, cannot rename.") return new_name = name_var.get().strip() try: if hasattr(ch, "settings"): ch.settings.name = new_name elif hasattr(ch, "name"): ch.name = new_name ln.writeChannel(i) self._status_update(info=f"Admin: renamed channel {i} to '{new_name}'") self._update_channels_from_iface() listbox.delete(i) label = f"{i}: {new_name or '(no name)'}" listbox.insert(i, label) except Exception as e: messagebox.showerror("Channel editor", f"Failed to write channel: {e}") ttk.Button(frm, text="Save name to device", command=save_name).grid(row=3, column=0, sticky="w", pady=(6, 0)) # SETTINGS (persist to ./meshtastic_client_settings.json) ------------------- def _settings_path(self) -> pathlib.Path: """Return the settings file path in the current working directory ('.').""" return pathlib.Path.cwd() / CONFIG_FILENAME def load_settings(self, silent: bool = False) -> None: """Load settings from JSON if the file exists.""" path = self._settings_path() if not path.exists(): # Apply defaults if no config file if not self._startup_geometry_applied: try: self.root.geometry(DEFAULT_GEOMETRY) self._startup_geometry_applied = True except Exception: pass return try: data = json.loads(path.read_text(encoding="utf-8")) except Exception as e: if not silent: messagebox.showerror("Settings", f"Failed to read settings file:\n{path}\n\n{e}") return # Host/port try: host = str(data.get("host", "")).strip() if host: self.host_var.set(host) except Exception: pass try: port = int(data.get("port", PORT_DEFAULT)) if 1 <= port <= 65535: self.port_var.set(port) except Exception: pass # Connection method and params try: ctype = str(data.get("connection_type", "")).strip().lower() if ctype in ("tcp", "serial", "ble", "bluetooth"): self.conn_type_var.set("ble" if ctype == "bluetooth" else ctype) except Exception: pass try: sp = str(data.get("serial_port", "")).strip() self.serial_port_var.set(sp) except Exception: pass try: ba = str(data.get("ble_address", "")).strip() self.ble_addr_var.set(ba) except Exception: pass # Theme try: theme = str(data.get("theme", "")).strip().lower() if theme in ("light", "dark"): self.current_theme = theme except Exception: pass # Default send target / channel choice (label string) try: ch_choice = str(data.get("channel_choice", "")).strip() if ch_choice: # only apply if exists now; otherwise keep pending until channels are loaded try: values = list(self.cbo_channel["values"]) except Exception: values = [] if ch_choice in values: self.channel_var.set(ch_choice) else: self._pending_channel_choice = ch_choice except Exception: pass # Window geometry try: geom = str(data.get("window_geometry", "")).strip() if geom: self.root.geometry(geom) self._startup_geometry_applied = True except Exception: pass # Sash fraction try: frac = float(data.get("sash_fraction", DEFAULT_SASH_FRACTION)) if 0.10 <= frac <= 0.90: self._startup_sash_fraction = frac except Exception: pass self._update_title_with_host() if not silent: self._status_update(info=f"Settings loaded from {path}") def save_settings(self, silent: bool = False) -> None: """Save settings to JSON in the current working directory ('.').""" path = self._settings_path() # Best-effort sash fraction sash_fraction = DEFAULT_SASH_FRACTION try: w = self.paned.winfo_width() or self.paned.winfo_reqwidth() or 1 pos = self.paned.sashpos(0) if w > 0: sash_fraction = max(0.10, min(0.90, float(pos) / float(w))) except Exception: pass payload = { "version": 2, "host": (self.host_var.get() or "").strip(), "port": int(self.port_var.get() or PORT_DEFAULT), "connection_type": (self.conn_type_var.get() or "tcp").strip().lower(), "serial_port": (self.serial_port_var.get() or "").strip(), "ble_address": (self.ble_addr_var.get() or "").strip(), "theme": self.current_theme, "channel_choice": (self.channel_var.get() or "").strip(), "window_geometry": self.root.winfo_geometry(), "sash_fraction": sash_fraction, } try: tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(json.dumps(payload, indent=2), encoding="utf-8") tmp.replace(path) if not silent: messagebox.showinfo("Settings", f"Saved settings to:\n{path}") except Exception as e: if not silent: messagebox.showerror("Settings", f"Failed to save settings to:\n{path}\n\n{e}") def open_settings_dialog(self) -> None: """Open a Settings window (loads/saves JSON in the current working dir).""" win = tk.Toplevel(self.root) win.title("Settings") self._style_toplevel(win) frm = ttk.Frame(win, padding=10) frm.grid(row=0, column=0, sticky="nsew") win.rowconfigure(0, weight=1) win.columnconfigure(0, weight=1) nb = ttk.Notebook(frm) nb.grid(row=0, column=0, sticky="nsew") frm.rowconfigure(0, weight=1) frm.columnconfigure(0, weight=1) tab_general = ttk.Frame(nb, padding=10) tab_conn = ttk.Frame(nb, padding=10) tab_file = ttk.Frame(nb, padding=10) nb.add(tab_general, text="General") nb.add(tab_conn, text="Connection") nb.add(tab_file, text="File") # --- General tab ------------------------------------------------ theme_var = tk.StringVar(value=self.current_theme) ttk.Label(tab_general, text="Theme:").grid(row=0, column=0, sticky="w", pady=4) cbo_theme = ttk.Combobox(tab_general, textvariable=theme_var, state="readonly", values=["dark", "light"], width=10) cbo_theme.grid(row=0, column=1, sticky="w", pady=4, padx=6) ttk.Label(tab_general, text="Default send target:").grid(row=1, column=0, sticky="w", pady=4) choice_var = tk.StringVar(value=self.channel_var.get()) try: choices = list(self.cbo_channel["values"]) except Exception: choices = [] cbo_choice = ttk.Combobox(tab_general, textvariable=choice_var, state="readonly", values=choices, width=28) cbo_choice.grid(row=1, column=1, sticky="w", pady=4, padx=6) tab_general.columnconfigure(1, weight=1) # --- Connection tab -------------------------------------------- # Pretty labels in UI, but we store keys: tcp|serial|ble key_to_label = {"tcp": "TCP (WiFi)", "serial": "USB/Serial", "ble": "Bluetooth (BLE)"} label_to_key = {v: k for k, v in key_to_label.items()} method_label_var = tk.StringVar(value=key_to_label.get((self.conn_type_var.get() or "tcp").strip().lower(), "TCP (WiFi)")) ttk.Label(tab_conn, text="Method:").grid(row=0, column=0, sticky="w", pady=4) cbo_method = ttk.Combobox(tab_conn, textvariable=method_label_var, state="readonly", values=[key_to_label["tcp"], key_to_label["serial"], key_to_label["ble"]], width=18) cbo_method.grid(row=0, column=1, sticky="w", pady=4, padx=6) # TCP settings tcp_box = ttk.Labelframe(tab_conn, text="TCP settings", padding=10) ttk.Label(tcp_box, text="Host/IP:").grid(row=0, column=0, sticky="w", pady=4) ent_host = ttk.Entry(tcp_box, textvariable=self.host_var, width=32) ent_host.grid(row=0, column=1, sticky="ew", pady=4, padx=6) ttk.Label(tcp_box, text="Port:").grid(row=1, column=0, sticky="w", pady=4) ent_port = ttk.Entry(tcp_box, textvariable=self.port_var, width=10) ent_port.grid(row=1, column=1, sticky="w", pady=4, padx=6) tcp_box.columnconfigure(1, weight=1) # Serial settings serial_box = ttk.Labelframe(tab_conn, text="USB/Serial settings", padding=10) serial_port_ui = tk.StringVar(value=(self.serial_port_var.get() or "").strip() or "(auto)") ttk.Label(serial_box, text="Serial port:").grid(row=0, column=0, sticky="w", pady=4) cbo_serial = ttk.Combobox(serial_box, textvariable=serial_port_ui, state="readonly", width=30) cbo_serial.grid(row=0, column=1, sticky="w", pady=4, padx=6) def refresh_serial_ports(): ports = [] if list_ports: try: ports = [p.device for p in list_ports.comports()] except Exception: ports = [] vals = ["(auto)"] + ports # Ensure saved/manual value is visible cur = (serial_port_ui.get() or "").strip() if cur and cur not in vals: vals.append(cur) cbo_serial["values"] = vals if not cur: serial_port_ui.set("(auto)") ttk.Button(serial_box, text="Refresh", command=refresh_serial_ports).grid(row=0, column=2, padx=(6, 0), sticky="w") refresh_serial_ports() # BLE settings ble_box = ttk.Labelframe(tab_conn, text="Bluetooth (BLE) settings", padding=10) ble_addr_ui = tk.StringVar(value=(self.ble_addr_var.get() or "").strip()) ttk.Label(ble_box, text="Device (name or address):").grid(row=0, column=0, sticky="w", pady=4) ent_ble = ttk.Entry(ble_box, textvariable=ble_addr_ui, width=36) ent_ble.grid(row=0, column=1, sticky="ew", pady=4, padx=6) def scan_ble_into_settings(): if BLEInterface is None: messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).") return try: devices = BLEInterface.scan() except Exception as e: messagebox.showerror("BLE scan failed", str(e)) return if not devices: messagebox.showinfo("BLE", "No devices found.") return options = [ f"{i + 1}. {getattr(d, 'name', '') or '(unnamed)'} [{getattr(d, 'address', '?')}]" for i, d in enumerate(devices) ] choice = simpledialog.askinteger( "Select BLE device", "Enter number:\n" + "\n".join(options), minvalue=1, maxvalue=len(devices), ) if not choice: return addr = getattr(devices[choice - 1], "address", None) if not addr: messagebox.showerror("BLE", "Selected device has no address.") return ble_addr_ui.set(str(addr)) ttk.Button(ble_box, text="Scan...", command=scan_ble_into_settings).grid(row=0, column=2, padx=(6, 0), sticky="w") ble_box.columnconfigure(1, weight=1) # place boxes (we show/hide depending on method) tcp_box.grid(row=1, column=0, columnspan=3, sticky="ew", pady=(10, 6)) serial_box.grid(row=2, column=0, columnspan=3, sticky="ew", pady=6) ble_box.grid(row=3, column=0, columnspan=3, sticky="ew", pady=6) tab_conn.columnconfigure(1, weight=1) def _show_only(key: str): key = key.strip().lower() if key == "serial": tcp_box.grid_remove() ble_box.grid_remove() serial_box.grid() elif key == "ble": tcp_box.grid_remove() serial_box.grid_remove() ble_box.grid() else: serial_box.grid_remove() ble_box.grid_remove() tcp_box.grid() def _on_method_change(*_): key = label_to_key.get(method_label_var.get(), "tcp") _show_only(key) cbo_method.bind("<>", _on_method_change) _show_only(label_to_key.get(method_label_var.get(), "tcp")) # --- File tab --------------------------------------------------- path = self._settings_path() ttk.Label(tab_file, text="Settings file:").grid(row=0, column=0, sticky="w") lbl_path = ttk.Entry(tab_file, width=60) lbl_path.grid(row=1, column=0, sticky="ew", pady=(4, 10)) lbl_path.insert(0, str(path)) lbl_path.configure(state="readonly") tab_file.columnconfigure(0, weight=1) def open_folder(): folder = str(path.parent) try: if os.name == "nt": os.startfile(folder) # type: ignore[attr-defined] else: subprocess.Popen(["xdg-open", folder]) except Exception as e: messagebox.showerror("Open folder", str(e)) ttk.Button(tab_file, text="Open folder", command=open_folder).grid(row=2, column=0, sticky="w") # --- Buttons ---------------------------------------------------- btnbar = ttk.Frame(frm) btnbar.grid(row=1, column=0, sticky="e", pady=(10, 0)) def apply_and_save(): # Validate port if TCP method_key = label_to_key.get(method_label_var.get(), "tcp") if method_key == "tcp": h = (self.host_var.get() or "").strip() if not h: messagebox.showerror("Settings", "Host/IP cannot be empty for TCP.") return try: p = int(self.port_var.get()) if not (1 <= p <= 65535): raise ValueError except Exception: messagebox.showerror("Settings", "Port must be 1-65535") return # Persist connection settings self.conn_type_var.set(method_key) sp = (serial_port_ui.get() or "").strip() self.serial_port_var.set("" if sp.lower() == "(auto)" else sp) self.ble_addr_var.set((ble_addr_ui.get() or "").strip()) # Theme self.current_theme = theme_var.get() self.apply_theme(self.current_theme) # Default send target selected_choice = (choice_var.get() or "").strip() if selected_choice: try: values_now = list(self.cbo_channel["values"]) except Exception: values_now = [] if selected_choice in values_now: self.channel_var.set(selected_choice) else: self._pending_channel_choice = selected_choice self.save_settings(silent=False) def do_load(): self.load_settings(silent=False) theme_var.set(self.current_theme) choice_var.set(self.channel_var.get()) # Refresh method + params method_label_var.set(key_to_label.get((self.conn_type_var.get() or "tcp").strip().lower(), "TCP (WiFi)")) serial_port_ui.set((self.serial_port_var.get() or "").strip() or "(auto)") ble_addr_ui.set((self.ble_addr_var.get() or "").strip()) refresh_serial_ports() _on_method_change() ttk.Button(btnbar, text="Load", command=do_load).grid(row=0, column=0, padx=4) ttk.Button(btnbar, text="Save", command=apply_and_save).grid(row=0, column=1, padx=4) ttk.Button(btnbar, text="Close", command=win.destroy).grid(row=0, column=2, padx=4) def close_app(self) -> None: """Graceful close: save settings, disconnect, then destroy.""" try: self.save_settings(silent=True) except Exception: pass try: self.disconnect() finally: try: self.root.destroy() except Exception: pass class NodeChatWindow: def __init__(self, app: MeshtasticGUI, node_key: str, label: str): self.app = app self.node_key = node_key self.label = label self.top = tk.Toplevel(app.root) self.top.title(f"Chat: {label}") app._style_toplevel(self.top) self.top.protocol("WM_DELETE_WINDOW", self._on_close) frm = ttk.Frame(self.top, padding=8) frm.pack(expand=True, fill="both") frm.rowconfigure(0, weight=1) frm.columnconfigure(0, weight=1) self.txt = tk.Text(frm, wrap="word") self.txt.grid(row=0, column=0, columnspan=2, sticky="nsew") yscroll = ttk.Scrollbar(frm, orient="vertical", command=self.txt.yview) self.txt.configure(yscrollcommand=yscroll.set) yscroll.grid(row=0, column=2, sticky="ns") self.entry = ttk.Entry(frm) self.entry.grid(row=1, column=0, sticky="nsew", pady=(6, 0)) self.entry.bind("", self._on_send) ttk.Button(frm, text="Send", command=self._on_send).grid(row=1, column=1, sticky="e", padx=(4, 0), pady=(6, 0)) self.apply_theme() def apply_theme(self): is_dark = self.app.current_theme == "dark" self.txt.configure( bg=("#2d2d2d" if is_dark else "#ffffff"), fg=("#ffffff" if is_dark else "#000000"), insertbackground=("#ffffff" if is_dark else "#000000"), ) def append(self, line: str): try: self.txt.insert("end", line + "\n") self.txt.see("end") except Exception: pass def _on_send(self, *_): msg = self.entry.get().strip() if not msg: return ok = self.app._send_text_to_node(self.node_key, msg) if ok: self.entry.delete(0, "end") def _on_close(self): try: self.app._per_node_chats.pop(str(self.node_key), None) except Exception: pass self.top.destroy() def main(): app = MeshtasticGUI() app.root.protocol("WM_DELETE_WINDOW", app.close_app) app.root.mainloop() if __name__ == "__main__": main()