diff --git a/Flatpak/meshtastic-client_linuxmint.desktop b/Flatpak/meshtastic-client_linuxmint.desktop deleted file mode 100644 index cd7ac60..0000000 --- a/Flatpak/meshtastic-client_linuxmint.desktop +++ /dev/null @@ -1,11 +0,0 @@ -[Desktop Entry] -Name=Meshtastic Client -Comment=Meshtastic Desktop Client -Exec=meshtastic-client -Terminal=false -Type=Application -Icon=meshtastic -Categories=Network;Utility; -NoDisplay=false -Name[en_US]=meshtastic-client_linuxmint.desktop - diff --git a/Flatpak/meshtastic.ico b/Flatpak/meshtastic.ico deleted file mode 100644 index e69de29..0000000 diff --git a/Flatpak/meshtastic.png b/Flatpak/meshtastic.png deleted file mode 100644 index 668347f..0000000 Binary files a/Flatpak/meshtastic.png and /dev/null differ diff --git a/Flatpak/meshtastic_client.py b/Flatpak/meshtastic_client.py deleted file mode 100644 index 4d38c72..0000000 --- a/Flatpak/meshtastic_client.py +++ /dev/null @@ -1,1336 +0,0 @@ -# -*- coding: utf-8 -*- -#!/usr/bin/env python3 -from __future__ import annotations -import json, time, datetime, threading, pathlib, tkinter as tk -from tkinter import ttk, messagebox, simpledialog -from typing import Any, Dict, Optional -import os -import subprocess -import webbrowser - -try: - from pubsub import pub -except Exception: - pub = None - -try: - from meshtastic.tcp_interface import TCPInterface -except Exception: - TCPInterface = None -try: - from meshtastic.serial_interface import SerialInterface -except Exception: - SerialInterface = None -try: - from meshtastic.ble_interface import BLEInterface -except Exception: - BLEInterface = None - -try: - from meshtastic.protobuf import mesh_pb2, portnums_pb2 - import google.protobuf.json_format as _json_format -except Exception: - mesh_pb2 = None - portnums_pb2 = None - _json_format = None - -try: - from serial.tools import list_ports -except Exception: - list_ports = None - -HOST_DEFAULT = "192.168.0.156" -PORT_DEFAULT = 4403 -PROJECT_PATH = pathlib.Path(__file__).parent -ICON_PATH = PROJECT_PATH / "meshtastic.ico" - - -def _prefer_chrome(url: str): - """Open URL in a sensible browser on any OS. - On Flatpak/Linux we just use the default handler. - """ - # Try system default browser first (works well in Flatpak) - try: - webbrowser.open(url) - return - except Exception: - pass - - # Fallbacks for some common desktop setups - candidate_browsers = [ - # Linux / BSD - "xdg-open", - "gio open", - "sensible-browser", - "google-chrome-stable", - "chromium", - # Windows (if someone still runs it there) - r"C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe", - r"C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe", - ] - - for cmd in candidate_browsers: - try: - if os.path.isabs(cmd) and os.path.exists(cmd): - subprocess.Popen([cmd, url]) - return - else: - # Let the shell resolve it from PATH - subprocess.Popen(cmd.split() + [url]) - return - except Exception: - continue - - -def _fmt_ago(epoch_seconds: Optional[float]) -> str: - if not epoch_seconds: - return "N/A" - try: - delta = time.time() - float(epoch_seconds) - except Exception: - return "N/A" - if delta < 0: - delta = 0 - mins = int(delta // 60) - hours = int(delta // 3600) - days = int(delta // 86400) - if delta < 60: - return "%ds" % int(delta) - if mins < 60: - return "%dm" % mins - if hours < 24: - return "%dh" % hours - if days < 7: - return "%dd" % days - dt = datetime.datetime.fromtimestamp(epoch_seconds) - return dt.strftime("%Y-%m-%d %H:%M") - - -class MeshtasticGUI: - def __init__(self, master: Optional[tk.Tk] = None): - self.root = master or tk.Tk() - self.root.title("Meshtastic Client") - - try: - if ICON_PATH.exists(): - self.root.iconbitmap(default=str(ICON_PATH)) - except Exception: - pass - - self.current_theme = "dark" - - self.root.rowconfigure(0, weight=1) - self.root.columnconfigure(0, weight=1) - - self.host_var = tk.StringVar(value=HOST_DEFAULT) - self.port_var = tk.IntVar(value=PORT_DEFAULT) - - self.menubar = tk.Menu(self.root) - m_conn = tk.Menu(self.menubar, tearoff=False) - m_conn.add_command(label="Connect (TCP)", command=self.connect_tcp) - m_conn.add_command(label="Connect via USB/Serial...", command=self.connect_serial_dialog) - m_conn.add_command(label="Connect via Bluetooth...", command=self.connect_ble_dialog) - m_conn.add_command(label="Disconnect", command=self.disconnect) - m_conn.add_separator() - m_conn.add_command(label="Set IP/Port...", command=self.set_ip_port) - self.menubar.add_cascade(label="Connection", menu=m_conn) - - m_tools = tk.Menu(self.menubar, tearoff=False) - m_tools.add_command(label="Clear messages", command=lambda: self.txt_messages.delete("1.0", "end")) - self.menubar.add_cascade(label="Tools", menu=m_tools) - - m_view = tk.Menu(self.menubar, tearoff=False) - m_view.add_command(label="Light theme", command=lambda: self.apply_theme("light")) - m_view.add_command(label="Dark theme", command=lambda: self.apply_theme("dark")) - self.menubar.add_cascade(label="View", menu=m_view) - - m_links = tk.Menu(self.menubar, tearoff=False) - m_links.add_command(label="Meshtastic client", command=lambda: self._open_browser_url("https://github.com/dk98174003/Meshtastic-Client")) - m_links.add_command(label="Meshtastic org", command=lambda: self._open_browser_url("https://meshtastic.org/")) - m_links.add_command(label="Meshtastic flasher (Chrome)", command=lambda: self._open_browser_url("https://flasher.meshtastic.org/")) - m_links.add_command(label="Meshtastic Web Client", command=lambda: self._open_browser_url("https://client.meshtastic.org")) - m_links.add_command(label="Meshtastic docker client", command=lambda: self._open_browser_url("https://meshtastic.org/docs/software/linux/usage/#usage-with-docker")) - m_links.add_separator() - m_links.add_command(label="Meshtastic Facebook Danmark", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1553839535376876/")) - m_links.add_command(label="Meshtastic Facebook Nordjylland", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1265866668302201/")) - self.menubar.add_cascade(label="Links", menu=m_links) - - self.root.config(menu=self.menubar) - - self.rootframe = ttk.Frame(self.root) - self.rootframe.grid(row=0, column=0, sticky="nsew") - self.rootframe.rowconfigure(0, weight=1) - self.rootframe.columnconfigure(0, weight=1) - - self.paned = ttk.Panedwindow(self.rootframe, orient="horizontal") - self.paned.grid(row=0, column=0, sticky="nsew") - - # messages - self.msg_frame = ttk.Frame(self.paned) - self.msg_frame.rowconfigure(1, weight=1) - self.msg_frame.columnconfigure(0, weight=1) - - ttk.Label(self.msg_frame, text="Messages").grid(row=0, column=0, sticky="w", pady=(2, 0)) - - self.txt_messages = tk.Text(self.msg_frame, wrap="word") - self.txt_messages.grid(row=1, column=0, sticky="nsew", padx=(0, 4), pady=(2, 2)) - yscroll_left = ttk.Scrollbar(self.msg_frame, orient="vertical", command=self.txt_messages.yview) - self.txt_messages.configure(yscrollcommand=yscroll_left.set) - yscroll_left.grid(row=1, column=1, sticky="ns") - - self.send_frame = ttk.Frame(self.msg_frame) - self.send_frame.grid(row=2, column=0, columnspan=2, sticky="nsew", pady=(0, 4)) - self.send_frame.columnconfigure(0, weight=1) - self.ent_message = ttk.Entry(self.send_frame) - self.ent_message.grid(row=0, column=0, sticky="nsew") - self.ent_message.bind("", lambda e: self.send_message()) - self.btn_send = ttk.Button(self.send_frame, text="Send", command=self.send_message) - self.btn_send.grid(row=0, column=1, padx=4, sticky="nsew") - - # channel selector (public / selected / private channels) - self.channel_var = tk.StringVar() - self._channel_map = {} - self.cbo_channel = ttk.Combobox(self.send_frame, textvariable=self.channel_var, state="readonly", width=22) - self._reset_channel_choices() - self.cbo_channel.grid(row=0, column=2, padx=4, sticky="w") - - # nodes - self.nodes_frame = ttk.Labelframe(self.paned, text="Nodes (0)") - self.nodes_frame.rowconfigure(1, weight=1) - self.nodes_frame.columnconfigure(0, weight=1) - - self.ent_search = ttk.Entry(self.nodes_frame) - self.ent_search.grid(row=0, column=0, sticky="nsew", padx=2, pady=2) - self.ent_search.bind("", lambda e: self.refresh_nodes()) - - self.cols_all = ( - "shortname", "longname", "since", "hops", - "distkm", "speed", "alt", - "lastheard", "hwmodel", "role", - "macaddr", "publickey", "isunmessagable", "id" - ) - self.cols_visible = ( - "shortname", "longname", "since", "hops", - "distkm", "speed", "alt", "hwmodel", "role" - ) - self.tv_nodes = ttk.Treeview( - self.nodes_frame, - columns=self.cols_all, - show="headings", - displaycolumns=self.cols_visible - ) - self.tv_nodes.grid(row=1, column=0, sticky="nsew", padx=(2, 0), pady=(0, 2)) - - headings = { - "shortname": "Short", - "longname": "Long", - "since": "Since", - "hops": "Hops", - "distkm": "Dist (km)", - "speed": "Speed", - "alt": "Alt (m)", - "hwmodel": "HW", - "role": "Role", - "lastheard": "", - "macaddr": "MAC", - "publickey": "Public key", - "isunmessagable": "Unmsg?", - "id": "ID", - } - for key, text in headings.items(): - self.tv_nodes.heading(key, text=text, command=lambda c=key: self.sort_by_column(c, False)) - - widths = { - "shortname": 90, - "longname": 200, - "since": 90, - "hops": 50, - "distkm": 70, - "speed": 70, - "alt": 70, - "hwmodel": 90, - "role": 110, - } - for key, w in widths.items(): - try: - self.tv_nodes.column(key, width=w, anchor="w", stretch=(key not in ("since", "hops", "distkm", "speed", "alt"))) - except Exception: - pass - - # hide technical columns - for col in ("lastheard", "macaddr", "publickey", "isunmessagable", "id"): - try: - self.tv_nodes.column(col, width=0, minwidth=0, stretch=False) - except Exception: - pass - - yscroll_nodes = ttk.Scrollbar(self.nodes_frame, orient="vertical", command=self.tv_nodes.yview) - self.tv_nodes.configure(yscrollcommand=yscroll_nodes.set) - yscroll_nodes.grid(row=1, column=1, sticky="ns") - - # right-click: - self.node_menu = tk.Menu(self.nodes_frame, tearoff=False) - self.node_menu.add_command(label="Node info", command=self._cm_show_node_details) - self.node_menu.add_command(label="Map", command=self._cm_open_map) - self.node_menu.add_command(label="Traceroute", command=self._cm_traceroute) - self.node_menu.add_separator() - self.node_menu.add_command(label="Delete node", command=self._cm_delete_node) - - self.tv_nodes.bind("", self._popup_node_menu) - self.tv_nodes.bind("", lambda e: self._toggle_send_target()) - - self.paned.add(self.msg_frame, weight=3) - self.paned.add(self.nodes_frame, weight=4) - self.root.after(100, lambda: self._safe_set_sash(0.40)) - - self.iface: Optional[object] = None - self.connected_evt = threading.Event() - self._last_seen_overrides: Dict[str, float] = {} - self._last_sort_col: Optional[str] = "since" - self._last_sort_reverse: bool = True - - if pub is not None: - try: - pub.subscribe(self.on_connection_established, "meshtastic.connection.established") - pub.subscribe(self.on_connection_lost, "meshtastic.connection.lost") - pub.subscribe(self.on_receive, "meshtastic.receive") - pub.subscribe(self.on_node_updated, "meshtastic.node.updated") - except Exception as e: - print("pubsub subscribe failed:", e) - - self.apply_theme("light") - self._append("Ready. Connection -> Connect (TCP/USB/BLE)") - self._update_title_with_host() - - # helpers --------------------------------------------------------- - def _open_browser_url(self, url: str): - _prefer_chrome(url) - - def _update_title_with_host(self): - self.root.title("Meshtastic Client - %s:%s" % (self.host_var.get(), self.port_var.get())) - - def _safe_set_sash(self, fraction: float = 0.5): - try: - w = self.paned.winfo_width() or self.paned.winfo_reqwidth() - self.paned.sashpos(0, int(w * fraction)) - except Exception: - pass - - def _node_label(self, node_id: str) -> str: - if not self.iface or not getattr(self.iface, "nodes", None): - return node_id - node = self.iface.nodes.get(node_id, {}) # type: ignore[attr-defined] - user = (node or {}).get("user") or {} - shortname = user.get("shortName") or "" - longname = user.get("longName") or "" - label = ("%s %s" % (shortname, longname)).strip() - if label: - return label - return node_id - - # pubsub callbacks ------------------------------------------------ - def on_connection_established(self, interface=None, **kwargs): - self.connected_evt.set() - self._append("[+] Connected") - # refresh nodes and channel list when we connect - self.refresh_nodes() - try: - self._update_channels_from_iface() - except Exception: - pass - def on_connection_lost(self, interface=None, **kwargs): - self.connected_evt.clear() - self._append("[-] Connection lost") - - def on_node_updated(self, node=None, interface=None, **kwargs): - self.root.after(0, self.refresh_nodes) - - def on_receive(self, packet=None, interface=None, **kwargs): - self.root.after(0, lambda: self._handle_receive(packet or {})) - - # receive --------------------------------------------------------- - def _handle_receive(self, packet: dict): - decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {} - app_name = decoded.get("app", "") - portnum = decoded.get("portnum") or app_name - from_id = packet.get("fromId") or packet.get("from") or "UNKNOWN" - label = self._node_label(from_id) if hasattr(self, "_node_label") else from_id - - tag_map = { - "TEXT_MESSAGE_APP": "MSG", - "TEXT_MESSAGE_COMPRESSED_APP": "MSG", - "POSITION_APP": "POS", - "TELEMETRY_APP": "TEL", - "NODEINFO_APP": "INFO", - "ROUTING_APP": "ROUT", - "MAP_REPORT_APP": "MAP", - "ADMIN_APP": "ADM", - "NEIGHBORINFO_APP": "NEI", - "STORE_FORWARD_APP": "SFWD", - "REMOTE_HARDWARE_APP": "RHW", - "PRIVATE_APP": "PRIV", - } - tag = tag_map.get(app_name or portnum, "INFO") - - if app_name in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP"): - text = decoded.get("text") or decoded.get("payload", {}).get("text", "") - if text: - self._append(f"[MSG] {label}: {text}") - else: - self._append(f"[MSG] {label}") - - if hasattr(self, "refresh_nodes"): - self.refresh_nodes() - - decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {} - portnum = decoded.get("portnum") - sender = packet.get("fromId") or packet.get("from") or packet.get("fromIdShort") - if sender: - self._last_seen_overrides[str(sender)] = time.time() - - user = {} - if self.iface and getattr(self.iface, "nodes", None) and sender: - user = (self.iface.nodes.get(sender) or {}).get("user", {}) # type: ignore[attr-defined] - shortname = user.get("shortName") or "" - longname = user.get("longName") or "" - label = str(shortname or longname or sender or "Unknown").strip() - - text = "" - p = decoded.get("payload", "") - if isinstance(p, (bytes, bytearray)): - try: - text = p.decode("utf-8", errors="ignore") - except Exception: - text = repr(p) - elif isinstance(p, str): - text = p - else: - t = decoded.get("text") - if isinstance(t, bytes): - text = t.decode("utf-8", errors="ignore") - elif isinstance(t, str): - text = t - - rssi = packet.get("rxRssi") - - if portnum == "TEXT_MESSAGE_APP": - self._append("[MSG] %s: %s (RSSI=%s)" % (label, text, rssi)) - if isinstance(text, str) and text.strip().lower() == "ping": - self._send_pong(sender, label) - - self.refresh_nodes() - - def _send_pong(self, dest_id: Optional[str], label: str): - if not self.iface or not dest_id: - return - try: - self.iface.sendText("pong", destinationId=dest_id, wantAck=False) - self._append("[auto] pong -> %s" % label) - except Exception as e: - self._append("[auto] pong failed: %s" % e) - - # connection actions ---------------------------------------------- - def set_ip_port(self): - win = tk.Toplevel(self.root) - win.title("Set IP/Port") - self._style_toplevel(win) - frm = ttk.Frame(win, padding=8) - frm.grid(row=0, column=0, sticky="nsew") - win.columnconfigure(0, weight=1) - win.rowconfigure(0, weight=1) - - ttk.Label(frm, text="Host/IP:").grid(row=0, column=0, sticky="w", pady=4) - ent_host = ttk.Entry(frm, textvariable=self.host_var, width=28) - ent_host.grid(row=0, column=1, sticky="ew", pady=4, padx=4) - - ttk.Label(frm, text="Port:").grid(row=1, column=0, sticky="w", pady=4) - ent_port = ttk.Entry(frm, textvariable=self.port_var, width=10) - ent_port.grid(row=1, column=1, sticky="w", pady=4, padx=4) - - frm.columnconfigure(1, weight=1) - - def save(): - h = self.host_var.get().strip() - try: - p = int(self.port_var.get()) - except Exception: - messagebox.showerror("Port", "Port must be 1-65535") - return - if not h: - messagebox.showerror("Host", "Host cannot be empty") - return - if not (1 <= p <= 65535): - messagebox.showerror("Port", "Port must be 1-65535") - return - self.host_var.set(h) - self.port_var.set(p) - self._update_title_with_host() - win.destroy() - - btnbar = ttk.Frame(frm) - btnbar.grid(row=2, column=0, columnspan=2, sticky="e") - ttk.Button(btnbar, text="Cancel", command=win.destroy).grid(row=0, column=0, padx=4) - ttk.Button(btnbar, text="Save", command=save).grid(row=0, column=1, padx=4) - - def connect_tcp(self): - if self.iface: - return - host = self.host_var.get().strip() - try: - port = int(self.port_var.get()) - except Exception: - messagebox.showerror("Port", "Invalid port") - return - self._append("Connecting TCP %s:%s ..." % (host, port)) - - def run(): - try: - if TCPInterface is None: - raise RuntimeError("meshtastic.tcp_interface not installed") - self.iface = TCPInterface(hostname=host, portNumber=port) - self.connected_evt.wait(timeout=5) - except Exception as e: - self.root.after(0, lambda: messagebox.showerror("TCP connect failed", str(e))) - threading.Thread(target=run, daemon=True).start() - - def connect_serial_dialog(self): - if SerialInterface is None: - messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed.") - return - - ports = [] - if list_ports: - try: - ports = [p.device for p in list_ports.comports()] - except Exception: - ports = [] - presets = ["(auto)"] + ports + ["COM4", "/dev/ttyUSB0"] - port = simpledialog.askstring("Serial", "Serial port (or leave '(auto)'):", initialvalue=presets[0]) - if port is None: - return - port = port.strip() - if port.lower() == "(auto)" or port == "": - port = None - self._append("Connecting Serial %s ..." % (port or "(auto)")) - - def run(): - try: - self.iface = SerialInterface(devPath=port) if port else SerialInterface() - self.connected_evt.wait(timeout=5) - except Exception as e: - self.root.after(0, lambda: messagebox.showerror("Serial connect failed", str(e))) - threading.Thread(target=run, daemon=True).start() - - def connect_ble_dialog(self): - if BLEInterface is None: - messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).") - return - self._append("Scanning BLE for Meshtastic devices ...") - try: - devices = BLEInterface.scan() - except Exception as e: - messagebox.showerror("BLE scan failed", str(e)) - return - if not devices: - messagebox.showinfo("BLE", "No devices found.") - return - options = ["%d. %s [%s]" % (i + 1, getattr(d, "name", "") or "(unnamed)", getattr(d, "address", "?")) for i, d in enumerate(devices)] - choice = simpledialog.askinteger( - "Select BLE device", - "Enter number:\n" + "\n".join(options), - minvalue=1, - maxvalue=len(devices), - ) - if not choice: - return - addr = getattr(devices[choice - 1], "address", None) - if not addr: - messagebox.showerror("BLE", "Selected device has no address.") - return - self._append("Connecting BLE %s ..." % addr) - - def run(): - try: - self.iface = BLEInterface(address=addr) - self.connected_evt.wait(timeout=8) - except Exception as e: - self.root.after(0, lambda: messagebox.showerror("BLE connect failed", str(e))) - threading.Thread(target=run, daemon=True).start() - - def disconnect(self): - try: - if self.iface: - self.iface.close() - except Exception: - pass - self.iface = None - self.connected_evt.clear() - self._append("[*] Disconnected") - - # send ------------------------------------------------------------ - - # channel selector helpers ---------------------------------------- - def _reset_channel_choices(self): - """Initialize channel selector with Public + To selected.""" - self._channel_map = {} - options = [] - - label_pub = "Public (broadcast)" - self._channel_map[label_pub] = {"mode": "broadcast", "channelIndex": 0} - options.append(label_pub) - - label_sel = "To selected node" - self._channel_map[label_sel] = {"mode": "selected", "channelIndex": 0} - options.append(label_sel) - - if hasattr(self, "cbo_channel"): - self.cbo_channel["values"] = options - if hasattr(self, "channel_var"): - self.channel_var.set(label_pub) - - def _set_channel_choice(self, label: str): - """Safely set the current channel choice, if it exists.""" - try: - values = list(self.cbo_channel["values"]) - except Exception: - return - if label not in values: - return - self.channel_var.set(label) - - - def _update_channels_from_iface(self): - """ - Populate channel selector with channels from the connected device. - - We keep: - * "Public (broadcast)" -> broadcast on channel 0 - * "To selected node" -> direct message to selected node on channel 0 - And then append additional channels (1..N) from the radio as: - * "Ch : " -> broadcast on that channel. - """ - iface = getattr(self, "iface", None) - if not iface: - return - local_node = getattr(iface, "localNode", None) - if not local_node: - return - - # Try to request channels if we don't have them yet. - chans = getattr(local_node, "channels", None) - try: - if (not chans) and hasattr(local_node, "requestChannels"): - local_node.requestChannels() - time.sleep(1.5) - chans = getattr(local_node, "channels", None) - except Exception: - chans = getattr(local_node, "channels", None) - - try: - options = list(self.cbo_channel["values"]) - except Exception: - return - - # If channel 0 has a name, update the "Public" label to show it. - try: - if chans and len(chans) > 0: - ch0 = chans[0] - try: - ch0_name = (getattr(ch0, "settings", None).name or "").strip() - except Exception: - try: - ch0_name = (ch0.settings.name or "").strip() - except Exception: - ch0_name = "" - if ch0_name: - # Find the existing public entry (mode=broadcast, channelIndex=0) - old_label = None - for lbl, meta in list(self._channel_map.items()): - if meta.get("mode") == "broadcast" and int(meta.get("channelIndex", 0) or 0) == 0: - old_label = lbl - break - if old_label: - new_label = f"Public (ch0: {ch0_name})" - if new_label != old_label: - # Update mapping - self._channel_map[new_label] = self._channel_map.pop(old_label) - # Update combobox options - options = [new_label if v == old_label else v for v in options] - # Keep current selection if it was pointing to the old label - if self.channel_var.get() == old_label: - self.channel_var.set(new_label) - except Exception: - # Failing to pretty-print channel 0 is not fatal; just continue. - pass - - # Add remaining channels (1..N) as broadcast options. - for idx, ch in enumerate(chans or []): - if idx == 0: - # Skip channel 0 here – it's already represented by "Public". - continue - try: - name = (getattr(ch, "settings", None).name or "").strip() - except Exception: - # older protobufs might expose fields differently - try: - name = (ch.settings.name or "").strip() - except Exception: - name = "" - if not name: - label = f"Ch {idx}" - else: - label = f"Ch {idx}: {name}" - if label in self._channel_map: - continue - self._channel_map[label] = {"mode": "broadcast_channel", "channelIndex": idx} - options.append(label) - - try: - self.cbo_channel["values"] = options - except Exception: - pass - - - def send_message(self): - msg = self.ent_message.get().strip() - if not msg: - return - if not self.iface: - messagebox.showwarning("Not connected", "Connect first.") - return - - try: - choice = self.channel_var.get() if hasattr(self, "channel_var") else "" - info = (self._channel_map.get(choice) if hasattr(self, "_channel_map") else None) or { - "mode": "broadcast", - "channelIndex": 0, - } - mode = info.get("mode", "broadcast") - ch_index = int(info.get("channelIndex", 0) or 0) - - if mode == "selected": - # Direct message to the currently selected node - nid = self._get_selected_node_id() - if not nid: - messagebox.showinfo("No selection", "Select a node first.") - return - dest = self._resolve_node_dest_id(nid) - if not dest: - messagebox.showerror("Send failed", "Cannot resolve destination for selected node.") - return - self.iface.sendText(msg, destinationId=dest, wantAck=False, channelIndex=ch_index) - self._append("[ME -> %s] %s" % (self._node_label(nid), msg)) - else: - # Broadcast on chosen channel (public or private) - self.iface.sendText(msg, wantAck=False, channelIndex=ch_index) - label = self.channel_var.get() if hasattr(self, "channel_var") else "" - if mode == "broadcast_channel" and ch_index: - self._append("[ME ch%d] %s" % (ch_index, msg)) - else: - self._append("[ME] %s" % msg) - - self.ent_message.delete(0, "end") - except Exception as e: - messagebox.showerror("Send failed", str(e)) - - # nodes ----------------------------------------------------------- - def _get_lastheard_epoch(self, node_id: str, node: Dict[str, Any]) -> Optional[float]: - raw = (node or {}).get("lastHeard") - ts_iface = None - if raw is not None: - try: - val = float(raw) - ts_iface = val / 1000.0 if val > 10000000000 else val - except Exception: - ts_iface = None - ts_local = self._last_seen_overrides.get(str(node_id)) - if ts_iface and ts_local: - return max(ts_iface, ts_local) - return ts_iface or ts_local - - def _extract_latlon(self, node: dict) -> tuple[float | None, float | None]: - pos = (node or {}).get("position") or {} - lat = pos.get("latitude") or pos.get("latitudeI") or pos.get("latitude_i") - lon = pos.get("longitude") or pos.get("longitudeI") or pos.get("longitude_i") - try: - if lat is not None: - lat = float(lat) * (1e-7 if abs(float(lat)) > 90 else 1.0) - if lon is not None: - lon = float(lon) * (1e-7 if abs(float(lon)) > 180 else 1.0) - except Exception: - lat = lon = None - return lat, lon - - def _extract_speed_alt(self, node: dict) -> tuple[float | None, float | None]: - """Return (speed_kmh, alt_m) if present in node.position, else (None, None).""" - pos = (node or {}).get("position") or {} - speed = ( - pos.get("groundSpeedKmh") - or pos.get("groundSpeedKmhI") - or pos.get("groundSpeed") - or pos.get("ground_speed") - ) - alt = ( - pos.get("altitude") - or pos.get("altitudeM") - or pos.get("altitude_i") - or pos.get("altitudeI") - ) - try: - if speed is not None: - speed = float(speed) - if alt is not None: - alt = float(alt) - except Exception: - speed, alt = None, None - return speed, alt - - def _get_local_latlon(self) -> tuple[float | None, float | None]: - if not self.iface: - return (None, None) - try: - mi = getattr(self.iface, "myInfo", None) - nbn = getattr(self.iface, "nodesByNum", None) - if mi is not None and hasattr(mi, "my_node_num") and nbn: - n = nbn.get(mi.my_node_num) or {} - lat, lon = self._extract_latlon(n) - if lat is not None and lon is not None: - return lat, lon - except Exception: - pass - return (None, None) - - def _haversine_km(self, lat1, lon1, lat2, lon2) -> float: - import math - R = 6371.0088 - phi1 = math.radians(lat1) - phi2 = math.radians(lat2) - dphi = math.radians(lat2 - lat1) - dlmb = math.radians(lon2 - lon1) - a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2 - c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) - return R * c - - def refresh_nodes(self): - if not self.iface or not getattr(self.iface, "nodes", None): - return - q = self.ent_search.get().strip().lower() - for iid in self.tv_nodes.get_children(""): - self.tv_nodes.delete(iid) - - try: - nodes_snapshot = dict(self.iface.nodes or {}) # type: ignore[attr-defined] - except Exception: - nodes_snapshot = {} - - base_lat, base_lon = self._get_local_latlon() - - for node_id, node in nodes_snapshot.items(): - user = (node or {}).get("user") or {} - shortname = user.get("shortName") or "" - longname = user.get("longName") or "" - hwmodel = user.get("hwModel") or "" - role = user.get("role") or "" - macaddr = user.get("macaddr") or "" - publickey = user.get("publicKey") or "" - unmsg = user.get("isUnmessagable") or user.get("isUnmessageable") or False - - lastheard_epoch = self._get_lastheard_epoch(node_id, node) - since_str = _fmt_ago(lastheard_epoch) - hops = node.get("hopsAway") - - lat, lon = self._extract_latlon(node) - if base_lat is not None and base_lon is not None and lat is not None and lon is not None: - try: - dist = self._haversine_km(base_lat, base_lon, lat, lon) - except Exception: - dist = None - else: - dist = None - dist_str = "%.1f" % dist if isinstance(dist, (int, float)) else "-" - - speed, alt = self._extract_speed_alt(node) - speed_str = "%.1f" % speed if isinstance(speed, (int, float)) else "-" - alt_str = "%.0f" % alt if isinstance(alt, (int, float)) else "-" - - values = ( - shortname, - longname, - since_str, - str(hops) if hops is not None else "-", - dist_str, - speed_str, - alt_str, - "%.0f" % (lastheard_epoch or 0), - hwmodel, - role, - macaddr, - publickey, - str(bool(unmsg)), - node_id, - ) - - if not q or any(q in str(v).lower() for v in values): - try: - self.tv_nodes.insert("", "end", iid=node_id, values=values) - except Exception: - self.tv_nodes.insert("", "end", values=values) - - if self._last_sort_col: - self.sort_by_column(self._last_sort_col, self._last_sort_reverse) - - self.nodes_frame.config(text="Nodes (%d)" % len(self.tv_nodes.get_children())) - - def sort_by_column(self, col: str, reverse: bool = False): - self._last_sort_col = col - self._last_sort_reverse = reverse - col_to_sort = "lastheard" if col == "since" else col - numeric = {"lastheard", "distkm", "hops", "speed", "alt"} - rows = [] - for iid in self.tv_nodes.get_children(""): - val = self.tv_nodes.set(iid, col_to_sort) - if col_to_sort in numeric: - try: - val = float(val if val != "-" else 0.0) - except Exception: - val = 0.0 - else: - val = val.casefold() - rows.append((val, iid)) - rows.sort(key=lambda t: t[0], reverse=reverse) - for index, (_, iid) in enumerate(rows): - self.tv_nodes.move(iid, "", index) - self.tv_nodes.heading(col, command=lambda: self.sort_by_column(col, not reverse)) - - # THEME ----------------------------------------------------------- - def apply_theme(self, mode: str = "light"): - self.current_theme = mode - is_dark = mode == "dark" - bg = "#1e1e1e" if is_dark else "#f5f5f5" - fg = "#ffffff" if is_dark else "#000000" - acc = "#2d2d2d" if is_dark else "#ffffff" - sel = "#555555" if is_dark else "#cce0ff" - - # Root window background (client area) - try: - self.root.configure(bg=bg) - except Exception: - pass - - style = ttk.Style(self.root) - try: - style.theme_use("clam") - except Exception: - pass - style.configure("TFrame", background=bg) - style.configure("TLabelframe", background=bg, foreground=fg) - style.configure("TLabelframe.Label", background=bg, foreground=fg) - style.configure("TLabel", background=bg, foreground=fg) - style.configure("TButton", background=acc, foreground=fg) - style.configure("TEntry", fieldbackground=acc, foreground=fg) - style.configure("TCombobox", fieldbackground=acc, background=acc, foreground=fg, arrowcolor=fg) - style.map( - "TCombobox", - fieldbackground=[("readonly", acc)], - foreground=[("readonly", fg)], - background=[("readonly", acc)], - ) - style.configure("Treeview", background=acc, fieldbackground=acc, foreground=fg, borderwidth=0) - style.map("Treeview", background=[("selected", sel)], foreground=[("selected", fg)]) - - # Menubar itself (stored as self.menubar when created) - try: - if hasattr(self, "menubar") and self.menubar is not None: - self.menubar.configure( - background=bg, - foreground=fg, - activebackground=sel, - activeforeground=fg, - borderwidth=0, - relief="flat", - ) - except Exception: - pass - - try: - self.txt_messages.configure( - bg=acc, - fg=fg, - insertbackground=fg, - selectbackground=sel, - selectforeground=fg, - ) - except Exception: - pass - try: - self.root.option_add("*Menu*background", bg) - self.root.option_add("*Menu*foreground", fg) - self.root.option_add("*Menu*activeBackground", sel) - self.root.option_add("*Menu*activeForeground", fg) - # Dark background for ttk.Combobox dropdown list - try: - self.root.option_add("*TCombobox*Listbox*background", acc) - self.root.option_add("*TCombobox*Listbox*foreground", fg) - except Exception: - pass - except Exception: - pass - - def _style_toplevel(self, win: tk.Toplevel): - is_dark = self.current_theme == "dark" - bg = "#1e1e1e" if is_dark else "#f5f5f5" - win.configure(bg=bg) - - # UTILS / CONTEXT ------------------------------------------------ - def _append(self, text: str): - self.txt_messages.insert("end", text + "\n") - self.txt_messages.see("end") - - def _get_selected_node_id(self) -> Optional[str]: - sel = self.tv_nodes.selection() - if not sel: - return None - return sel[0] - - - def _cm_traceroute(self): - nid = self._get_selected_node_id() - if not nid: - messagebox.showinfo("Traceroute", "Select a node first.") - return - if not self.iface: - messagebox.showwarning("Traceroute", "Connect first.") - return - dest = self._resolve_node_dest_id(nid) - if not dest: - messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.") - return - self._append(f"[trace] Requesting traceroute to {self._node_label(nid)} ({dest})") - threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start() - - def _cm_delete_node(self): - nid = self._get_selected_node_id() - if not nid: - messagebox.showinfo("Delete node", "Select a node first.") - return - if not self.iface or not getattr(self.iface, "localNode", None): - messagebox.showwarning("Delete node", "Connect first.") - return - dest = self._resolve_node_dest_id(nid) - if not dest: - messagebox.showerror("Delete node", "Cannot determine node ID.") - return - label = self._node_label(nid) - if not messagebox.askyesno( - "Delete node", - f"Remove node {label} ({dest}) from the NodeDB on the connected radio?\n\n" - "The device might reboot after this." - ): - return - try: - # Use python-meshtastic Node.removeNode API to remove from NodeDB - # https://python.meshtastic.org/node.html - self.iface.localNode.removeNode(dest) # type: ignore[attr-defined] - self._append(f"[admin] Requested delete of node {label} ({dest})") - except Exception as e: - messagebox.showerror("Delete node", f"Failed to delete node: {e}") - return - - # Also remove from UI for this session - try: - self.tv_nodes.delete(nid) - self.nodes_frame.config(text="Nodes (%d)" % len(self.tv_nodes.get_children())) - except Exception: - pass - - - def _resolve_node_dest_id(self, nid: str) -> Optional[str]: - # `nid` is the Treeview item id; in this client it normally equals the user.id (!xxxx) - if nid.startswith("!") or nid.isdigit(): - return nid - try: - if self.iface and getattr(self.iface, "nodes", None): - node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined] - user = (node or {}).get("user") or {} - node_id = user.get("id") or "" - if node_id: - return node_id - except Exception: - pass - if nid: - return "!" + nid if not nid.startswith("!") else nid - return None - - def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0): - # Prefer native python-meshtastic traceroute if dependencies are available; otherwise fall back to CLI. - if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"): - self._do_traceroute_via_interface(dest, hop_limit, channel_index) - else: - self._do_traceroute_via_cli(dest) - - def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int): - evt = threading.Event() - result: Dict[str, Any] = {} - - def _num_to_label(num: int) -> str: - try: - nbn = getattr(self.iface, "nodesByNum", None) - if nbn and num in nbn: - n = nbn[num] - user = (n or {}).get("user") or {} - sid = user.get("id") or f"!{num:08x}" - sn = user.get("shortName") or "" - ln = user.get("longName") or "" - label = (sn or ln or sid).strip() - return f"{label} ({sid})" if sid else label - except Exception: - pass - return f"!{int(num):08x}" - - def _on_response(p: dict): - try: - rd = mesh_pb2.RouteDiscovery() - rd.ParseFromString(p["decoded"]["payload"]) - as_dict = _json_format.MessageToDict(rd) - result["packet"] = p - result["data"] = as_dict - except Exception as e: # pragma: no cover - defensive - result["error"] = str(e) - finally: - evt.set() - - try: - r = mesh_pb2.RouteDiscovery() - # Use the same TRACEROUTE_APP mechanism as the official Meshtastic clients - self.iface.sendData( - r, - destinationId=dest, - portNum=portnums_pb2.PortNum.TRACEROUTE_APP, - wantResponse=True, - onResponse=_on_response, - channelIndex=channel_index, - hopLimit=hop_limit, - ) - except Exception as e: - self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}")) - return - - if not evt.wait(30.0): - self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported).")) - return - - if "error" in result: - self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}")) - return - - p = result.get("packet") or {} - data = result.get("data") or {} - UNK = -128 - - try: - origin_num = int(p.get("to")) - dest_num = int(p.get("from")) - except Exception: - origin_num = None - dest_num = None - - def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]: - route_nums = [] - for v in data.get(route_key, []): - try: - route_nums.append(int(v)) - except Exception: - pass - snrs = [] - for v in data.get(snr_key, []): - try: - snrs.append(int(v)) - except Exception: - pass - if not start_num or not end_num: - return None - nodes = [start_num] + route_nums + [end_num] - if len(nodes) <= 1: - return None - parts = [] - for idx, num in enumerate(nodes): - label = _num_to_label(num) - if idx == 0: - parts.append(label) - else: - snr_txt = "? dB" - if (idx - 1) < len(snrs): - v = snrs[idx - 1] - if v != UNK: - snr_txt = f"{v / 4.0:.2f} dB" - parts.append(f"{label} ({snr_txt})") - return title + "\n" + " -> ".join(parts) - - lines = [] - fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num) - if fwd: - lines.append(fwd) - back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num) - if back: - lines.append(back) - - if not lines: - self.root.after(0, lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available.")) - return - - text = "\n\n".join(lines) - self.root.after(0, lambda: self._show_traceroute_window(text)) - - def _do_traceroute_via_cli(self, dest: str): - host = (self.host_var.get() or "").strip() or HOST_DEFAULT - cmd = ["meshtastic", "--host", host, "--traceroute", dest] - try: - proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40) - except Exception as e: # pragma: no cover - environment specific - self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}")) - return - out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "") - if not out.strip(): - self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute.")) - return - self.root.after(0, lambda: self._show_traceroute_window(out)) - - def _show_traceroute_window(self, text: str): - win = tk.Toplevel(self.root) - win.title("Traceroute") - self._style_toplevel(win) - frm = ttk.Frame(win, padding=8) - frm.pack(expand=True, fill="both") - txt = tk.Text(frm, wrap="word") - txt.pack(expand=True, fill="both") - is_dark = self.current_theme == "dark" - txt.configure( - bg=("#2d2d2d" if is_dark else "#ffffff"), - fg=("#ffffff" if is_dark else "#000000"), - insertbackground=("#ffffff" if is_dark else "#000000"), - ) - txt.insert("1.0", text.strip() or "No traceroute data.") - txt.configure(state="disabled") - def _toggle_send_target(self): - nid = self._get_selected_node_id() - if nid: - # Switch selector to "To selected node" when a node is double-clicked - self._set_channel_choice("To selected node") - self._append("[target] will send to %s" % self._node_label(nid)) - else: - # No selection -> fall back to public broadcast - self._set_channel_choice("Public (broadcast)") - def _popup_node_menu(self, event): - iid = self.tv_nodes.identify_row(event.y) - if iid: - self.tv_nodes.selection_set(iid) - self.node_menu.tk_popup(event.x_root, event.y_root) - self.node_menu.grab_release() - - def _cm_show_node_details(self): - self.show_raw_node(friendly=True) - - def _cm_open_map(self): - nid = self._get_selected_node_id() - if not nid or not self.iface or not getattr(self.iface, "nodes", None): - messagebox.showinfo("Map", "No node selected.") - return - node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined] - lat, lon = self._extract_latlon(node) - if lat is None or lon is None: - messagebox.showinfo("Map", "Selected node has no GPS position.") - return - url = f"https://www.google.com/maps/search/?api=1&query={lat},{lon}" - self._open_browser_url(url) - - def show_raw_node(self, friendly: bool = False): - nid = self._get_selected_node_id() - if not nid or not self.iface or not getattr(self.iface, "nodes", None): - messagebox.showinfo("Node", "No node selected.") - return - node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined] - win = tk.Toplevel(self.root) - win.title("Node: %s" % self._node_label(nid)) - self._style_toplevel(win) - frm = ttk.Frame(win, padding=8) - frm.pack(expand=True, fill="both") - txt = tk.Text(frm, wrap="word") - txt.pack(expand=True, fill="both") - is_dark = self.current_theme == "dark" - txt.configure( - bg=("#2d2d2d" if is_dark else "#ffffff"), - fg=("#ffffff" if is_dark else "#000000"), - insertbackground=("#ffffff" if is_dark else "#000000"), - ) - - if friendly: - def fmt_val(v, indent=0): - pad = " " * indent - if isinstance(v, dict): - lines = [] - for k, vv in v.items(): - if isinstance(vv, (dict, list)): - lines.append(f"{pad}{k}:") - lines.append(fmt_val(vv, indent + 1)) - else: - lines.append(f"{pad}{k}: {vv}") - return "\n".join(lines) - elif isinstance(v, list): - lines = [] - for i, item in enumerate(v): - if isinstance(item, (dict, list)): - lines.append(f"{pad}- [{i}]") - lines.append(fmt_val(item, indent + 1)) - else: - lines.append(f"{pad}- {item}") - return "\n".join(lines) - else: - return f"{pad}{v}" - - user = (node or {}).get("user") or {} - pos = (node or {}).get("position") or {} - caps = (node or {}).get("capabilities") or {} - config = (node or {}).get("config") or {} - - node_id = user.get("id") or node.get("id") or nid - macaddr = user.get("macaddr") or node.get("macaddr") or "" - publickey = user.get("publicKey") or node.get("publicKey") or "" - hw = user.get("hwModel", "") - - lines = [ - f"Name: {user.get('shortName', '')} {user.get('longName', '')}".strip(), - f"ID: {node_id}", - f"MAC: {macaddr}", - f"HW: {hw}", - f"Public key: {publickey}", - "", - f"Last heard: {_fmt_ago(self._get_lastheard_epoch(nid, node))}", - "", - "Position:", - fmt_val(pos, 1), - ] - if caps: - lines.append("Capabilities:") - lines.append(fmt_val(caps, 1)) - if config: - lines.append("Config:") - lines.append(fmt_val(config, 1)) - lines.append("RAW fields:") - skip = {"user", "position", "capabilities", "config"} - other = {k: v for k, v in (node or {}).items() if k not in skip} - lines.append(fmt_val(other, 1)) - txt.insert("1.0", "\n".join(lines)) - else: - txt.insert("1.0", json.dumps(node, indent=2, default=str)) - txt.configure(state="disabled") - - -def main(): - app = MeshtasticGUI() - app.root.geometry("1500x820") - app.root.protocol("WM_DELETE_WINDOW", lambda: (app.disconnect(), app.root.destroy())) - app.root.mainloop() - - -if __name__ == "__main__": - main() \ No newline at end of file