diff --git a/MeshtasticClient.exe b/MeshtasticClient.exe index 80859e7..58097c6 100644 Binary files a/MeshtasticClient.exe and b/MeshtasticClient.exe differ diff --git a/meshtastic_client.py b/meshtastic_client.py index af84170..7c85218 100644 --- a/meshtastic_client.py +++ b/meshtastic_client.py @@ -26,6 +26,14 @@ try: except Exception: BLEInterface = None +try: + from meshtastic.protobuf import mesh_pb2, portnums_pb2 + import google.protobuf.json_format as _json_format +except Exception: + mesh_pb2 = None + portnums_pb2 = None + _json_format = None + try: from serial.tools import list_ports except Exception: @@ -237,6 +245,7 @@ class MeshtasticGUI: self.node_menu = tk.Menu(self.nodes_frame, tearoff=False) self.node_menu.add_command(label="Node info", command=self._cm_show_node_details) self.node_menu.add_command(label="Map", command=self._cm_open_map) + self.node_menu.add_command(label="Traceroute", command=self._cm_traceroute) self.tv_nodes.bind("", self._popup_node_menu) self.tv_nodes.bind("", lambda e: self._toggle_send_target()) @@ -766,6 +775,189 @@ class MeshtasticGUI: return None return sel[0] + + def _cm_traceroute(self): + nid = self._get_selected_node_id() + if not nid: + messagebox.showinfo("Traceroute", "Select a node first.") + return + if not self.iface: + messagebox.showwarning("Traceroute", "Connect first.") + return + dest = self._resolve_node_dest_id(nid) + if not dest: + messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.") + return + self._append(f"[trace] Requesting traceroute to {self._node_label(nid)} ({dest})") + threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start() + + def _resolve_node_dest_id(self, nid: str) -> Optional[str]: + # `nid` is the Treeview item id; in this client it normally equals the user.id (!xxxx) + if nid.startswith("!") or nid.isdigit(): + return nid + try: + if self.iface and getattr(self.iface, "nodes", None): + node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined] + user = (node or {}).get("user") or {} + node_id = user.get("id") or "" + if node_id: + return node_id + except Exception: + pass + if nid: + return "!" + nid if not nid.startswith("!") else nid + return None + + def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0): + # Prefer native python-meshtastic traceroute if dependencies are available; otherwise fall back to CLI. + if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"): + self._do_traceroute_via_interface(dest, hop_limit, channel_index) + else: + self._do_traceroute_via_cli(dest) + + def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int): + evt = threading.Event() + result: Dict[str, Any] = {} + + def _num_to_label(num: int) -> str: + try: + nbn = getattr(self.iface, "nodesByNum", None) + if nbn and num in nbn: + n = nbn[num] + user = (n or {}).get("user") or {} + sid = user.get("id") or f"!{num:08x}" + sn = user.get("shortName") or "" + ln = user.get("longName") or "" + label = (sn or ln or sid).strip() + return f"{label} ({sid})" if sid else label + except Exception: + pass + return f"!{int(num):08x}" + + def _on_response(p: dict): + try: + rd = mesh_pb2.RouteDiscovery() + rd.ParseFromString(p["decoded"]["payload"]) + as_dict = _json_format.MessageToDict(rd) + result["packet"] = p + result["data"] = as_dict + except Exception as e: # pragma: no cover - defensive + result["error"] = str(e) + finally: + evt.set() + + try: + r = mesh_pb2.RouteDiscovery() + # Use the same TRACEROUTE_APP mechanism as the official Meshtastic clients + self.iface.sendData( + r, + destinationId=dest, + portNum=portnums_pb2.PortNum.TRACEROUTE_APP, + wantResponse=True, + onResponse=_on_response, + channelIndex=channel_index, + hopLimit=hop_limit, + ) + except Exception as e: + self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}")) + return + + if not evt.wait(30.0): + self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported).")) + return + + if "error" in result: + self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}")) + return + + p = result.get("packet") or {} + data = result.get("data") or {} + UNK = -128 + + try: + origin_num = int(p.get("to")) + dest_num = int(p.get("from")) + except Exception: + origin_num = None + dest_num = None + + def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]: + route_nums = [] + for v in data.get(route_key, []): + try: + route_nums.append(int(v)) + except Exception: + pass + snrs = [] + for v in data.get(snr_key, []): + try: + snrs.append(int(v)) + except Exception: + pass + if not start_num or not end_num: + return None + nodes = [start_num] + route_nums + [end_num] + if len(nodes) <= 1: + return None + parts = [] + for idx, num in enumerate(nodes): + label = _num_to_label(num) + if idx == 0: + parts.append(label) + else: + snr_txt = "? dB" + if (idx - 1) < len(snrs): + v = snrs[idx - 1] + if v != UNK: + snr_txt = f"{v / 4.0:.2f} dB" + parts.append(f"{label} ({snr_txt})") + return title + "\n" + " -> ".join(parts) + + lines = [] + fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num) + if fwd: + lines.append(fwd) + back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num) + if back: + lines.append(back) + + if not lines: + self.root.after(0, lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available.")) + return + + text = "\n\n".join(lines) + self.root.after(0, lambda: self._show_traceroute_window(text)) + + def _do_traceroute_via_cli(self, dest: str): + host = (self.host_var.get() or "").strip() or HOST_DEFAULT + cmd = ["meshtastic", "--host", host, "--traceroute", dest] + try: + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40) + except Exception as e: # pragma: no cover - environment specific + self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}")) + return + out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "") + if not out.strip(): + self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute.")) + return + self.root.after(0, lambda: self._show_traceroute_window(out)) + + def _show_traceroute_window(self, text: str): + win = tk.Toplevel(self.root) + win.title("Traceroute") + self._style_toplevel(win) + frm = ttk.Frame(win, padding=8) + frm.pack(expand=True, fill="both") + txt = tk.Text(frm, wrap="word") + txt.pack(expand=True, fill="both") + is_dark = self.current_theme == "dark" + txt.configure( + bg=("#2d2d2d" if is_dark else "#ffffff"), + fg=("#ffffff" if is_dark else "#000000"), + insertbackground=("#ffffff" if is_dark else "#000000"), + ) + txt.insert("1.0", text.strip() or "No traceroute data.") + txt.configure(state="disabled") def _toggle_send_target(self): nid = self._get_selected_node_id() self.send_to_selected.set(bool(nid))