Add files via upload

Add traceroute option to node context menu to visualize hop-by-hop route (forward/return), including node IDs, names and SNR, with automatic CLI fallback if API decode is unavailable.
This commit is contained in:
Knud Schrøder
2025-11-09 17:24:13 +01:00
committed by GitHub
parent 6b506a551d
commit ca89d9da67
2 changed files with 192 additions and 0 deletions
Binary file not shown.
+192
View File
@@ -26,6 +26,14 @@ try:
except Exception:
BLEInterface = None
try:
from meshtastic.protobuf import mesh_pb2, portnums_pb2
import google.protobuf.json_format as _json_format
except Exception:
mesh_pb2 = None
portnums_pb2 = None
_json_format = None
try:
from serial.tools import list_ports
except Exception:
@@ -237,6 +245,7 @@ class MeshtasticGUI:
self.node_menu = tk.Menu(self.nodes_frame, tearoff=False)
self.node_menu.add_command(label="Node info", command=self._cm_show_node_details)
self.node_menu.add_command(label="Map", command=self._cm_open_map)
self.node_menu.add_command(label="Traceroute", command=self._cm_traceroute)
self.tv_nodes.bind("<Button-3>", self._popup_node_menu)
self.tv_nodes.bind("<Double-1>", lambda e: self._toggle_send_target())
@@ -766,6 +775,189 @@ class MeshtasticGUI:
return None
return sel[0]
def _cm_traceroute(self):
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("Traceroute", "Select a node first.")
return
if not self.iface:
messagebox.showwarning("Traceroute", "Connect first.")
return
dest = self._resolve_node_dest_id(nid)
if not dest:
messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.")
return
self._append(f"[trace] Requesting traceroute to {self._node_label(nid)} ({dest})")
threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start()
def _resolve_node_dest_id(self, nid: str) -> Optional[str]:
# `nid` is the Treeview item id; in this client it normally equals the user.id (!xxxx)
if nid.startswith("!") or nid.isdigit():
return nid
try:
if self.iface and getattr(self.iface, "nodes", None):
node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined]
user = (node or {}).get("user") or {}
node_id = user.get("id") or ""
if node_id:
return node_id
except Exception:
pass
if nid:
return "!" + nid if not nid.startswith("!") else nid
return None
def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0):
# Prefer native python-meshtastic traceroute if dependencies are available; otherwise fall back to CLI.
if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"):
self._do_traceroute_via_interface(dest, hop_limit, channel_index)
else:
self._do_traceroute_via_cli(dest)
def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int):
evt = threading.Event()
result: Dict[str, Any] = {}
def _num_to_label(num: int) -> str:
try:
nbn = getattr(self.iface, "nodesByNum", None)
if nbn and num in nbn:
n = nbn[num]
user = (n or {}).get("user") or {}
sid = user.get("id") or f"!{num:08x}"
sn = user.get("shortName") or ""
ln = user.get("longName") or ""
label = (sn or ln or sid).strip()
return f"{label} ({sid})" if sid else label
except Exception:
pass
return f"!{int(num):08x}"
def _on_response(p: dict):
try:
rd = mesh_pb2.RouteDiscovery()
rd.ParseFromString(p["decoded"]["payload"])
as_dict = _json_format.MessageToDict(rd)
result["packet"] = p
result["data"] = as_dict
except Exception as e: # pragma: no cover - defensive
result["error"] = str(e)
finally:
evt.set()
try:
r = mesh_pb2.RouteDiscovery()
# Use the same TRACEROUTE_APP mechanism as the official Meshtastic clients
self.iface.sendData(
r,
destinationId=dest,
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=_on_response,
channelIndex=channel_index,
hopLimit=hop_limit,
)
except Exception as e:
self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}"))
return
if not evt.wait(30.0):
self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported)."))
return
if "error" in result:
self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}"))
return
p = result.get("packet") or {}
data = result.get("data") or {}
UNK = -128
try:
origin_num = int(p.get("to"))
dest_num = int(p.get("from"))
except Exception:
origin_num = None
dest_num = None
def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]:
route_nums = []
for v in data.get(route_key, []):
try:
route_nums.append(int(v))
except Exception:
pass
snrs = []
for v in data.get(snr_key, []):
try:
snrs.append(int(v))
except Exception:
pass
if not start_num or not end_num:
return None
nodes = [start_num] + route_nums + [end_num]
if len(nodes) <= 1:
return None
parts = []
for idx, num in enumerate(nodes):
label = _num_to_label(num)
if idx == 0:
parts.append(label)
else:
snr_txt = "? dB"
if (idx - 1) < len(snrs):
v = snrs[idx - 1]
if v != UNK:
snr_txt = f"{v / 4.0:.2f} dB"
parts.append(f"{label} ({snr_txt})")
return title + "\n" + " -> ".join(parts)
lines = []
fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num)
if fwd:
lines.append(fwd)
back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num)
if back:
lines.append(back)
if not lines:
self.root.after(0, lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available."))
return
text = "\n\n".join(lines)
self.root.after(0, lambda: self._show_traceroute_window(text))
def _do_traceroute_via_cli(self, dest: str):
host = (self.host_var.get() or "").strip() or HOST_DEFAULT
cmd = ["meshtastic", "--host", host, "--traceroute", dest]
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40)
except Exception as e: # pragma: no cover - environment specific
self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}"))
return
out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "")
if not out.strip():
self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute."))
return
self.root.after(0, lambda: self._show_traceroute_window(out))
def _show_traceroute_window(self, text: str):
win = tk.Toplevel(self.root)
win.title("Traceroute")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
txt = tk.Text(frm, wrap="word")
txt.pack(expand=True, fill="both")
is_dark = self.current_theme == "dark"
txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
txt.insert("1.0", text.strip() or "No traceroute data.")
txt.configure(state="disabled")
def _toggle_send_target(self):
nid = self._get_selected_node_id()
self.send_to_selected.set(bool(nid))