Files
Meshtastic-Client/LinuxMint/meshtastic_client.py
2025-11-24 18:56:18 +01:00

1336 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
from __future__ import annotations
import json, time, datetime, threading, pathlib, tkinter as tk
from tkinter import ttk, messagebox, simpledialog
from typing import Any, Dict, Optional
import os
import subprocess
import webbrowser
try:
from pubsub import pub
except Exception:
pub = None
try:
from meshtastic.tcp_interface import TCPInterface
except Exception:
TCPInterface = None
try:
from meshtastic.serial_interface import SerialInterface
except Exception:
SerialInterface = None
try:
from meshtastic.ble_interface import BLEInterface
except Exception:
BLEInterface = None
try:
from meshtastic.protobuf import mesh_pb2, portnums_pb2
import google.protobuf.json_format as _json_format
except Exception:
mesh_pb2 = None
portnums_pb2 = None
_json_format = None
try:
from serial.tools import list_ports
except Exception:
list_ports = None
# Default TCP endpoint used to pre-fill the host/port variables of the GUI.
HOST_DEFAULT = "192.168.0.156"
PORT_DEFAULT = 4403
# Directory containing this script; the window icon is looked up next to it.
PROJECT_PATH = pathlib.Path(__file__).parent
ICON_PATH = PROJECT_PATH / "meshtastic.ico"
def _prefer_chrome(url: str):
    """Open *url* in a browser, trying the system default handler first.

    The default handler is used via ``webbrowser.open``. If that reports
    failure (it returns ``False`` rather than raising) or raises, a list of
    well-known launcher commands is tried in order until one can be started.

    Args:
        url: The address to open.

    Returns:
        None. Failure to start any browser is silently ignored (best effort).
    """
    # webbrowser.open() signals failure through its return value, so the
    # result has to be checked or the fallbacks below are never reached.
    try:
        if webbrowser.open(url):
            return
    except Exception:
        pass

    # Each candidate is a ready-made argv list, so executables whose path
    # contains spaces are never split apart.
    candidate_browsers = [
        # Linux / BSD
        ["xdg-open"],
        ["gio", "open"],
        ["sensible-browser"],
        ["google-chrome-stable"],
        ["chromium"],
        # Windows (if someone still runs it there)
        [r"C:\Program Files\Google\Chrome\Application\chrome.exe"],
        [r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe"],
    ]
    for argv in candidate_browsers:
        program = argv[0]
        # Absolute paths can be checked up front; bare names are resolved
        # from PATH by Popen itself.
        if os.path.isabs(program) and not os.path.exists(program):
            continue
        try:
            subprocess.Popen(argv + [url])
            return
        except Exception:
            continue
def _fmt_ago(epoch_seconds: Optional[float]) -> str:
    """Format a Unix timestamp as a short "time since" string.

    Args:
        epoch_seconds: Seconds since the epoch. Falsy values (``None``, ``0``)
            and values that cannot be converted to ``float`` give ``"N/A"``.

    Returns:
        ``"<n>s"``, ``"<n>m"``, ``"<n>h"`` or ``"<n>d"`` for ages below one
        week, otherwise the local date/time as ``YYYY-MM-DD HH:MM``.
        Timestamps in the future are treated as "now" (``"0s"``).
    """
    if not epoch_seconds:
        return "N/A"
    try:
        ts = float(epoch_seconds)
    except (TypeError, ValueError):
        return "N/A"

    delta = max(0.0, time.time() - ts)
    if delta < 60:
        return "%ds" % int(delta)
    mins = int(delta // 60)
    if mins < 60:
        return "%dm" % mins
    hours = int(delta // 3600)
    if hours < 24:
        return "%dh" % hours
    days = int(delta // 86400)
    if days < 7:
        return "%dd" % days
    # Use the validated float (not the raw argument) so numeric strings work,
    # and guard against timestamps the platform cannot represent.
    try:
        dt = datetime.datetime.fromtimestamp(ts)
    except (OverflowError, OSError, ValueError):
        return "N/A"
    return dt.strftime("%Y-%m-%d %H:%M")
class MeshtasticGUI:
def __init__(self, master: Optional[tk.Tk] = None):
    """Build the main window: menus, message pane, node table and pubsub hooks.

    Args:
        master: Existing Tk root to build into; a new ``tk.Tk()`` is created
            when it is omitted.
    """
    self.root = master or tk.Tk()
    self.root.title("Meshtastic Client")
    # The window icon is best effort: any failure to load it is ignored.
    try:
        if ICON_PATH.exists():
            self.root.iconbitmap(default=str(ICON_PATH))
    except Exception:
        pass
    # Initial value only; apply_theme("light") at the end of this method
    # overwrites it.
    self.current_theme = "dark"
    self.root.rowconfigure(0, weight=1)
    self.root.columnconfigure(0, weight=1)
    # TCP endpoint, editable through "Set IP/Port..." and read by connect_tcp.
    self.host_var = tk.StringVar(value=HOST_DEFAULT)
    self.port_var = tk.IntVar(value=PORT_DEFAULT)
    # --- menu bar: Connection / Tools / View / Links ----------------------
    self.menubar = tk.Menu(self.root)
    m_conn = tk.Menu(self.menubar, tearoff=False)
    m_conn.add_command(label="Connect (TCP)", command=self.connect_tcp)
    m_conn.add_command(label="Connect via USB/Serial...", command=self.connect_serial_dialog)
    m_conn.add_command(label="Connect via Bluetooth...", command=self.connect_ble_dialog)
    m_conn.add_command(label="Disconnect", command=self.disconnect)
    m_conn.add_separator()
    m_conn.add_command(label="Set IP/Port...", command=self.set_ip_port)
    self.menubar.add_cascade(label="Connection", menu=m_conn)
    m_tools = tk.Menu(self.menubar, tearoff=False)
    m_tools.add_command(label="Clear messages", command=lambda: self.txt_messages.delete("1.0", "end"))
    self.menubar.add_cascade(label="Tools", menu=m_tools)
    m_view = tk.Menu(self.menubar, tearoff=False)
    m_view.add_command(label="Light theme", command=lambda: self.apply_theme("light"))
    m_view.add_command(label="Dark theme", command=lambda: self.apply_theme("dark"))
    self.menubar.add_cascade(label="View", menu=m_view)
    m_links = tk.Menu(self.menubar, tearoff=False)
    m_links.add_command(label="Meshtastic client", command=lambda: self._open_browser_url("https://github.com/dk98174003/Meshtastic-Client"))
    m_links.add_command(label="Meshtastic org", command=lambda: self._open_browser_url("https://meshtastic.org/"))
    m_links.add_command(label="Meshtastic flasher (Chrome)", command=lambda: self._open_browser_url("https://flasher.meshtastic.org/"))
    m_links.add_command(label="Meshtastic Web Client", command=lambda: self._open_browser_url("https://client.meshtastic.org"))
    m_links.add_command(label="Meshtastic docker client", command=lambda: self._open_browser_url("https://meshtastic.org/docs/software/linux/usage/#usage-with-docker"))
    m_links.add_separator()
    m_links.add_command(label="Meshtastic Facebook Danmark", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1553839535376876/"))
    m_links.add_command(label="Meshtastic Facebook Nordjylland", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1265866668302201/"))
    self.menubar.add_cascade(label="Links", menu=m_links)
    self.root.config(menu=self.menubar)
    # --- root layout: one horizontal paned window filling the window ------
    self.rootframe = ttk.Frame(self.root)
    self.rootframe.grid(row=0, column=0, sticky="nsew")
    self.rootframe.rowconfigure(0, weight=1)
    self.rootframe.columnconfigure(0, weight=1)
    self.paned = ttk.Panedwindow(self.rootframe, orient="horizontal")
    self.paned.grid(row=0, column=0, sticky="nsew")
    # --- left pane: message log with scrollbar, plus the send row ---------
    self.msg_frame = ttk.Frame(self.paned)
    self.msg_frame.rowconfigure(1, weight=1)
    self.msg_frame.columnconfigure(0, weight=1)
    ttk.Label(self.msg_frame, text="Messages").grid(row=0, column=0, sticky="w", pady=(2, 0))
    self.txt_messages = tk.Text(self.msg_frame, wrap="word")
    self.txt_messages.grid(row=1, column=0, sticky="nsew", padx=(0, 4), pady=(2, 2))
    yscroll_left = ttk.Scrollbar(self.msg_frame, orient="vertical", command=self.txt_messages.yview)
    self.txt_messages.configure(yscrollcommand=yscroll_left.set)
    yscroll_left.grid(row=1, column=1, sticky="ns")
    self.send_frame = ttk.Frame(self.msg_frame)
    self.send_frame.grid(row=2, column=0, columnspan=2, sticky="nsew", pady=(0, 4))
    self.send_frame.columnconfigure(0, weight=1)
    self.ent_message = ttk.Entry(self.send_frame)
    self.ent_message.grid(row=0, column=0, sticky="nsew")
    # Pressing Return in the entry sends, same as the Send button.
    self.ent_message.bind("<Return>", lambda e: self.send_message())
    self.btn_send = ttk.Button(self.send_frame, text="Send", command=self.send_message)
    self.btn_send.grid(row=0, column=1, padx=4, sticky="nsew")
    # Channel selector (public / selected node / additional channels).
    # _channel_map maps each combobox label to its send mode and channel index.
    self.channel_var = tk.StringVar()
    self._channel_map = {}
    self.cbo_channel = ttk.Combobox(self.send_frame, textvariable=self.channel_var, state="readonly", width=22)
    self._reset_channel_choices()
    self.cbo_channel.grid(row=0, column=2, padx=4, sticky="w")
    # --- right pane: search box and node table ----------------------------
    self.nodes_frame = ttk.Labelframe(self.paned, text="Nodes (0)")
    self.nodes_frame.rowconfigure(1, weight=1)
    self.nodes_frame.columnconfigure(0, weight=1)
    self.ent_search = ttk.Entry(self.nodes_frame)
    self.ent_search.grid(row=0, column=0, sticky="nsew", padx=2, pady=2)
    # Re-filter the table on every keystroke in the search box.
    self.ent_search.bind("<KeyRelease>", lambda e: self.refresh_nodes())
    # cols_all is the full data model of a row (order must match the value
    # tuple built in refresh_nodes); cols_visible is the subset displayed.
    self.cols_all = (
        "shortname", "longname", "since", "hops",
        "distkm", "speed", "alt",
        "lastheard", "hwmodel", "role",
        "macaddr", "publickey", "isunmessagable", "id"
    )
    self.cols_visible = (
        "shortname", "longname", "since", "hops",
        "distkm", "speed", "alt", "hwmodel", "role"
    )
    self.tv_nodes = ttk.Treeview(
        self.nodes_frame,
        columns=self.cols_all,
        show="headings",
        displaycolumns=self.cols_visible
    )
    self.tv_nodes.grid(row=1, column=0, sticky="nsew", padx=(2, 0), pady=(0, 2))
    headings = {
        "shortname": "Short",
        "longname": "Long",
        "since": "Since",
        "hops": "Hops",
        "distkm": "Dist (km)",
        "speed": "Speed",
        "alt": "Alt (m)",
        "hwmodel": "HW",
        "role": "Role",
        "lastheard": "",
        "macaddr": "MAC",
        "publickey": "Public key",
        "isunmessagable": "Unmsg?",
        "id": "ID",
    }
    # Clicking a heading sorts by that column; `c=key` binds the current key
    # at definition time so every lambda keeps its own column name.
    for key, text in headings.items():
        self.tv_nodes.heading(key, text=text, command=lambda c=key: self.sort_by_column(c, False))
    widths = {
        "shortname": 90,
        "longname": 200,
        "since": 90,
        "hops": 50,
        "distkm": 70,
        "speed": 70,
        "alt": 70,
        "hwmodel": 90,
        "role": 110,
    }
    # The narrow numeric columns keep a fixed width; the others stretch.
    for key, w in widths.items():
        try:
            self.tv_nodes.column(key, width=w, anchor="w", stretch=(key not in ("since", "hops", "distkm", "speed", "alt")))
        except Exception:
            pass
    # hide technical columns
    for col in ("lastheard", "macaddr", "publickey", "isunmessagable", "id"):
        try:
            self.tv_nodes.column(col, width=0, minwidth=0, stretch=False)
        except Exception:
            pass
    yscroll_nodes = ttk.Scrollbar(self.nodes_frame, orient="vertical", command=self.tv_nodes.yview)
    self.tv_nodes.configure(yscrollcommand=yscroll_nodes.set)
    yscroll_nodes.grid(row=1, column=1, sticky="ns")
    # Right-click context menu for the node table.
    self.node_menu = tk.Menu(self.nodes_frame, tearoff=False)
    self.node_menu.add_command(label="Node info", command=self._cm_show_node_details)
    self.node_menu.add_command(label="Map", command=self._cm_open_map)
    self.node_menu.add_command(label="Traceroute", command=self._cm_traceroute)
    self.node_menu.add_separator()
    self.node_menu.add_command(label="Delete node", command=self._cm_delete_node)
    self.tv_nodes.bind("<Button-3>", self._popup_node_menu)
    self.tv_nodes.bind("<Double-1>", lambda e: self._toggle_send_target())
    self.paned.add(self.msg_frame, weight=3)
    self.paned.add(self.nodes_frame, weight=4)
    # Place the sash at 40% of the paned width shortly after start-up.
    self.root.after(100, lambda: self._safe_set_sash(0.40))
    # --- connection state -------------------------------------------------
    # Active meshtastic interface, or None while disconnected.
    self.iface: Optional[object] = None
    # Set by on_connection_established, cleared on loss/disconnect.
    self.connected_evt = threading.Event()
    # Local receive time per sender id, recorded by _handle_receive.
    self._last_seen_overrides: Dict[str, float] = {}
    # Default ordering of the node table: most recently heard first.
    self._last_sort_col: Optional[str] = "since"
    self._last_sort_reverse: bool = True
    # Subscribe to meshtastic events only when pubsub could be imported.
    if pub is not None:
        try:
            pub.subscribe(self.on_connection_established, "meshtastic.connection.established")
            pub.subscribe(self.on_connection_lost, "meshtastic.connection.lost")
            pub.subscribe(self.on_receive, "meshtastic.receive")
            pub.subscribe(self.on_node_updated, "meshtastic.node.updated")
        except Exception as e:
            print("pubsub subscribe failed:", e)
    self.apply_theme("light")
    self._append("Ready. Connection -> Connect (TCP/USB/BLE)")
    self._update_title_with_host()
# helpers ---------------------------------------------------------
def _open_browser_url(self, url: str):
    """Open *url* in a browser by delegating to ``_prefer_chrome``."""
    _prefer_chrome(url)
def _update_title_with_host(self):
    """Show the configured host and port in the window title."""
    endpoint = (self.host_var.get(), self.port_var.get())
    self.root.title("Meshtastic Client - %s:%s" % endpoint)
def _safe_set_sash(self, fraction: float = 0.5):
    """Move the first sash to *fraction* of the paned window's width.

    The current width is used when it is non-zero, otherwise the requested
    width. Any error is ignored.
    """
    try:
        width = self.paned.winfo_width()
        if not width:
            width = self.paned.winfo_reqwidth()
        target = int(width * fraction)
        self.paned.sashpos(0, target)
    except Exception:
        pass
def _node_label(self, node_id: str) -> str:
    """Return "<shortName> <longName>" for *node_id*, or the id itself.

    The id is returned unchanged when there is no interface, no node table,
    or the node has neither a short nor a long name.
    """
    iface = self.iface
    known_nodes = getattr(iface, "nodes", None) if iface else None
    if not known_nodes:
        return node_id
    entry = known_nodes.get(node_id, {}) or {}  # type: ignore[attr-defined]
    user = entry.get("user") or {}
    names = (user.get("shortName") or "", user.get("longName") or "")
    label = ("%s %s" % names).strip()
    return label or node_id
# pubsub callbacks ------------------------------------------------
def on_connection_established(self, interface=None, **kwargs):
    """Pubsub callback for "meshtastic.connection.established".

    Sets ``connected_evt`` immediately (threading.Event is safe from any
    thread) and defers all widget work to the Tk event loop via
    ``root.after``, the same way ``on_receive`` and ``on_node_updated`` do.
    NOTE(review): presumably pubsub delivers this on a library thread rather
    than the Tk main thread - confirm against python-meshtastic.
    """
    self.connected_evt.set()

    def _update_ui():
        self._append("[+] Connected")
        # refresh nodes and channel list when we connect
        self.refresh_nodes()
        try:
            self._update_channels_from_iface()
        except Exception:
            # Channel discovery is optional; the default choices remain.
            pass

    self.root.after(0, _update_ui)
def on_connection_lost(self, interface=None, **kwargs):
    """Pubsub callback for "meshtastic.connection.lost".

    Clears ``connected_evt`` right away and posts the log line through
    ``root.after`` so the Text widget is only touched from the Tk event
    loop, consistent with ``on_receive`` / ``on_node_updated``.
    NOTE(review): ``self.iface`` is left as-is here, so a new connect is
    refused by ``connect_tcp`` until "Disconnect" is used - confirm intended.
    """
    self.connected_evt.clear()
    self.root.after(0, lambda: self._append("[-] Connection lost"))
def on_node_updated(self, node=None, interface=None, **kwargs):
    """Pubsub callback for "meshtastic.node.updated": schedule a table refresh."""
    refresh = self.refresh_nodes
    self.root.after(0, refresh)
def on_receive(self, packet=None, interface=None, **kwargs):
    """Pubsub callback for "meshtastic.receive".

    Hands the packet (or an empty dict when it is falsy) to
    ``_handle_receive`` through ``root.after``.
    """
    def _deliver():
        self._handle_receive(packet or {})

    self.root.after(0, _deliver)
# receive ---------------------------------------------------------
def _handle_receive(self, packet: dict):
    """Process one received packet on the Tk thread.

    Records the local receive time for the sender, logs text messages as
    ``[MSG] <label>: <text> (RSSI=<rssi>)``, answers a message whose text is
    "ping" (case-insensitive, surrounding whitespace ignored) with an
    automatic "pong", and refreshes the node table once.

    Args:
        packet: Packet dictionary as published on "meshtastic.receive".
            Anything that is not a dict is treated as an empty packet.
    """
    if not isinstance(packet, dict):
        packet = {}
    decoded = packet.get("decoded")
    if not isinstance(decoded, dict):
        decoded = {}
    # Some packets carry the port name under "app" instead of "portnum".
    portnum = decoded.get("portnum") or decoded.get("app")

    sender = packet.get("fromId") or packet.get("from") or packet.get("fromIdShort")
    if sender:
        self._last_seen_overrides[str(sender)] = time.time()

    def _as_text(value) -> str:
        """Return *value* as str, decoding bytes leniently; "" otherwise."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors="ignore")
        if isinstance(value, str):
            return value
        return ""

    if portnum in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP"):
        user = {}
        if self.iface and getattr(self.iface, "nodes", None) and sender:
            node = self.iface.nodes.get(sender) or {}  # type: ignore[attr-defined]
            user = node.get("user") or {}
        shortname = user.get("shortName") or ""
        longname = user.get("longName") or ""
        label = str(shortname or longname or sender or "Unknown").strip()

        # Prefer the already-decoded text; fall back to the raw payload.
        text = _as_text(decoded.get("text")) or _as_text(decoded.get("payload"))
        rssi = packet.get("rxRssi")
        self._append("[MSG] %s: %s (RSSI=%s)" % (label, text, rssi))
        if text.strip().lower() == "ping":
            self._send_pong(sender, label)

    # One refresh per packet keeps "Since" and newly seen nodes up to date.
    self.refresh_nodes()
def _send_pong(self, dest_id: Optional[str], label: str):
    """Send the text "pong" to *dest_id* and log the outcome.

    Does nothing when there is no interface or no destination. *label* is
    only used in the success log line.
    """
    if self.iface and dest_id:
        try:
            self.iface.sendText("pong", destinationId=dest_id, wantAck=False)
            self._append("[auto] pong -> %s" % label)
        except Exception as err:
            self._append("[auto] pong failed: %s" % err)
# connection actions ----------------------------------------------
def set_ip_port(self):
    """Open a small dialog for editing the TCP host and port.

    The entries edit private copies of the values. ``host_var`` /
    ``port_var`` are only updated when "Save" passes validation, so
    "Cancel" (or closing the window) discards whatever was typed.
    """
    win = tk.Toplevel(self.root)
    win.title("Set IP/Port")
    self._style_toplevel(win)
    frm = ttk.Frame(win, padding=8)
    frm.grid(row=0, column=0, sticky="nsew")
    win.columnconfigure(0, weight=1)
    win.rowconfigure(0, weight=1)

    # port_var is an IntVar; reading it raises if it holds non-numeric text.
    try:
        current_port = str(self.port_var.get())
    except Exception:
        current_port = ""
    host_edit = tk.StringVar(master=win, value=self.host_var.get())
    port_edit = tk.StringVar(master=win, value=current_port)
    # Keep the variables alive for as long as the dialog exists.
    win._edit_vars = (host_edit, port_edit)

    ttk.Label(frm, text="Host/IP:").grid(row=0, column=0, sticky="w", pady=4)
    ent_host = ttk.Entry(frm, textvariable=host_edit, width=28)
    ent_host.grid(row=0, column=1, sticky="ew", pady=4, padx=4)
    ttk.Label(frm, text="Port:").grid(row=1, column=0, sticky="w", pady=4)
    ent_port = ttk.Entry(frm, textvariable=port_edit, width=10)
    ent_port.grid(row=1, column=1, sticky="w", pady=4, padx=4)
    frm.columnconfigure(1, weight=1)

    def save():
        """Validate the edited values and commit them to the live settings."""
        h = host_edit.get().strip()
        try:
            p = int(port_edit.get().strip())
        except ValueError:
            messagebox.showerror("Port", "Port must be 1-65535")
            return
        if not h:
            messagebox.showerror("Host", "Host cannot be empty")
            return
        if not (1 <= p <= 65535):
            messagebox.showerror("Port", "Port must be 1-65535")
            return
        self.host_var.set(h)
        self.port_var.set(p)
        self._update_title_with_host()
        win.destroy()

    btnbar = ttk.Frame(frm)
    btnbar.grid(row=2, column=0, columnspan=2, sticky="e")
    ttk.Button(btnbar, text="Cancel", command=win.destroy).grid(row=0, column=0, padx=4)
    ttk.Button(btnbar, text="Save", command=save).grid(row=0, column=1, padx=4)
def connect_tcp(self):
    """Connect to the configured host/port over TCP in a background thread.

    Does nothing when an interface already exists. A non-numeric port is
    reported with an error dialog. Connection errors raised in the worker
    thread are reported through an error dialog scheduled on the Tk loop.
    """
    if self.iface:
        return
    host = self.host_var.get().strip()
    try:
        port = int(self.port_var.get())
    except Exception:
        messagebox.showerror("Port", "Invalid port")
        return
    self._append("Connecting TCP %s:%s ..." % (host, port))

    def run():
        try:
            if TCPInterface is None:
                raise RuntimeError("meshtastic.tcp_interface not installed")
            self.iface = TCPInterface(hostname=host, portNumber=port)
            self.connected_evt.wait(timeout=5)
        except Exception as e:
            # Capture the text now: the name bound by "except ... as" is
            # deleted when the clause ends, so a lambda that refers to it
            # would fail once the Tk loop finally runs it.
            msg = str(e)
            self.root.after(0, lambda: messagebox.showerror("TCP connect failed", msg))

    threading.Thread(target=run, daemon=True).start()
def connect_serial_dialog(self):
    """Ask for a serial port and connect to it in a background thread.

    Detected ports (when ``serial.tools.list_ports`` is available) are listed
    in the prompt. Entering "(auto)" or nothing lets the library pick the
    device. Does nothing when an interface already exists, matching
    ``connect_tcp``.
    """
    if SerialInterface is None:
        messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed.")
        return
    if self.iface:
        return
    ports = []
    if list_ports:
        try:
            ports = [p.device for p in list_ports.comports()]
        except Exception:
            ports = []
    prompt = "Serial port (or leave '(auto)'):"
    if ports:
        # Show what was found so the user does not have to guess the name.
        prompt += "\nDetected: " + ", ".join(str(p) for p in ports)
    port = simpledialog.askstring("Serial", prompt, initialvalue="(auto)")
    if port is None:
        return
    port = port.strip()
    if port.lower() == "(auto)" or port == "":
        port = None
    self._append("Connecting Serial %s ..." % (port or "(auto)"))

    def run():
        try:
            self.iface = SerialInterface(devPath=port) if port else SerialInterface()
            self.connected_evt.wait(timeout=5)
        except Exception as e:
            # Capture the text now: "e" is unbound once the except clause
            # ends, which is before the Tk loop runs the lambda.
            msg = str(e)
            self.root.after(0, lambda: messagebox.showerror("Serial connect failed", msg))

    threading.Thread(target=run, daemon=True).start()
def connect_ble_dialog(self):
    """Scan for BLE devices, let the user pick one, and connect to it.

    The scan runs synchronously; the connection itself is made in a
    background thread. Does nothing when an interface already exists,
    matching ``connect_tcp``.
    """
    if BLEInterface is None:
        messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).")
        return
    if self.iface:
        return
    self._append("Scanning BLE for Meshtastic devices ...")
    try:
        devices = BLEInterface.scan()
    except Exception as e:
        messagebox.showerror("BLE scan failed", str(e))
        return
    if not devices:
        messagebox.showinfo("BLE", "No devices found.")
        return
    options = [
        "%d. %s [%s]" % (i + 1, getattr(d, "name", "") or "(unnamed)", getattr(d, "address", "?"))
        for i, d in enumerate(devices)
    ]
    choice = simpledialog.askinteger(
        "Select BLE device",
        "Enter number:\n" + "\n".join(options),
        minvalue=1,
        maxvalue=len(devices),
    )
    if not choice:
        return
    addr = getattr(devices[choice - 1], "address", None)
    if not addr:
        messagebox.showerror("BLE", "Selected device has no address.")
        return
    self._append("Connecting BLE %s ..." % addr)

    def run():
        try:
            self.iface = BLEInterface(address=addr)
            self.connected_evt.wait(timeout=8)
        except Exception as e:
            # Capture the text now: "e" is unbound once the except clause
            # ends, which is before the Tk loop runs the lambda.
            msg = str(e)
            self.root.after(0, lambda: messagebox.showerror("BLE connect failed", msg))

    threading.Thread(target=run, daemon=True).start()
def disconnect(self):
    """Close the active interface (errors ignored) and reset connection state."""
    try:
        active = self.iface
        if active:
            active.close()
    except Exception:
        pass
    self.iface = None
    self.connected_evt.clear()
    self._append("[*] Disconnected")
# send ------------------------------------------------------------
# channel selector helpers ----------------------------------------
def _reset_channel_choices(self):
    """Reset the channel selector to its two built-in entries.

    Rebuilds ``_channel_map`` with "Public (broadcast)" and
    "To selected node" (both on channel index 0), loads them into the
    combobox when it exists, and selects the public entry.
    """
    label_pub = "Public (broadcast)"
    label_sel = "To selected node"
    self._channel_map = {
        label_pub: {"mode": "broadcast", "channelIndex": 0},
        label_sel: {"mode": "selected", "channelIndex": 0},
    }
    if hasattr(self, "cbo_channel"):
        self.cbo_channel["values"] = [label_pub, label_sel]
    if hasattr(self, "channel_var"):
        self.channel_var.set(label_pub)
def _set_channel_choice(self, label: str):
    """Select *label* in the channel combobox if it is one of its values.

    Unknown labels, or a combobox whose values cannot be read, leave the
    current selection untouched.
    """
    try:
        available = list(self.cbo_channel["values"])
    except Exception:
        return
    if label in available:
        self.channel_var.set(label)
def _update_channels_from_iface(self):
    """Populate the channel selector with channels from the connected device.

    We keep:
      * "Public (broadcast)" -> broadcast on channel 0 (relabelled to
        "Public (ch0: <name>)" when channel 0 has a name)
      * "To selected node"   -> direct message to the selected node
    and append every further enabled channel (1..N) from the radio as
      * "Ch <idx>" / "Ch <idx>: <name>" -> broadcast on that channel.

    Channels whose ``role`` marks them as disabled are skipped so that unused
    slots do not show up as sendable targets.
    """
    iface = getattr(self, "iface", None)
    local_node = getattr(iface, "localNode", None) if iface else None
    if not local_node:
        return

    # Try to request channels if we don't have them yet.
    chans = getattr(local_node, "channels", None)
    if not chans and hasattr(local_node, "requestChannels"):
        try:
            local_node.requestChannels()
            time.sleep(1.5)  # give the radio a moment to answer
        except Exception:
            pass
        chans = getattr(local_node, "channels", None)
    chans = list(chans or [])

    try:
        options = list(self.cbo_channel["values"])
    except Exception:
        return

    def _channel_name(ch) -> str:
        """Return the stripped ``settings.name`` of *ch*, or "" if unavailable."""
        try:
            return (getattr(ch.settings, "name", "") or "").strip()
        except Exception:
            return ""

    def _is_disabled(ch) -> bool:
        # NOTE(review): relies on Channel.Role.DISABLED == 0 in the
        # Meshtastic protobufs; objects without a role are kept.
        role = getattr(ch, "role", None)
        return role is not None and role in (0, "DISABLED")

    # If channel 0 has a name, update the "Public" label to show it.
    try:
        ch0_name = _channel_name(chans[0]) if chans else ""
        if ch0_name:
            # Find the existing public entry (mode=broadcast, channelIndex=0).
            old_label = next(
                (
                    lbl
                    for lbl, meta in self._channel_map.items()
                    if meta.get("mode") == "broadcast"
                    and int(meta.get("channelIndex", 0) or 0) == 0
                ),
                None,
            )
            new_label = f"Public (ch0: {ch0_name})"
            if old_label and new_label != old_label:
                self._channel_map[new_label] = self._channel_map.pop(old_label)
                options = [new_label if v == old_label else v for v in options]
                # Keep the selection if it was pointing at the old label.
                if self.channel_var.get() == old_label:
                    self.channel_var.set(new_label)
    except Exception:
        # Failing to pretty-print channel 0 is not fatal; just continue.
        pass

    # Add remaining enabled channels (1..N) as broadcast options.
    for idx, ch in enumerate(chans):
        if idx == 0 or _is_disabled(ch):
            continue
        name = _channel_name(ch)
        label = f"Ch {idx}: {name}" if name else f"Ch {idx}"
        if label in self._channel_map:
            continue
        self._channel_map[label] = {"mode": "broadcast_channel", "channelIndex": idx}
        options.append(label)

    try:
        self.cbo_channel["values"] = options
    except Exception:
        pass
def send_message(self):
    """Send the text in the entry according to the channel selector.

    * "selected" mode sends a direct message to the node selected in the
      table.
    * Any other mode broadcasts on the channel index stored for the chosen
      label (channel 0 when the label is unknown).

    An empty message is ignored. The entry is cleared after a successful
    send. Failures are reported in an error dialog.
    """
    msg = self.ent_message.get().strip()
    if not msg:
        return
    if not self.iface:
        messagebox.showwarning("Not connected", "Connect first.")
        return
    try:
        choice = self.channel_var.get() if hasattr(self, "channel_var") else ""
        channel_map = getattr(self, "_channel_map", None) or {}
        # Unknown/empty selection falls back to a public broadcast.
        info = channel_map.get(choice) or {"mode": "broadcast", "channelIndex": 0}
        mode = info.get("mode", "broadcast")
        ch_index = int(info.get("channelIndex", 0) or 0)

        if mode == "selected":
            # Direct message to the currently selected node
            nid = self._get_selected_node_id()
            if not nid:
                messagebox.showinfo("No selection", "Select a node first.")
                return
            dest = self._resolve_node_dest_id(nid)
            if not dest:
                messagebox.showerror("Send failed", "Cannot resolve destination for selected node.")
                return
            self.iface.sendText(msg, destinationId=dest, wantAck=False, channelIndex=ch_index)
            self._append("[ME -> %s] %s" % (self._node_label(nid), msg))
        else:
            # Broadcast on chosen channel (public or private)
            self.iface.sendText(msg, wantAck=False, channelIndex=ch_index)
            if mode == "broadcast_channel" and ch_index:
                self._append("[ME ch%d] %s" % (ch_index, msg))
            else:
                self._append("[ME] %s" % msg)
        self.ent_message.delete(0, "end")
    except Exception as e:
        messagebox.showerror("Send failed", str(e))
# nodes -----------------------------------------------------------
def _get_lastheard_epoch(self, node_id: str, node: Dict[str, Any]) -> Optional[float]:
    """Return the most recent "last heard" time known for a node.

    Combines the node's ``lastHeard`` field (values above 10000000000 are
    divided by 1000) with the local receive time stored in
    ``_last_seen_overrides``. The later of the two wins when both are set.
    """
    reported = None
    raw = (node or {}).get("lastHeard")
    if raw is not None:
        try:
            number = float(raw)
        except Exception:
            reported = None
        else:
            reported = number / 1000.0 if number > 10000000000 else number

    observed = self._last_seen_overrides.get(str(node_id))
    if reported and observed:
        return max(reported, observed)
    return reported or observed
def _extract_latlon(self, node: dict) -> tuple[float | None, float | None]:
    """Return (latitude, longitude) from ``node["position"]``.

    For each coordinate the first truthy value among the float key and the
    two integer-style keys is used. A magnitude beyond the valid degree range
    (90 for latitude, 180 for longitude) is scaled by 1e-7. If either
    conversion fails, both results are None.
    """
    pos = (node or {}).get("position") or {}

    def _pick(*keys):
        # Mirrors an "a or b or c" chain: first truthy value, else the last.
        value = None
        for key in keys:
            value = pos.get(key)
            if value:
                break
        return value

    def _to_degrees(raw, limit):
        number = float(raw)
        return number * (1e-7 if abs(number) > limit else 1.0)

    lat = _pick("latitude", "latitudeI", "latitude_i")
    lon = _pick("longitude", "longitudeI", "longitude_i")
    try:
        if lat is not None:
            lat = _to_degrees(lat, 90)
        if lon is not None:
            lon = _to_degrees(lon, 180)
    except Exception:
        lat = lon = None
    return lat, lon
def _extract_speed_alt(self, node: dict) -> tuple[float | None, float | None]:
    """Return (speed, altitude) from ``node["position"]`` as floats.

    For each quantity the first key that is present with a non-None value is
    used, so a legitimate 0 (stationary node, sea level) is reported as
    ``0.0`` instead of being treated as missing. Each value is converted
    independently; a value that cannot be converted becomes None without
    discarding the other one.

    NOTE(review): the speed is returned exactly as reported under whichever
    key was found; no unit conversion is done here - confirm the unit before
    labelling it km/h.
    """
    pos = (node or {}).get("position") or {}

    def _first_present(*keys):
        for key in keys:
            value = pos.get(key)
            if value is not None:
                return value
        return None

    def _to_float(value):
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    speed = _to_float(
        _first_present("groundSpeedKmh", "groundSpeedKmhI", "groundSpeed", "ground_speed")
    )
    alt = _to_float(
        _first_present("altitude", "altitudeM", "altitude_i", "altitudeI")
    )
    return speed, alt
def _get_local_latlon(self) -> tuple[float | None, float | None]:
    """Return the (lat, lon) of the locally connected node, or (None, None).

    The local node is looked up in ``iface.nodesByNum`` by
    ``iface.myInfo.my_node_num``. Any error, or an incomplete position, gives
    (None, None).
    """
    unknown = (None, None)
    if not self.iface:
        return unknown
    try:
        my_info = getattr(self.iface, "myInfo", None)
        by_num = getattr(self.iface, "nodesByNum", None)
        if my_info is None or not hasattr(my_info, "my_node_num") or not by_num:
            return unknown
        own_node = by_num.get(my_info.my_node_num) or {}
        lat, lon = self._extract_latlon(own_node)
        if lat is None or lon is None:
            return unknown
        return lat, lon
    except Exception:
        return unknown
def _haversine_km(self, lat1, lon1, lat2, lon2) -> float:
    """Return the great-circle distance in km between two points in degrees.

    Uses the haversine formula with an Earth radius of 6371.0088 km.
    """
    import math

    earth_radius_km = 6371.0088
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    sin_half_dphi = math.sin(dphi / 2)
    sin_half_dlmb = math.sin(dlmb / 2)
    a = sin_half_dphi ** 2 + math.cos(phi1) * math.cos(phi2) * sin_half_dlmb ** 2
    central_angle = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return earth_radius_km * central_angle
def refresh_nodes(self):
    """Rebuild the node table from ``iface.nodes``.

    Applies the search filter (case-insensitive substring match against any
    column value), re-applies the last sort order, restores the rows that
    were selected before the rebuild, and updates the "Nodes (n)" caption.
    Does nothing while there is no interface or no node table.

    The selection has to be restored explicitly: this method runs after
    every received packet, and deleting the rows would otherwise silently
    clear the selection that "To selected node" and the context menu use.
    """
    if not self.iface or not getattr(self.iface, "nodes", None):
        return
    q = self.ent_search.get().strip().lower()
    tree = self.tv_nodes

    try:
        selected_before = tuple(tree.selection())
    except Exception:
        selected_before = ()
    for iid in tree.get_children(""):
        tree.delete(iid)

    # Copy first: the library may update the dict while we iterate.
    try:
        nodes_snapshot = dict(self.iface.nodes or {})  # type: ignore[attr-defined]
    except Exception:
        nodes_snapshot = {}

    base_lat, base_lon = self._get_local_latlon()
    have_base = base_lat is not None and base_lon is not None

    for node_id, node in nodes_snapshot.items():
        node = node or {}
        user = node.get("user") or {}
        shortname = user.get("shortName") or ""
        longname = user.get("longName") or ""
        hwmodel = user.get("hwModel") or ""
        role = user.get("role") or ""
        macaddr = user.get("macaddr") or ""
        publickey = user.get("publicKey") or ""
        unmsg = user.get("isUnmessagable") or user.get("isUnmessageable") or False
        lastheard_epoch = self._get_lastheard_epoch(node_id, node)
        since_str = _fmt_ago(lastheard_epoch)
        hops = node.get("hopsAway")

        lat, lon = self._extract_latlon(node)
        dist = None
        if have_base and lat is not None and lon is not None:
            try:
                dist = self._haversine_km(base_lat, base_lon, lat, lon)
            except Exception:
                dist = None
        dist_str = "%.1f" % dist if isinstance(dist, (int, float)) else "-"

        speed, alt = self._extract_speed_alt(node)
        speed_str = "%.1f" % speed if isinstance(speed, (int, float)) else "-"
        alt_str = "%.0f" % alt if isinstance(alt, (int, float)) else "-"

        # Order must match self.cols_all.
        values = (
            shortname,
            longname,
            since_str,
            str(hops) if hops is not None else "-",
            dist_str,
            speed_str,
            alt_str,
            "%.0f" % (lastheard_epoch or 0),
            hwmodel,
            role,
            macaddr,
            publickey,
            str(bool(unmsg)),
            node_id,
        )
        if q and not any(q in str(v).lower() for v in values):
            continue
        try:
            tree.insert("", "end", iid=node_id, values=values)
        except Exception:
            # Fall back to an auto-generated item id if node_id is unusable.
            tree.insert("", "end", values=values)

    if self._last_sort_col:
        self.sort_by_column(self._last_sort_col, self._last_sort_reverse)

    # Re-select whatever was selected and still exists after the rebuild.
    try:
        still_present = [iid for iid in selected_before if tree.exists(iid)]
        if still_present:
            tree.selection_set(still_present)
    except Exception:
        pass

    self.nodes_frame.config(text="Nodes (%d)" % len(tree.get_children()))
def sort_by_column(self, col: str, reverse: bool = False):
    """Sort the node table by *col* and arm the heading to toggle direction.

    The "since" column is sorted by the hidden "lastheard" epoch column.
    Numeric columns compare as floats ("-" and unparsable values count as
    0.0); all other columns compare case-insensitively. The chosen column and
    direction are remembered for later refreshes.
    """
    self._last_sort_col = col
    self._last_sort_reverse = reverse

    col_to_sort = "lastheard" if col == "since" else col
    is_numeric = col_to_sort in {"lastheard", "distkm", "hops", "speed", "alt"}

    def _sort_key(iid):
        raw = self.tv_nodes.set(iid, col_to_sort)
        if not is_numeric:
            return raw.casefold()
        try:
            return float(raw if raw != "-" else 0.0)
        except Exception:
            return 0.0

    ordered = sorted(self.tv_nodes.get_children(""), key=_sort_key, reverse=reverse)
    for position, iid in enumerate(ordered):
        self.tv_nodes.move(iid, "", position)

    # The next click on this heading sorts in the opposite direction.
    self.tv_nodes.heading(col, command=lambda: self.sort_by_column(col, not reverse))
# THEME -----------------------------------------------------------
def apply_theme(self, mode: str = "light"):
    """Apply the "light" or "dark" colour scheme to the whole UI.

    Any *mode* other than "dark" gives the light palette. The ttk styles are
    configured on the "clam" theme when it is available; the plain Tk widgets
    (root window, menubar, message Text) and the option database are updated
    on a best-effort basis.
    """
    self.current_theme = mode
    if mode == "dark":
        bg, fg, acc, sel = "#1e1e1e", "#ffffff", "#2d2d2d", "#555555"
    else:
        bg, fg, acc, sel = "#f5f5f5", "#000000", "#ffffff", "#cce0ff"

    # Root window background (client area)
    try:
        self.root.configure(bg=bg)
    except Exception:
        pass

    style = ttk.Style(self.root)
    try:
        style.theme_use("clam")
    except Exception:
        pass

    basic_styles = (
        ("TFrame", {"background": bg}),
        ("TLabelframe", {"background": bg, "foreground": fg}),
        ("TLabelframe.Label", {"background": bg, "foreground": fg}),
        ("TLabel", {"background": bg, "foreground": fg}),
        ("TButton", {"background": acc, "foreground": fg}),
        ("TEntry", {"fieldbackground": acc, "foreground": fg}),
        ("TCombobox", {"fieldbackground": acc, "background": acc, "foreground": fg, "arrowcolor": fg}),
    )
    for style_name, options in basic_styles:
        style.configure(style_name, **options)
    style.map(
        "TCombobox",
        fieldbackground=[("readonly", acc)],
        foreground=[("readonly", fg)],
        background=[("readonly", acc)],
    )
    style.configure("Treeview", background=acc, fieldbackground=acc, foreground=fg, borderwidth=0)
    style.map("Treeview", background=[("selected", sel)], foreground=[("selected", fg)])

    # Menubar itself (stored as self.menubar when created)
    try:
        if getattr(self, "menubar", None) is not None:
            self.menubar.configure(
                background=bg,
                foreground=fg,
                activebackground=sel,
                activeforeground=fg,
                borderwidth=0,
                relief="flat",
            )
    except Exception:
        pass

    try:
        self.txt_messages.configure(
            bg=acc,
            fg=fg,
            insertbackground=fg,
            selectbackground=sel,
            selectforeground=fg,
        )
    except Exception:
        pass

    # Option database entries: menu colours first, then the dropdown list of
    # ttk.Combobox. The first failure stops the sequence and is ignored.
    option_defaults = (
        ("*Menu*background", bg),
        ("*Menu*foreground", fg),
        ("*Menu*activeBackground", sel),
        ("*Menu*activeForeground", fg),
        ("*TCombobox*Listbox*background", acc),
        ("*TCombobox*Listbox*foreground", fg),
    )
    try:
        for pattern, colour in option_defaults:
            self.root.option_add(pattern, colour)
    except Exception:
        pass
def _style_toplevel(self, win: tk.Toplevel):
    """Give *win* the background colour of the current theme."""
    background = "#f5f5f5"
    if self.current_theme == "dark":
        background = "#1e1e1e"
    win.configure(bg=background)
# UTILS / CONTEXT ------------------------------------------------
def _append(self, text: str):
    """Append *text* as a new line to the message log and scroll to it."""
    log_widget = self.txt_messages
    log_widget.insert("end", text + "\n")
    log_widget.see("end")
def _get_selected_node_id(self) -> Optional[str]:
    """Return the item id of the first selected row, or None if none is selected."""
    picked = self.tv_nodes.selection()
    return picked[0] if picked else None
def _cm_traceroute(self):
    """Context-menu action: start a traceroute to the selected node.

    Shows a dialog and stops when no node is selected, there is no
    interface, or no destination id can be determined. Otherwise the request
    is logged and ``_do_traceroute`` runs in a daemon thread.
    """
    nid = self._get_selected_node_id()
    if nid:
        if self.iface:
            dest = self._resolve_node_dest_id(nid)
            if dest:
                self._append(f"[trace] Requesting traceroute to {self._node_label(nid)} ({dest})")
                worker = threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True)
                worker.start()
            else:
                messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.")
        else:
            messagebox.showwarning("Traceroute", "Connect first.")
    else:
        messagebox.showinfo("Traceroute", "Select a node first.")
def _cm_delete_node(self):
    """Context-menu action: remove the selected node from the radio's NodeDB.

    After a confirmation dialog the removal is requested through
    ``iface.localNode.removeNode``. On success the row is also removed from
    the table and the "Nodes (n)" caption is updated.
    """
    title = "Delete node"
    nid = self._get_selected_node_id()
    if not nid:
        messagebox.showinfo(title, "Select a node first.")
        return
    if not self.iface or not getattr(self.iface, "localNode", None):
        messagebox.showwarning(title, "Connect first.")
        return
    dest = self._resolve_node_dest_id(nid)
    if not dest:
        messagebox.showerror(title, "Cannot determine node ID.")
        return

    label = self._node_label(nid)
    question = (
        f"Remove node {label} ({dest}) from the NodeDB on the connected radio?\n\n"
        "The device might reboot after this."
    )
    confirmed = messagebox.askyesno(title, question)
    if not confirmed:
        return

    try:
        # Use python-meshtastic Node.removeNode API to remove from NodeDB
        # https://python.meshtastic.org/node.html
        self.iface.localNode.removeNode(dest)  # type: ignore[attr-defined]
        self._append(f"[admin] Requested delete of node {label} ({dest})")
    except Exception as err:
        messagebox.showerror(title, f"Failed to delete node: {err}")
        return

    # Also remove from UI for this session
    try:
        self.tv_nodes.delete(nid)
        remaining = len(self.tv_nodes.get_children())
        self.nodes_frame.config(text="Nodes (%d)" % remaining)
    except Exception:
        pass
def _resolve_node_dest_id(self, nid: str) -> Optional[str]:
# `nid` is the Treeview item id; in this client it normally equals the user.id (!xxxx)
if nid.startswith("!") or nid.isdigit():
return nid
try:
if self.iface and getattr(self.iface, "nodes", None):
node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined]
user = (node or {}).get("user") or {}
node_id = user.get("id") or ""
if node_id:
return node_id
except Exception:
pass
if nid:
return "!" + nid if not nid.startswith("!") else nid
return None
def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0):
# Prefer native python-meshtastic traceroute if dependencies are available; otherwise fall back to CLI.
if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"):
self._do_traceroute_via_interface(dest, hop_limit, channel_index)
else:
self._do_traceroute_via_cli(dest)
def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int):
    """Run a traceroute through the connected interface and display the result.

    Sends an empty ``RouteDiscovery`` message to ``dest`` on the TRACEROUTE_APP
    port, waits up to 30 seconds for the response callback, then formats the
    forward and return routes (with per-hop SNR) and shows them in a window.

    The method blocks while waiting for the response; ``_cm_traceroute`` runs
    it on a worker thread. All dialogs/windows are scheduled via
    ``self.root.after``.

    Args:
        dest: Destination id passed to ``sendData`` as ``destinationId``.
        hop_limit: Passed to ``sendData`` as ``hopLimit``.
        channel_index: Passed to ``sendData`` as ``channelIndex``.
    """
    evt = threading.Event()
    result: Dict[str, Any] = {}

    def _num_to_label(num: int) -> str:
        # Prefer "<short or long name> (<id>)" from the interface's nodesByNum
        # table; fall back to the number rendered as "!%08x".
        try:
            nbn = getattr(self.iface, "nodesByNum", None)
            if nbn and num in nbn:
                user = (nbn[num] or {}).get("user") or {}
                sid = user.get("id") or f"!{num:08x}"
                sn = user.get("shortName") or ""
                ln = user.get("longName") or ""
                label = (sn or ln or sid).strip()
                return f"{label} ({sid})" if sid else label
        except Exception:
            pass
        return f"!{int(num):08x}"

    def _on_response(p: dict):
        # Called by the interface with the response packet; decode it and wake the waiter.
        try:
            rd = mesh_pb2.RouteDiscovery()
            rd.ParseFromString(p["decoded"]["payload"])
            result["packet"] = p
            result["data"] = _json_format.MessageToDict(rd)
        except Exception as e:  # pragma: no cover - defensive
            result["error"] = str(e)
        finally:
            evt.set()

    try:
        r = mesh_pb2.RouteDiscovery()
        # Use the same TRACEROUTE_APP mechanism as the official Meshtastic clients
        self.iface.sendData(
            r,
            destinationId=dest,
            portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
            wantResponse=True,
            onResponse=_on_response,
            channelIndex=channel_index,
            hopLimit=hop_limit,
        )
    except Exception as e:
        # Build the message now: ``e`` is unbound once this except clause ends,
        # so a lambda that formats it later would raise NameError.
        send_msg = f"Failed to send traceroute: {e}"
        self.root.after(0, lambda: messagebox.showerror("Traceroute", send_msg))
        return

    if not evt.wait(30.0):
        self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported)."))
        return
    if "error" in result:
        decode_msg = f"Failed to decode traceroute: {result['error']}"
        self.root.after(0, lambda: messagebox.showerror("Traceroute", decode_msg))
        return

    p = result.get("packet") or {}
    data = result.get("data") or {}
    UNK = -128  # SNR entries equal to this value are displayed as "? dB"

    # In the response packet "to" is treated as the origin (us) and "from" as
    # the traced destination.
    try:
        origin_num = int(p.get("to"))
        dest_num = int(p.get("from"))
    except Exception:
        origin_num = None
        dest_num = None

    def _to_ints(values) -> list:
        # Convert each entry to int, silently skipping entries that cannot be converted.
        out = []
        for v in values:
            try:
                out.append(int(v))
            except Exception:
                pass
        return out

    def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]:
        # Render "start -> hop (snr) -> ... -> end (snr)" or None when an endpoint is missing.
        if not start_num or not end_num:
            return None
        route_nums = _to_ints(data.get(route_key, []))
        snrs = _to_ints(data.get(snr_key, []))
        nodes = [start_num] + route_nums + [end_num]
        parts = []
        for idx, num in enumerate(nodes):
            label = _num_to_label(num)
            if idx == 0:
                parts.append(label)
                continue
            snr_txt = "? dB"
            if (idx - 1) < len(snrs):
                v = snrs[idx - 1]
                if v != UNK:
                    # Reported SNR values are divided by 4 to obtain dB.
                    snr_txt = f"{v / 4.0:.2f} dB"
            parts.append(f"{label} ({snr_txt})")
        return title + "\n" + " -> ".join(parts)

    lines = []
    fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num)
    if fwd:
        lines.append(fwd)
    back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num)
    if back:
        lines.append(back)
    if not lines:
        self.root.after(0, lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available."))
        return
    text = "\n\n".join(lines)
    self.root.after(0, lambda: self._show_traceroute_window(text))
def _do_traceroute_via_cli(self, dest: str):
    """Fallback traceroute: run ``meshtastic --host <host> --traceroute <dest>`` and show its output.

    The host is taken from ``self.host_var`` (falling back to ``HOST_DEFAULT``
    when empty). The subprocess is given 40 seconds; stdout and stderr are
    combined and shown in the traceroute window. All UI work is scheduled via
    ``self.root.after``.

    Args:
        dest: Destination id passed to the CLI's ``--traceroute`` option.
    """
    host = (self.host_var.get() or "").strip() or HOST_DEFAULT
    cmd = ["meshtastic", "--host", host, "--traceroute", dest]
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40)
    except Exception as e:  # pragma: no cover - environment specific
        # Build the message now: ``e`` is unbound once this except clause ends,
        # so a lambda that formats it later would raise NameError.
        msg = f"Failed to run meshtastic CLI: {e}"
        self.root.after(0, lambda: messagebox.showerror("Traceroute", msg))
        return
    out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "")
    if not out.strip():
        self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute."))
        return
    self.root.after(0, lambda: self._show_traceroute_window(out))
def _show_traceroute_window(self, text: str):
    """Open a read-only window, coloured for the current theme, containing ``text``.

    A placeholder message is shown when ``text`` is empty or only whitespace.
    """
    win = tk.Toplevel(self.root)
    win.title("Traceroute")
    self._style_toplevel(win)

    container = ttk.Frame(win, padding=8)
    container.pack(expand=True, fill="both")
    view = tk.Text(container, wrap="word")
    view.pack(expand=True, fill="both")

    if self.current_theme == "dark":
        background, foreground = "#2d2d2d", "#ffffff"
    else:
        background, foreground = "#ffffff", "#000000"
    view.configure(bg=background, fg=foreground, insertbackground=foreground)

    view.insert("1.0", text.strip() or "No traceroute data.")
    # Lock the widget so the output cannot be edited.
    view.configure(state="disabled")
def _toggle_send_target(self):
nid = self._get_selected_node_id()
if nid:
# Switch selector to "To selected node" when a node is double-clicked
self._set_channel_choice("To selected node")
self._append("[target] will send to %s" % self._node_label(nid))
else:
# No selection -> fall back to public broadcast
self._set_channel_choice("Public (broadcast)")
def _popup_node_menu(self, event):
iid = self.tv_nodes.identify_row(event.y)
if iid:
self.tv_nodes.selection_set(iid)
self.node_menu.tk_popup(event.x_root, event.y_root)
self.node_menu.grab_release()
def _cm_show_node_details(self):
self.show_raw_node(friendly=True)
def _cm_open_map(self):
nid = self._get_selected_node_id()
if not nid or not self.iface or not getattr(self.iface, "nodes", None):
messagebox.showinfo("Map", "No node selected.")
return
node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined]
lat, lon = self._extract_latlon(node)
if lat is None or lon is None:
messagebox.showinfo("Map", "Selected node has no GPS position.")
return
url = f"https://www.google.com/maps/search/?api=1&query={lat},{lon}"
self._open_browser_url(url)
def show_raw_node(self, friendly: bool = False):
    """Open a read-only window describing the selected node.

    Args:
        friendly: When true, render a sectioned, indented summary (identity,
            last heard, position, optional capabilities/config, remaining
            fields). When false, dump the node entry as indented JSON.
    """
    nid = self._get_selected_node_id()
    if not (nid and self.iface and getattr(self.iface, "nodes", None)):
        messagebox.showinfo("Node", "No node selected.")
        return
    node = self.iface.nodes.get(nid, {})  # type: ignore[attr-defined]

    win = tk.Toplevel(self.root)
    win.title("Node: %s" % self._node_label(nid))
    self._style_toplevel(win)
    container = ttk.Frame(win, padding=8)
    container.pack(expand=True, fill="both")
    view = tk.Text(container, wrap="word")
    view.pack(expand=True, fill="both")

    if self.current_theme == "dark":
        background, foreground = "#2d2d2d", "#ffffff"
    else:
        background, foreground = "#ffffff", "#000000"
    view.configure(bg=background, fg=foreground, insertbackground=foreground)

    def render(value, depth=0):
        # Recursively format dicts/lists, one entry per line, indented by nesting depth.
        pad = " " * depth
        if isinstance(value, dict):
            out = []
            for key, child in value.items():
                if isinstance(child, (dict, list)):
                    out += [f"{pad}{key}:", render(child, depth + 1)]
                else:
                    out.append(f"{pad}{key}: {child}")
            return "\n".join(out)
        if isinstance(value, list):
            out = []
            for pos_idx, child in enumerate(value):
                if isinstance(child, (dict, list)):
                    out += [f"{pad}- [{pos_idx}]", render(child, depth + 1)]
                else:
                    out.append(f"{pad}- {child}")
            return "\n".join(out)
        return f"{pad}{value}"

    if not friendly:
        body = json.dumps(node, indent=2, default=str)
    else:
        record = node or {}
        user = record.get("user") or {}
        pos = record.get("position") or {}
        caps = record.get("capabilities") or {}
        config = record.get("config") or {}

        # Identity fields: prefer the "user" sub-record, then the node entry itself.
        node_id = user.get("id") or node.get("id") or nid
        macaddr = user.get("macaddr") or node.get("macaddr") or ""
        publickey = user.get("publicKey") or node.get("publicKey") or ""
        hw = user.get("hwModel", "")

        report = [
            f"Name: {user.get('shortName', '')} {user.get('longName', '')}".strip(),
            f"ID: {node_id}",
            f"MAC: {macaddr}",
            f"HW: {hw}",
            f"Public key: {publickey}",
            "",
            f"Last heard: {_fmt_ago(self._get_lastheard_epoch(nid, node))}",
            "",
            "Position:",
            render(pos, 1),
        ]
        # Optional sections appear only when they hold data.
        for heading, section in (("Capabilities:", caps), ("Config:", config)):
            if section:
                report += [heading, render(section, 1)]

        # Everything not already shown in a dedicated section.
        shown = {"user", "position", "capabilities", "config"}
        leftovers = {k: v for k, v in record.items() if k not in shown}
        report += ["RAW fields:", render(leftovers, 1)]
        body = "\n".join(report)

    view.insert("1.0", body)
    view.configure(state="disabled")
def main():
    """Create the GUI, size the main window and run the Tk event loop until it is closed."""
    app = MeshtasticGUI()
    app.root.geometry("1500x820")

    def _on_close():
        # Destroy the window even when disconnect() raises; otherwise a failing
        # disconnect would leave the user unable to close the application.
        try:
            app.disconnect()
        finally:
            app.root.destroy()

    app.root.protocol("WM_DELETE_WINDOW", _on_close)
    app.root.mainloop()


if __name__ == "__main__":
    main()