diff --git a/Debian/meshtastic.ico b/Debian/meshtastic.ico new file mode 100644 index 0000000..e69de29 diff --git a/Debian/meshtastic.png b/Debian/meshtastic.png new file mode 100644 index 0000000..668347f Binary files /dev/null and b/Debian/meshtastic.png differ diff --git a/Debian/meshtastic_client.py b/Debian/meshtastic_client.py new file mode 100644 index 0000000..4d38c72 --- /dev/null +++ b/Debian/meshtastic_client.py @@ -0,0 +1,1336 @@ +# -*- coding: utf-8 -*- +#!/usr/bin/env python3 +from __future__ import annotations +import json, time, datetime, threading, pathlib, tkinter as tk +from tkinter import ttk, messagebox, simpledialog +from typing import Any, Dict, Optional +import os +import subprocess +import webbrowser + +try: + from pubsub import pub +except Exception: + pub = None + +try: + from meshtastic.tcp_interface import TCPInterface +except Exception: + TCPInterface = None +try: + from meshtastic.serial_interface import SerialInterface +except Exception: + SerialInterface = None +try: + from meshtastic.ble_interface import BLEInterface +except Exception: + BLEInterface = None + +try: + from meshtastic.protobuf import mesh_pb2, portnums_pb2 + import google.protobuf.json_format as _json_format +except Exception: + mesh_pb2 = None + portnums_pb2 = None + _json_format = None + +try: + from serial.tools import list_ports +except Exception: + list_ports = None + +HOST_DEFAULT = "192.168.0.156" +PORT_DEFAULT = 4403 +PROJECT_PATH = pathlib.Path(__file__).parent +ICON_PATH = PROJECT_PATH / "meshtastic.ico" + + +def _prefer_chrome(url: str): + """Open URL in a sensible browser on any OS. + On Flatpak/Linux we just use the default handler. + """ + # Try system default browser first (works well in Flatpak) + try: + webbrowser.open(url) + return + except Exception: + pass + + # Fallbacks for some common desktop setups + candidate_browsers = [ + # Linux / BSD + "xdg-open", + "gio open", + "sensible-browser", + "google-chrome-stable", + "chromium", + # Windows (if someone still runs it there) + r"C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe", + r"C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe", + ] + + for cmd in candidate_browsers: + try: + if os.path.isabs(cmd) and os.path.exists(cmd): + subprocess.Popen([cmd, url]) + return + else: + # Let the shell resolve it from PATH + subprocess.Popen(cmd.split() + [url]) + return + except Exception: + continue + + +def _fmt_ago(epoch_seconds: Optional[float]) -> str: + if not epoch_seconds: + return "N/A" + try: + delta = time.time() - float(epoch_seconds) + except Exception: + return "N/A" + if delta < 0: + delta = 0 + mins = int(delta // 60) + hours = int(delta // 3600) + days = int(delta // 86400) + if delta < 60: + return "%ds" % int(delta) + if mins < 60: + return "%dm" % mins + if hours < 24: + return "%dh" % hours + if days < 7: + return "%dd" % days + dt = datetime.datetime.fromtimestamp(epoch_seconds) + return dt.strftime("%Y-%m-%d %H:%M") + + +class MeshtasticGUI: + def __init__(self, master: Optional[tk.Tk] = None): + self.root = master or tk.Tk() + self.root.title("Meshtastic Client") + + try: + if ICON_PATH.exists(): + self.root.iconbitmap(default=str(ICON_PATH)) + except Exception: + pass + + self.current_theme = "dark" + + self.root.rowconfigure(0, weight=1) + self.root.columnconfigure(0, weight=1) + + self.host_var = tk.StringVar(value=HOST_DEFAULT) + self.port_var = tk.IntVar(value=PORT_DEFAULT) + + self.menubar = tk.Menu(self.root) + m_conn = tk.Menu(self.menubar, tearoff=False) + m_conn.add_command(label="Connect (TCP)", command=self.connect_tcp) + m_conn.add_command(label="Connect via USB/Serial...", command=self.connect_serial_dialog) + m_conn.add_command(label="Connect via Bluetooth...", command=self.connect_ble_dialog) + m_conn.add_command(label="Disconnect", command=self.disconnect) + m_conn.add_separator() + m_conn.add_command(label="Set IP/Port...", command=self.set_ip_port) + self.menubar.add_cascade(label="Connection", menu=m_conn) + + m_tools = tk.Menu(self.menubar, tearoff=False) + m_tools.add_command(label="Clear messages", command=lambda: self.txt_messages.delete("1.0", "end")) + self.menubar.add_cascade(label="Tools", menu=m_tools) + + m_view = tk.Menu(self.menubar, tearoff=False) + m_view.add_command(label="Light theme", command=lambda: self.apply_theme("light")) + m_view.add_command(label="Dark theme", command=lambda: self.apply_theme("dark")) + self.menubar.add_cascade(label="View", menu=m_view) + + m_links = tk.Menu(self.menubar, tearoff=False) + m_links.add_command(label="Meshtastic client", command=lambda: self._open_browser_url("https://github.com/dk98174003/Meshtastic-Client")) + m_links.add_command(label="Meshtastic org", command=lambda: self._open_browser_url("https://meshtastic.org/")) + m_links.add_command(label="Meshtastic flasher (Chrome)", command=lambda: self._open_browser_url("https://flasher.meshtastic.org/")) + m_links.add_command(label="Meshtastic Web Client", command=lambda: self._open_browser_url("https://client.meshtastic.org")) + m_links.add_command(label="Meshtastic docker client", command=lambda: self._open_browser_url("https://meshtastic.org/docs/software/linux/usage/#usage-with-docker")) + m_links.add_separator() + m_links.add_command(label="Meshtastic Facebook Danmark", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1553839535376876/")) + m_links.add_command(label="Meshtastic Facebook Nordjylland", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1265866668302201/")) + self.menubar.add_cascade(label="Links", menu=m_links) + + self.root.config(menu=self.menubar) + + self.rootframe = ttk.Frame(self.root) + self.rootframe.grid(row=0, column=0, sticky="nsew") + self.rootframe.rowconfigure(0, weight=1) + self.rootframe.columnconfigure(0, weight=1) + + self.paned = ttk.Panedwindow(self.rootframe, orient="horizontal") + self.paned.grid(row=0, column=0, sticky="nsew") + + # messages + self.msg_frame = ttk.Frame(self.paned) + self.msg_frame.rowconfigure(1, weight=1) + self.msg_frame.columnconfigure(0, weight=1) + + ttk.Label(self.msg_frame, text="Messages").grid(row=0, column=0, sticky="w", pady=(2, 0)) + + self.txt_messages = tk.Text(self.msg_frame, wrap="word") + self.txt_messages.grid(row=1, column=0, sticky="nsew", padx=(0, 4), pady=(2, 2)) + yscroll_left = ttk.Scrollbar(self.msg_frame, orient="vertical", command=self.txt_messages.yview) + self.txt_messages.configure(yscrollcommand=yscroll_left.set) + yscroll_left.grid(row=1, column=1, sticky="ns") + + self.send_frame = ttk.Frame(self.msg_frame) + self.send_frame.grid(row=2, column=0, columnspan=2, sticky="nsew", pady=(0, 4)) + self.send_frame.columnconfigure(0, weight=1) + self.ent_message = ttk.Entry(self.send_frame) + self.ent_message.grid(row=0, column=0, sticky="nsew") + self.ent_message.bind("", lambda e: self.send_message()) + self.btn_send = ttk.Button(self.send_frame, text="Send", command=self.send_message) + self.btn_send.grid(row=0, column=1, padx=4, sticky="nsew") + + # channel selector (public / selected / private channels) + self.channel_var = tk.StringVar() + self._channel_map = {} + self.cbo_channel = ttk.Combobox(self.send_frame, textvariable=self.channel_var, state="readonly", width=22) + self._reset_channel_choices() + self.cbo_channel.grid(row=0, column=2, padx=4, sticky="w") + + # nodes + self.nodes_frame = ttk.Labelframe(self.paned, text="Nodes (0)") + self.nodes_frame.rowconfigure(1, weight=1) + self.nodes_frame.columnconfigure(0, weight=1) + + self.ent_search = ttk.Entry(self.nodes_frame) + self.ent_search.grid(row=0, column=0, sticky="nsew", padx=2, pady=2) + self.ent_search.bind("", lambda e: self.refresh_nodes()) + + self.cols_all = ( + "shortname", "longname", "since", "hops", + "distkm", "speed", "alt", + "lastheard", "hwmodel", "role", + "macaddr", "publickey", "isunmessagable", "id" + ) + self.cols_visible = ( + "shortname", "longname", "since", "hops", + "distkm", "speed", "alt", "hwmodel", "role" + ) + self.tv_nodes = ttk.Treeview( + self.nodes_frame, + columns=self.cols_all, + show="headings", + displaycolumns=self.cols_visible + ) + self.tv_nodes.grid(row=1, column=0, sticky="nsew", padx=(2, 0), pady=(0, 2)) + + headings = { + "shortname": "Short", + "longname": "Long", + "since": "Since", + "hops": "Hops", + "distkm": "Dist (km)", + "speed": "Speed", + "alt": "Alt (m)", + "hwmodel": "HW", + "role": "Role", + "lastheard": "", + "macaddr": "MAC", + "publickey": "Public key", + "isunmessagable": "Unmsg?", + "id": "ID", + } + for key, text in headings.items(): + self.tv_nodes.heading(key, text=text, command=lambda c=key: self.sort_by_column(c, False)) + + widths = { + "shortname": 90, + "longname": 200, + "since": 90, + "hops": 50, + "distkm": 70, + "speed": 70, + "alt": 70, + "hwmodel": 90, + "role": 110, + } + for key, w in widths.items(): + try: + self.tv_nodes.column(key, width=w, anchor="w", stretch=(key not in ("since", "hops", "distkm", "speed", "alt"))) + except Exception: + pass + + # hide technical columns + for col in ("lastheard", "macaddr", "publickey", "isunmessagable", "id"): + try: + self.tv_nodes.column(col, width=0, minwidth=0, stretch=False) + except Exception: + pass + + yscroll_nodes = ttk.Scrollbar(self.nodes_frame, orient="vertical", command=self.tv_nodes.yview) + self.tv_nodes.configure(yscrollcommand=yscroll_nodes.set) + yscroll_nodes.grid(row=1, column=1, sticky="ns") + + # right-click: + self.node_menu = tk.Menu(self.nodes_frame, tearoff=False) + self.node_menu.add_command(label="Node info", command=self._cm_show_node_details) + self.node_menu.add_command(label="Map", command=self._cm_open_map) + self.node_menu.add_command(label="Traceroute", command=self._cm_traceroute) + self.node_menu.add_separator() + self.node_menu.add_command(label="Delete node", command=self._cm_delete_node) + + self.tv_nodes.bind("", self._popup_node_menu) + self.tv_nodes.bind("", lambda e: self._toggle_send_target()) + + self.paned.add(self.msg_frame, weight=3) + self.paned.add(self.nodes_frame, weight=4) + self.root.after(100, lambda: self._safe_set_sash(0.40)) + + self.iface: Optional[object] = None + self.connected_evt = threading.Event() + self._last_seen_overrides: Dict[str, float] = {} + self._last_sort_col: Optional[str] = "since" + self._last_sort_reverse: bool = True + + if pub is not None: + try: + pub.subscribe(self.on_connection_established, "meshtastic.connection.established") + pub.subscribe(self.on_connection_lost, "meshtastic.connection.lost") + pub.subscribe(self.on_receive, "meshtastic.receive") + pub.subscribe(self.on_node_updated, "meshtastic.node.updated") + except Exception as e: + print("pubsub subscribe failed:", e) + + self.apply_theme("light") + self._append("Ready. Connection -> Connect (TCP/USB/BLE)") + self._update_title_with_host() + + # helpers --------------------------------------------------------- + def _open_browser_url(self, url: str): + _prefer_chrome(url) + + def _update_title_with_host(self): + self.root.title("Meshtastic Client - %s:%s" % (self.host_var.get(), self.port_var.get())) + + def _safe_set_sash(self, fraction: float = 0.5): + try: + w = self.paned.winfo_width() or self.paned.winfo_reqwidth() + self.paned.sashpos(0, int(w * fraction)) + except Exception: + pass + + def _node_label(self, node_id: str) -> str: + if not self.iface or not getattr(self.iface, "nodes", None): + return node_id + node = self.iface.nodes.get(node_id, {}) # type: ignore[attr-defined] + user = (node or {}).get("user") or {} + shortname = user.get("shortName") or "" + longname = user.get("longName") or "" + label = ("%s %s" % (shortname, longname)).strip() + if label: + return label + return node_id + + # pubsub callbacks ------------------------------------------------ + def on_connection_established(self, interface=None, **kwargs): + self.connected_evt.set() + self._append("[+] Connected") + # refresh nodes and channel list when we connect + self.refresh_nodes() + try: + self._update_channels_from_iface() + except Exception: + pass + def on_connection_lost(self, interface=None, **kwargs): + self.connected_evt.clear() + self._append("[-] Connection lost") + + def on_node_updated(self, node=None, interface=None, **kwargs): + self.root.after(0, self.refresh_nodes) + + def on_receive(self, packet=None, interface=None, **kwargs): + self.root.after(0, lambda: self._handle_receive(packet or {})) + + # receive --------------------------------------------------------- + def _handle_receive(self, packet: dict): + decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {} + app_name = decoded.get("app", "") + portnum = decoded.get("portnum") or app_name + from_id = packet.get("fromId") or packet.get("from") or "UNKNOWN" + label = self._node_label(from_id) if hasattr(self, "_node_label") else from_id + + tag_map = { + "TEXT_MESSAGE_APP": "MSG", + "TEXT_MESSAGE_COMPRESSED_APP": "MSG", + "POSITION_APP": "POS", + "TELEMETRY_APP": "TEL", + "NODEINFO_APP": "INFO", + "ROUTING_APP": "ROUT", + "MAP_REPORT_APP": "MAP", + "ADMIN_APP": "ADM", + "NEIGHBORINFO_APP": "NEI", + "STORE_FORWARD_APP": "SFWD", + "REMOTE_HARDWARE_APP": "RHW", + "PRIVATE_APP": "PRIV", + } + tag = tag_map.get(app_name or portnum, "INFO") + + if app_name in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP"): + text = decoded.get("text") or decoded.get("payload", {}).get("text", "") + if text: + self._append(f"[MSG] {label}: {text}") + else: + self._append(f"[MSG] {label}") + + if hasattr(self, "refresh_nodes"): + self.refresh_nodes() + + decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {} + portnum = decoded.get("portnum") + sender = packet.get("fromId") or packet.get("from") or packet.get("fromIdShort") + if sender: + self._last_seen_overrides[str(sender)] = time.time() + + user = {} + if self.iface and getattr(self.iface, "nodes", None) and sender: + user = (self.iface.nodes.get(sender) or {}).get("user", {}) # type: ignore[attr-defined] + shortname = user.get("shortName") or "" + longname = user.get("longName") or "" + label = str(shortname or longname or sender or "Unknown").strip() + + text = "" + p = decoded.get("payload", "") + if isinstance(p, (bytes, bytearray)): + try: + text = p.decode("utf-8", errors="ignore") + except Exception: + text = repr(p) + elif isinstance(p, str): + text = p + else: + t = decoded.get("text") + if isinstance(t, bytes): + text = t.decode("utf-8", errors="ignore") + elif isinstance(t, str): + text = t + + rssi = packet.get("rxRssi") + + if portnum == "TEXT_MESSAGE_APP": + self._append("[MSG] %s: %s (RSSI=%s)" % (label, text, rssi)) + if isinstance(text, str) and text.strip().lower() == "ping": + self._send_pong(sender, label) + + self.refresh_nodes() + + def _send_pong(self, dest_id: Optional[str], label: str): + if not self.iface or not dest_id: + return + try: + self.iface.sendText("pong", destinationId=dest_id, wantAck=False) + self._append("[auto] pong -> %s" % label) + except Exception as e: + self._append("[auto] pong failed: %s" % e) + + # connection actions ---------------------------------------------- + def set_ip_port(self): + win = tk.Toplevel(self.root) + win.title("Set IP/Port") + self._style_toplevel(win) + frm = ttk.Frame(win, padding=8) + frm.grid(row=0, column=0, sticky="nsew") + win.columnconfigure(0, weight=1) + win.rowconfigure(0, weight=1) + + ttk.Label(frm, text="Host/IP:").grid(row=0, column=0, sticky="w", pady=4) + ent_host = ttk.Entry(frm, textvariable=self.host_var, width=28) + ent_host.grid(row=0, column=1, sticky="ew", pady=4, padx=4) + + ttk.Label(frm, text="Port:").grid(row=1, column=0, sticky="w", pady=4) + ent_port = ttk.Entry(frm, textvariable=self.port_var, width=10) + ent_port.grid(row=1, column=1, sticky="w", pady=4, padx=4) + + frm.columnconfigure(1, weight=1) + + def save(): + h = self.host_var.get().strip() + try: + p = int(self.port_var.get()) + except Exception: + messagebox.showerror("Port", "Port must be 1-65535") + return + if not h: + messagebox.showerror("Host", "Host cannot be empty") + return + if not (1 <= p <= 65535): + messagebox.showerror("Port", "Port must be 1-65535") + return + self.host_var.set(h) + self.port_var.set(p) + self._update_title_with_host() + win.destroy() + + btnbar = ttk.Frame(frm) + btnbar.grid(row=2, column=0, columnspan=2, sticky="e") + ttk.Button(btnbar, text="Cancel", command=win.destroy).grid(row=0, column=0, padx=4) + ttk.Button(btnbar, text="Save", command=save).grid(row=0, column=1, padx=4) + + def connect_tcp(self): + if self.iface: + return + host = self.host_var.get().strip() + try: + port = int(self.port_var.get()) + except Exception: + messagebox.showerror("Port", "Invalid port") + return + self._append("Connecting TCP %s:%s ..." % (host, port)) + + def run(): + try: + if TCPInterface is None: + raise RuntimeError("meshtastic.tcp_interface not installed") + self.iface = TCPInterface(hostname=host, portNumber=port) + self.connected_evt.wait(timeout=5) + except Exception as e: + self.root.after(0, lambda: messagebox.showerror("TCP connect failed", str(e))) + threading.Thread(target=run, daemon=True).start() + + def connect_serial_dialog(self): + if SerialInterface is None: + messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed.") + return + + ports = [] + if list_ports: + try: + ports = [p.device for p in list_ports.comports()] + except Exception: + ports = [] + presets = ["(auto)"] + ports + ["COM4", "/dev/ttyUSB0"] + port = simpledialog.askstring("Serial", "Serial port (or leave '(auto)'):", initialvalue=presets[0]) + if port is None: + return + port = port.strip() + if port.lower() == "(auto)" or port == "": + port = None + self._append("Connecting Serial %s ..." % (port or "(auto)")) + + def run(): + try: + self.iface = SerialInterface(devPath=port) if port else SerialInterface() + self.connected_evt.wait(timeout=5) + except Exception as e: + self.root.after(0, lambda: messagebox.showerror("Serial connect failed", str(e))) + threading.Thread(target=run, daemon=True).start() + + def connect_ble_dialog(self): + if BLEInterface is None: + messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).") + return + self._append("Scanning BLE for Meshtastic devices ...") + try: + devices = BLEInterface.scan() + except Exception as e: + messagebox.showerror("BLE scan failed", str(e)) + return + if not devices: + messagebox.showinfo("BLE", "No devices found.") + return + options = ["%d. %s [%s]" % (i + 1, getattr(d, "name", "") or "(unnamed)", getattr(d, "address", "?")) for i, d in enumerate(devices)] + choice = simpledialog.askinteger( + "Select BLE device", + "Enter number:\n" + "\n".join(options), + minvalue=1, + maxvalue=len(devices), + ) + if not choice: + return + addr = getattr(devices[choice - 1], "address", None) + if not addr: + messagebox.showerror("BLE", "Selected device has no address.") + return + self._append("Connecting BLE %s ..." % addr) + + def run(): + try: + self.iface = BLEInterface(address=addr) + self.connected_evt.wait(timeout=8) + except Exception as e: + self.root.after(0, lambda: messagebox.showerror("BLE connect failed", str(e))) + threading.Thread(target=run, daemon=True).start() + + def disconnect(self): + try: + if self.iface: + self.iface.close() + except Exception: + pass + self.iface = None + self.connected_evt.clear() + self._append("[*] Disconnected") + + # send ------------------------------------------------------------ + + # channel selector helpers ---------------------------------------- + def _reset_channel_choices(self): + """Initialize channel selector with Public + To selected.""" + self._channel_map = {} + options = [] + + label_pub = "Public (broadcast)" + self._channel_map[label_pub] = {"mode": "broadcast", "channelIndex": 0} + options.append(label_pub) + + label_sel = "To selected node" + self._channel_map[label_sel] = {"mode": "selected", "channelIndex": 0} + options.append(label_sel) + + if hasattr(self, "cbo_channel"): + self.cbo_channel["values"] = options + if hasattr(self, "channel_var"): + self.channel_var.set(label_pub) + + def _set_channel_choice(self, label: str): + """Safely set the current channel choice, if it exists.""" + try: + values = list(self.cbo_channel["values"]) + except Exception: + return + if label not in values: + return + self.channel_var.set(label) + + + def _update_channels_from_iface(self): + """ + Populate channel selector with channels from the connected device. + + We keep: + * "Public (broadcast)" -> broadcast on channel 0 + * "To selected node" -> direct message to selected node on channel 0 + And then append additional channels (1..N) from the radio as: + * "Ch : " -> broadcast on that channel. + """ + iface = getattr(self, "iface", None) + if not iface: + return + local_node = getattr(iface, "localNode", None) + if not local_node: + return + + # Try to request channels if we don't have them yet. + chans = getattr(local_node, "channels", None) + try: + if (not chans) and hasattr(local_node, "requestChannels"): + local_node.requestChannels() + time.sleep(1.5) + chans = getattr(local_node, "channels", None) + except Exception: + chans = getattr(local_node, "channels", None) + + try: + options = list(self.cbo_channel["values"]) + except Exception: + return + + # If channel 0 has a name, update the "Public" label to show it. + try: + if chans and len(chans) > 0: + ch0 = chans[0] + try: + ch0_name = (getattr(ch0, "settings", None).name or "").strip() + except Exception: + try: + ch0_name = (ch0.settings.name or "").strip() + except Exception: + ch0_name = "" + if ch0_name: + # Find the existing public entry (mode=broadcast, channelIndex=0) + old_label = None + for lbl, meta in list(self._channel_map.items()): + if meta.get("mode") == "broadcast" and int(meta.get("channelIndex", 0) or 0) == 0: + old_label = lbl + break + if old_label: + new_label = f"Public (ch0: {ch0_name})" + if new_label != old_label: + # Update mapping + self._channel_map[new_label] = self._channel_map.pop(old_label) + # Update combobox options + options = [new_label if v == old_label else v for v in options] + # Keep current selection if it was pointing to the old label + if self.channel_var.get() == old_label: + self.channel_var.set(new_label) + except Exception: + # Failing to pretty-print channel 0 is not fatal; just continue. + pass + + # Add remaining channels (1..N) as broadcast options. + for idx, ch in enumerate(chans or []): + if idx == 0: + # Skip channel 0 here – it's already represented by "Public". + continue + try: + name = (getattr(ch, "settings", None).name or "").strip() + except Exception: + # older protobufs might expose fields differently + try: + name = (ch.settings.name or "").strip() + except Exception: + name = "" + if not name: + label = f"Ch {idx}" + else: + label = f"Ch {idx}: {name}" + if label in self._channel_map: + continue + self._channel_map[label] = {"mode": "broadcast_channel", "channelIndex": idx} + options.append(label) + + try: + self.cbo_channel["values"] = options + except Exception: + pass + + + def send_message(self): + msg = self.ent_message.get().strip() + if not msg: + return + if not self.iface: + messagebox.showwarning("Not connected", "Connect first.") + return + + try: + choice = self.channel_var.get() if hasattr(self, "channel_var") else "" + info = (self._channel_map.get(choice) if hasattr(self, "_channel_map") else None) or { + "mode": "broadcast", + "channelIndex": 0, + } + mode = info.get("mode", "broadcast") + ch_index = int(info.get("channelIndex", 0) or 0) + + if mode == "selected": + # Direct message to the currently selected node + nid = self._get_selected_node_id() + if not nid: + messagebox.showinfo("No selection", "Select a node first.") + return + dest = self._resolve_node_dest_id(nid) + if not dest: + messagebox.showerror("Send failed", "Cannot resolve destination for selected node.") + return + self.iface.sendText(msg, destinationId=dest, wantAck=False, channelIndex=ch_index) + self._append("[ME -> %s] %s" % (self._node_label(nid), msg)) + else: + # Broadcast on chosen channel (public or private) + self.iface.sendText(msg, wantAck=False, channelIndex=ch_index) + label = self.channel_var.get() if hasattr(self, "channel_var") else "" + if mode == "broadcast_channel" and ch_index: + self._append("[ME ch%d] %s" % (ch_index, msg)) + else: + self._append("[ME] %s" % msg) + + self.ent_message.delete(0, "end") + except Exception as e: + messagebox.showerror("Send failed", str(e)) + + # nodes ----------------------------------------------------------- + def _get_lastheard_epoch(self, node_id: str, node: Dict[str, Any]) -> Optional[float]: + raw = (node or {}).get("lastHeard") + ts_iface = None + if raw is not None: + try: + val = float(raw) + ts_iface = val / 1000.0 if val > 10000000000 else val + except Exception: + ts_iface = None + ts_local = self._last_seen_overrides.get(str(node_id)) + if ts_iface and ts_local: + return max(ts_iface, ts_local) + return ts_iface or ts_local + + def _extract_latlon(self, node: dict) -> tuple[float | None, float | None]: + pos = (node or {}).get("position") or {} + lat = pos.get("latitude") or pos.get("latitudeI") or pos.get("latitude_i") + lon = pos.get("longitude") or pos.get("longitudeI") or pos.get("longitude_i") + try: + if lat is not None: + lat = float(lat) * (1e-7 if abs(float(lat)) > 90 else 1.0) + if lon is not None: + lon = float(lon) * (1e-7 if abs(float(lon)) > 180 else 1.0) + except Exception: + lat = lon = None + return lat, lon + + def _extract_speed_alt(self, node: dict) -> tuple[float | None, float | None]: + """Return (speed_kmh, alt_m) if present in node.position, else (None, None).""" + pos = (node or {}).get("position") or {} + speed = ( + pos.get("groundSpeedKmh") + or pos.get("groundSpeedKmhI") + or pos.get("groundSpeed") + or pos.get("ground_speed") + ) + alt = ( + pos.get("altitude") + or pos.get("altitudeM") + or pos.get("altitude_i") + or pos.get("altitudeI") + ) + try: + if speed is not None: + speed = float(speed) + if alt is not None: + alt = float(alt) + except Exception: + speed, alt = None, None + return speed, alt + + def _get_local_latlon(self) -> tuple[float | None, float | None]: + if not self.iface: + return (None, None) + try: + mi = getattr(self.iface, "myInfo", None) + nbn = getattr(self.iface, "nodesByNum", None) + if mi is not None and hasattr(mi, "my_node_num") and nbn: + n = nbn.get(mi.my_node_num) or {} + lat, lon = self._extract_latlon(n) + if lat is not None and lon is not None: + return lat, lon + except Exception: + pass + return (None, None) + + def _haversine_km(self, lat1, lon1, lat2, lon2) -> float: + import math + R = 6371.0088 + phi1 = math.radians(lat1) + phi2 = math.radians(lat2) + dphi = math.radians(lat2 - lat1) + dlmb = math.radians(lon2 - lon1) + a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2 + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + return R * c + + def refresh_nodes(self): + if not self.iface or not getattr(self.iface, "nodes", None): + return + q = self.ent_search.get().strip().lower() + for iid in self.tv_nodes.get_children(""): + self.tv_nodes.delete(iid) + + try: + nodes_snapshot = dict(self.iface.nodes or {}) # type: ignore[attr-defined] + except Exception: + nodes_snapshot = {} + + base_lat, base_lon = self._get_local_latlon() + + for node_id, node in nodes_snapshot.items(): + user = (node or {}).get("user") or {} + shortname = user.get("shortName") or "" + longname = user.get("longName") or "" + hwmodel = user.get("hwModel") or "" + role = user.get("role") or "" + macaddr = user.get("macaddr") or "" + publickey = user.get("publicKey") or "" + unmsg = user.get("isUnmessagable") or user.get("isUnmessageable") or False + + lastheard_epoch = self._get_lastheard_epoch(node_id, node) + since_str = _fmt_ago(lastheard_epoch) + hops = node.get("hopsAway") + + lat, lon = self._extract_latlon(node) + if base_lat is not None and base_lon is not None and lat is not None and lon is not None: + try: + dist = self._haversine_km(base_lat, base_lon, lat, lon) + except Exception: + dist = None + else: + dist = None + dist_str = "%.1f" % dist if isinstance(dist, (int, float)) else "-" + + speed, alt = self._extract_speed_alt(node) + speed_str = "%.1f" % speed if isinstance(speed, (int, float)) else "-" + alt_str = "%.0f" % alt if isinstance(alt, (int, float)) else "-" + + values = ( + shortname, + longname, + since_str, + str(hops) if hops is not None else "-", + dist_str, + speed_str, + alt_str, + "%.0f" % (lastheard_epoch or 0), + hwmodel, + role, + macaddr, + publickey, + str(bool(unmsg)), + node_id, + ) + + if not q or any(q in str(v).lower() for v in values): + try: + self.tv_nodes.insert("", "end", iid=node_id, values=values) + except Exception: + self.tv_nodes.insert("", "end", values=values) + + if self._last_sort_col: + self.sort_by_column(self._last_sort_col, self._last_sort_reverse) + + self.nodes_frame.config(text="Nodes (%d)" % len(self.tv_nodes.get_children())) + + def sort_by_column(self, col: str, reverse: bool = False): + self._last_sort_col = col + self._last_sort_reverse = reverse + col_to_sort = "lastheard" if col == "since" else col + numeric = {"lastheard", "distkm", "hops", "speed", "alt"} + rows = [] + for iid in self.tv_nodes.get_children(""): + val = self.tv_nodes.set(iid, col_to_sort) + if col_to_sort in numeric: + try: + val = float(val if val != "-" else 0.0) + except Exception: + val = 0.0 + else: + val = val.casefold() + rows.append((val, iid)) + rows.sort(key=lambda t: t[0], reverse=reverse) + for index, (_, iid) in enumerate(rows): + self.tv_nodes.move(iid, "", index) + self.tv_nodes.heading(col, command=lambda: self.sort_by_column(col, not reverse)) + + # THEME ----------------------------------------------------------- + def apply_theme(self, mode: str = "light"): + self.current_theme = mode + is_dark = mode == "dark" + bg = "#1e1e1e" if is_dark else "#f5f5f5" + fg = "#ffffff" if is_dark else "#000000" + acc = "#2d2d2d" if is_dark else "#ffffff" + sel = "#555555" if is_dark else "#cce0ff" + + # Root window background (client area) + try: + self.root.configure(bg=bg) + except Exception: + pass + + style = ttk.Style(self.root) + try: + style.theme_use("clam") + except Exception: + pass + style.configure("TFrame", background=bg) + style.configure("TLabelframe", background=bg, foreground=fg) + style.configure("TLabelframe.Label", background=bg, foreground=fg) + style.configure("TLabel", background=bg, foreground=fg) + style.configure("TButton", background=acc, foreground=fg) + style.configure("TEntry", fieldbackground=acc, foreground=fg) + style.configure("TCombobox", fieldbackground=acc, background=acc, foreground=fg, arrowcolor=fg) + style.map( + "TCombobox", + fieldbackground=[("readonly", acc)], + foreground=[("readonly", fg)], + background=[("readonly", acc)], + ) + style.configure("Treeview", background=acc, fieldbackground=acc, foreground=fg, borderwidth=0) + style.map("Treeview", background=[("selected", sel)], foreground=[("selected", fg)]) + + # Menubar itself (stored as self.menubar when created) + try: + if hasattr(self, "menubar") and self.menubar is not None: + self.menubar.configure( + background=bg, + foreground=fg, + activebackground=sel, + activeforeground=fg, + borderwidth=0, + relief="flat", + ) + except Exception: + pass + + try: + self.txt_messages.configure( + bg=acc, + fg=fg, + insertbackground=fg, + selectbackground=sel, + selectforeground=fg, + ) + except Exception: + pass + try: + self.root.option_add("*Menu*background", bg) + self.root.option_add("*Menu*foreground", fg) + self.root.option_add("*Menu*activeBackground", sel) + self.root.option_add("*Menu*activeForeground", fg) + # Dark background for ttk.Combobox dropdown list + try: + self.root.option_add("*TCombobox*Listbox*background", acc) + self.root.option_add("*TCombobox*Listbox*foreground", fg) + except Exception: + pass + except Exception: + pass + + def _style_toplevel(self, win: tk.Toplevel): + is_dark = self.current_theme == "dark" + bg = "#1e1e1e" if is_dark else "#f5f5f5" + win.configure(bg=bg) + + # UTILS / CONTEXT ------------------------------------------------ + def _append(self, text: str): + self.txt_messages.insert("end", text + "\n") + self.txt_messages.see("end") + + def _get_selected_node_id(self) -> Optional[str]: + sel = self.tv_nodes.selection() + if not sel: + return None + return sel[0] + + + def _cm_traceroute(self): + nid = self._get_selected_node_id() + if not nid: + messagebox.showinfo("Traceroute", "Select a node first.") + return + if not self.iface: + messagebox.showwarning("Traceroute", "Connect first.") + return + dest = self._resolve_node_dest_id(nid) + if not dest: + messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.") + return + self._append(f"[trace] Requesting traceroute to {self._node_label(nid)} ({dest})") + threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start() + + def _cm_delete_node(self): + nid = self._get_selected_node_id() + if not nid: + messagebox.showinfo("Delete node", "Select a node first.") + return + if not self.iface or not getattr(self.iface, "localNode", None): + messagebox.showwarning("Delete node", "Connect first.") + return + dest = self._resolve_node_dest_id(nid) + if not dest: + messagebox.showerror("Delete node", "Cannot determine node ID.") + return + label = self._node_label(nid) + if not messagebox.askyesno( + "Delete node", + f"Remove node {label} ({dest}) from the NodeDB on the connected radio?\n\n" + "The device might reboot after this." + ): + return + try: + # Use python-meshtastic Node.removeNode API to remove from NodeDB + # https://python.meshtastic.org/node.html + self.iface.localNode.removeNode(dest) # type: ignore[attr-defined] + self._append(f"[admin] Requested delete of node {label} ({dest})") + except Exception as e: + messagebox.showerror("Delete node", f"Failed to delete node: {e}") + return + + # Also remove from UI for this session + try: + self.tv_nodes.delete(nid) + self.nodes_frame.config(text="Nodes (%d)" % len(self.tv_nodes.get_children())) + except Exception: + pass + + + def _resolve_node_dest_id(self, nid: str) -> Optional[str]: + # `nid` is the Treeview item id; in this client it normally equals the user.id (!xxxx) + if nid.startswith("!") or nid.isdigit(): + return nid + try: + if self.iface and getattr(self.iface, "nodes", None): + node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined] + user = (node or {}).get("user") or {} + node_id = user.get("id") or "" + if node_id: + return node_id + except Exception: + pass + if nid: + return "!" + nid if not nid.startswith("!") else nid + return None + + def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0): + # Prefer native python-meshtastic traceroute if dependencies are available; otherwise fall back to CLI. + if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"): + self._do_traceroute_via_interface(dest, hop_limit, channel_index) + else: + self._do_traceroute_via_cli(dest) + + def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int): + evt = threading.Event() + result: Dict[str, Any] = {} + + def _num_to_label(num: int) -> str: + try: + nbn = getattr(self.iface, "nodesByNum", None) + if nbn and num in nbn: + n = nbn[num] + user = (n or {}).get("user") or {} + sid = user.get("id") or f"!{num:08x}" + sn = user.get("shortName") or "" + ln = user.get("longName") or "" + label = (sn or ln or sid).strip() + return f"{label} ({sid})" if sid else label + except Exception: + pass + return f"!{int(num):08x}" + + def _on_response(p: dict): + try: + rd = mesh_pb2.RouteDiscovery() + rd.ParseFromString(p["decoded"]["payload"]) + as_dict = _json_format.MessageToDict(rd) + result["packet"] = p + result["data"] = as_dict + except Exception as e: # pragma: no cover - defensive + result["error"] = str(e) + finally: + evt.set() + + try: + r = mesh_pb2.RouteDiscovery() + # Use the same TRACEROUTE_APP mechanism as the official Meshtastic clients + self.iface.sendData( + r, + destinationId=dest, + portNum=portnums_pb2.PortNum.TRACEROUTE_APP, + wantResponse=True, + onResponse=_on_response, + channelIndex=channel_index, + hopLimit=hop_limit, + ) + except Exception as e: + self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}")) + return + + if not evt.wait(30.0): + self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported).")) + return + + if "error" in result: + self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}")) + return + + p = result.get("packet") or {} + data = result.get("data") or {} + UNK = -128 + + try: + origin_num = int(p.get("to")) + dest_num = int(p.get("from")) + except Exception: + origin_num = None + dest_num = None + + def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]: + route_nums = [] + for v in data.get(route_key, []): + try: + route_nums.append(int(v)) + except Exception: + pass + snrs = [] + for v in data.get(snr_key, []): + try: + snrs.append(int(v)) + except Exception: + pass + if not start_num or not end_num: + return None + nodes = [start_num] + route_nums + [end_num] + if len(nodes) <= 1: + return None + parts = [] + for idx, num in enumerate(nodes): + label = _num_to_label(num) + if idx == 0: + parts.append(label) + else: + snr_txt = "? dB" + if (idx - 1) < len(snrs): + v = snrs[idx - 1] + if v != UNK: + snr_txt = f"{v / 4.0:.2f} dB" + parts.append(f"{label} ({snr_txt})") + return title + "\n" + " -> ".join(parts) + + lines = [] + fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num) + if fwd: + lines.append(fwd) + back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num) + if back: + lines.append(back) + + if not lines: + self.root.after(0, lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available.")) + return + + text = "\n\n".join(lines) + self.root.after(0, lambda: self._show_traceroute_window(text)) + + def _do_traceroute_via_cli(self, dest: str): + host = (self.host_var.get() or "").strip() or HOST_DEFAULT + cmd = ["meshtastic", "--host", host, "--traceroute", dest] + try: + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40) + except Exception as e: # pragma: no cover - environment specific + self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}")) + return + out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "") + if not out.strip(): + self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute.")) + return + self.root.after(0, lambda: self._show_traceroute_window(out)) + + def _show_traceroute_window(self, text: str): + win = tk.Toplevel(self.root) + win.title("Traceroute") + self._style_toplevel(win) + frm = ttk.Frame(win, padding=8) + frm.pack(expand=True, fill="both") + txt = tk.Text(frm, wrap="word") + txt.pack(expand=True, fill="both") + is_dark = self.current_theme == "dark" + txt.configure( + bg=("#2d2d2d" if is_dark else "#ffffff"), + fg=("#ffffff" if is_dark else "#000000"), + insertbackground=("#ffffff" if is_dark else "#000000"), + ) + txt.insert("1.0", text.strip() or "No traceroute data.") + txt.configure(state="disabled") + def _toggle_send_target(self): + nid = self._get_selected_node_id() + if nid: + # Switch selector to "To selected node" when a node is double-clicked + self._set_channel_choice("To selected node") + self._append("[target] will send to %s" % self._node_label(nid)) + else: + # No selection -> fall back to public broadcast + self._set_channel_choice("Public (broadcast)") + def _popup_node_menu(self, event): + iid = self.tv_nodes.identify_row(event.y) + if iid: + self.tv_nodes.selection_set(iid) + self.node_menu.tk_popup(event.x_root, event.y_root) + self.node_menu.grab_release() + + def _cm_show_node_details(self): + self.show_raw_node(friendly=True) + + def _cm_open_map(self): + nid = self._get_selected_node_id() + if not nid or not self.iface or not getattr(self.iface, "nodes", None): + messagebox.showinfo("Map", "No node selected.") + return + node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined] + lat, lon = self._extract_latlon(node) + if lat is None or lon is None: + messagebox.showinfo("Map", "Selected node has no GPS position.") + return + url = f"https://www.google.com/maps/search/?api=1&query={lat},{lon}" + self._open_browser_url(url) + + def show_raw_node(self, friendly: bool = False): + nid = self._get_selected_node_id() + if not nid or not self.iface or not getattr(self.iface, "nodes", None): + messagebox.showinfo("Node", "No node selected.") + return + node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined] + win = tk.Toplevel(self.root) + win.title("Node: %s" % self._node_label(nid)) + self._style_toplevel(win) + frm = ttk.Frame(win, padding=8) + frm.pack(expand=True, fill="both") + txt = tk.Text(frm, wrap="word") + txt.pack(expand=True, fill="both") + is_dark = self.current_theme == "dark" + txt.configure( + bg=("#2d2d2d" if is_dark else "#ffffff"), + fg=("#ffffff" if is_dark else "#000000"), + insertbackground=("#ffffff" if is_dark else "#000000"), + ) + + if friendly: + def fmt_val(v, indent=0): + pad = " " * indent + if isinstance(v, dict): + lines = [] + for k, vv in v.items(): + if isinstance(vv, (dict, list)): + lines.append(f"{pad}{k}:") + lines.append(fmt_val(vv, indent + 1)) + else: + lines.append(f"{pad}{k}: {vv}") + return "\n".join(lines) + elif isinstance(v, list): + lines = [] + for i, item in enumerate(v): + if isinstance(item, (dict, list)): + lines.append(f"{pad}- [{i}]") + lines.append(fmt_val(item, indent + 1)) + else: + lines.append(f"{pad}- {item}") + return "\n".join(lines) + else: + return f"{pad}{v}" + + user = (node or {}).get("user") or {} + pos = (node or {}).get("position") or {} + caps = (node or {}).get("capabilities") or {} + config = (node or {}).get("config") or {} + + node_id = user.get("id") or node.get("id") or nid + macaddr = user.get("macaddr") or node.get("macaddr") or "" + publickey = user.get("publicKey") or node.get("publicKey") or "" + hw = user.get("hwModel", "") + + lines = [ + f"Name: {user.get('shortName', '')} {user.get('longName', '')}".strip(), + f"ID: {node_id}", + f"MAC: {macaddr}", + f"HW: {hw}", + f"Public key: {publickey}", + "", + f"Last heard: {_fmt_ago(self._get_lastheard_epoch(nid, node))}", + "", + "Position:", + fmt_val(pos, 1), + ] + if caps: + lines.append("Capabilities:") + lines.append(fmt_val(caps, 1)) + if config: + lines.append("Config:") + lines.append(fmt_val(config, 1)) + lines.append("RAW fields:") + skip = {"user", "position", "capabilities", "config"} + other = {k: v for k, v in (node or {}).items() if k not in skip} + lines.append(fmt_val(other, 1)) + txt.insert("1.0", "\n".join(lines)) + else: + txt.insert("1.0", json.dumps(node, indent=2, default=str)) + txt.configure(state="disabled") + + +def main(): + app = MeshtasticGUI() + app.root.geometry("1500x820") + app.root.protocol("WM_DELETE_WINDOW", lambda: (app.disconnect(), app.root.destroy())) + app.root.mainloop() + + +if __name__ == "__main__": + main() \ No newline at end of file