Files
Meshtastic-Client/meshtastic_client.py
2025-11-26 15:41:24 +01:00

1644 lines
64 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
from __future__ import annotations
import json, time, datetime, threading, pathlib, tkinter as tk
from tkinter import ttk, messagebox, simpledialog
from typing import Any, Dict, Optional
import os
import subprocess
import webbrowser
try:
from pubsub import pub
except Exception:
pub = None
try:
from meshtastic.tcp_interface import TCPInterface
except Exception:
TCPInterface = None
try:
from meshtastic.serial_interface import SerialInterface
except Exception:
SerialInterface = None
try:
from meshtastic.ble_interface import BLEInterface
except Exception:
BLEInterface = None
try:
from meshtastic.protobuf import mesh_pb2, portnums_pb2, telemetry_pb2
import google.protobuf.json_format as _json_format
except Exception:
mesh_pb2 = None
portnums_pb2 = None
telemetry_pb2 = None
_json_format = None
try:
from serial.tools import list_ports
except Exception:
list_ports = None
HOST_DEFAULT = "192.168.0.156"
PORT_DEFAULT = 4403
PROJECT_PATH = pathlib.Path(__file__).parent
ICON_PATH = PROJECT_PATH / "meshtastic.ico"
def _prefer_chrome(url: str):
# try Chrome first
chrome_paths = [
r"C:\Program Files\Google\Chrome\Application\chrome.exe",
r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
]
for p in chrome_paths:
if os.path.exists(p):
subprocess.Popen([p, url])
return
# fallback
webbrowser.open(url)
def _fmt_ago(epoch_seconds: Optional[float]) -> str:
if not epoch_seconds:
return "N/A"
try:
delta = time.time() - float(epoch_seconds)
except Exception:
return "N/A"
if delta < 0:
delta = 0
mins = int(delta // 60)
hours = int(delta // 3600)
days = int(delta // 86400)
if delta < 60:
return "%ds" % int(delta)
if mins < 60:
return "%dm" % mins
if hours < 24:
return "%dh" % hours
if days < 7:
return "%dd" % days
dt = datetime.datetime.fromtimestamp(epoch_seconds)
return dt.strftime("%Y-%m-%d %H:%M")
class MeshtasticGUI:
def __init__(self, master: Optional[tk.Tk] = None):
self.root = master or tk.Tk()
self.root.title("Meshtastic Client")
try:
if ICON_PATH.exists():
self.root.iconbitmap(default=str(ICON_PATH))
except Exception:
pass
self.current_theme = "dark"
self.root.rowconfigure(0, weight=1)
self.root.columnconfigure(0, weight=1)
self.host_var = tk.StringVar(value=HOST_DEFAULT)
self.port_var = tk.IntVar(value=PORT_DEFAULT)
self.menubar = tk.Menu(self.root)
m_conn = tk.Menu(self.menubar, tearoff=False)
m_conn.add_command(label="Connect (TCP)", command=self.connect_tcp)
m_conn.add_command(label="Connect via USB/Serial...", command=self.connect_serial_dialog)
m_conn.add_command(label="Connect via Bluetooth...", command=self.connect_ble_dialog)
m_conn.add_command(label="Disconnect", command=self.disconnect)
m_conn.add_separator()
m_conn.add_command(label="Set IP/Port...", command=self.set_ip_port)
self.menubar.add_cascade(label="Connection", menu=m_conn)
m_tools = tk.Menu(self.menubar, tearoff=False)
m_tools.add_command(label="Clear messages", command=lambda: self.txt_messages.delete("1.0", "end"))
self.menubar.add_cascade(label="Tools", menu=m_tools)
m_view = tk.Menu(self.menubar, tearoff=False)
m_view.add_command(label="Light theme", command=lambda: self.apply_theme("light"))
m_view.add_command(label="Dark theme", command=lambda: self.apply_theme("dark"))
m_view.add_separator()
m_view.add_command(label="Neighbor table", command=self.show_neighbors_window)
m_view.add_command(label="Radio config (view)", command=self.show_radio_config_window)
m_view.add_command(label="Channel editor", command=self.show_channel_editor_window)
self.menubar.add_cascade(label="View", menu=m_view)
m_links = tk.Menu(self.menubar, tearoff=False)
m_links.add_command(label="Meshtastic client", command=lambda: self._open_browser_url("https://github.com/dk98174003/Meshtastic-Client"))
m_links.add_command(label="Meshtastic org", command=lambda: self._open_browser_url("https://meshtastic.org/"))
m_links.add_command(label="Meshtastic flasher (Chrome)", command=lambda: self._open_browser_url("https://flasher.meshtastic.org/"))
m_links.add_command(label="Meshtastic Web Client", command=lambda: self._open_browser_url("https://client.meshtastic.org"))
m_links.add_command(label="Meshtastic docker client", command=lambda: self._open_browser_url("https://meshtastic.org/docs/software/linux/usage/#usage-with-docker"))
m_links.add_separator()
m_links.add_command(label="Meshtastic Facebook Danmark", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1553839535376876/"))
m_links.add_command(label="Meshtastic Facebook Nordjylland", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1265866668302201/"))
self.menubar.add_cascade(label="Links", menu=m_links)
self.root.config(menu=self.menubar)
self.rootframe = ttk.Frame(self.root)
self.rootframe.grid(row=0, column=0, sticky="nsew")
self.rootframe.rowconfigure(0, weight=1)
self.rootframe.columnconfigure(0, weight=1)
self.paned = ttk.Panedwindow(self.rootframe, orient="horizontal")
self.paned.grid(row=0, column=0, sticky="nsew")
# messages
self.msg_frame = ttk.Frame(self.paned)
self.msg_frame.rowconfigure(1, weight=1)
self.msg_frame.columnconfigure(0, weight=1)
ttk.Label(self.msg_frame, text="Messages").grid(row=0, column=0, sticky="w", pady=(2, 0))
self.txt_messages = tk.Text(self.msg_frame, wrap="word")
self.txt_messages.grid(row=1, column=0, sticky="nsew", padx=(0, 4), pady=(2, 2))
yscroll_left = ttk.Scrollbar(self.msg_frame, orient="vertical", command=self.txt_messages.yview)
self.txt_messages.configure(yscrollcommand=yscroll_left.set)
yscroll_left.grid(row=1, column=1, sticky="ns")
self.send_frame = ttk.Frame(self.msg_frame)
self.send_frame.grid(row=2, column=0, columnspan=2, sticky="nsew", pady=(0, 4))
self.send_frame.columnconfigure(0, weight=1)
self.ent_message = ttk.Entry(self.send_frame)
self.ent_message.grid(row=0, column=0, sticky="nsew")
self.ent_message.bind("<Return>", lambda e: self.send_message())
self.btn_send = ttk.Button(self.send_frame, text="Send", command=self.send_message)
self.btn_send.grid(row=0, column=1, padx=4, sticky="nsew")
# channel selector (public / selected / private channels)
self.channel_var = tk.StringVar()
self._channel_map = {}
self.cbo_channel = ttk.Combobox(self.send_frame, textvariable=self.channel_var, state="readonly", width=22)
self._reset_channel_choices()
self.cbo_channel.grid(row=0, column=2, padx=4, sticky="w")
# nodes
self.nodes_frame = ttk.Labelframe(self.paned, text="Nodes (0)")
self.nodes_frame.rowconfigure(1, weight=1)
self.nodes_frame.columnconfigure(0, weight=1)
self.ent_search = ttk.Entry(self.nodes_frame)
self.ent_search.grid(row=0, column=0, sticky="nsew", padx=2, pady=2)
self.ent_search.bind("<KeyRelease>", lambda e: self.refresh_nodes())
self.cols_all = (
"shortname", "longname", "since", "hops",
"distkm", "speed", "alt", "battery", "voltage",
"lastheard", "hwmodel", "role",
"macaddr", "publickey", "isunmessagable", "id"
)
self.cols_visible = (
"shortname", "longname", "since", "hops",
"distkm", "speed", "alt", "battery", "voltage", "hwmodel", "role"
)
self.tv_nodes = ttk.Treeview(
self.nodes_frame,
columns=self.cols_all,
show="headings",
displaycolumns=self.cols_visible
)
self.tv_nodes.grid(row=1, column=0, sticky="nsew", padx=(2, 0), pady=(0, 2))
headings = {
"shortname": "Short",
"longname": "Long",
"since": "Since",
"hops": "Hops",
"distkm": "Dist (km)",
"speed": "Speed",
"alt": "Alt (m)",
"battery": "Batt %",
"voltage": "Volt",
"hwmodel": "HW",
"role": "Role",
"lastheard": "",
"macaddr": "MAC",
"publickey": "Public key",
"isunmessagable": "Unmsg?",
"id": "ID",
}
for key, text in headings.items():
self.tv_nodes.heading(key, text=text, command=lambda c=key: self.sort_by_column(c, False))
widths = {
"shortname": 90,
"longname": 200,
"since": 90,
"hops": 50,
"distkm": 70,
"speed": 70,
"alt": 70,
"battery": 70,
"voltage": 80,
"hwmodel": 90,
"role": 110,
}
for key, w in widths.items():
try:
self.tv_nodes.column(key, width=w, anchor="w", stretch=(key not in ("since", "hops", "distkm", "speed", "alt", "battery", "voltage")))
except Exception:
pass
# hide technical columns
for col in ("lastheard", "macaddr", "publickey", "isunmessagable", "id"):
try:
self.tv_nodes.column(col, width=0, minwidth=0, stretch=False)
except Exception:
pass
# scrollbar for node list
yscroll_nodes = ttk.Scrollbar(self.nodes_frame, orient="vertical", command=self.tv_nodes.yview)
self.tv_nodes.configure(yscrollcommand=yscroll_nodes.set)
yscroll_nodes.grid(row=1, column=1, sticky="ns")
# right-click:
self.node_menu = tk.Menu(self.nodes_frame, tearoff=False)
self.node_menu.add_command(label="Node info", command=self._cm_show_node_details)
self.node_menu.add_command(label="Map", command=self._cm_open_map)
self.node_menu.add_command(label="Traceroute", command=self._cm_traceroute)
self.node_menu.add_command(label="Chat with node", command=self._cm_open_chat)
self.node_menu.add_separator()
self.node_menu.add_command(label="Delete node", command=self._cm_delete_node)
self.tv_nodes.bind("<Button-3>", self._popup_node_menu)
self.tv_nodes.bind("<Double-1>", lambda e: self._cm_open_chat())
self.paned.add(self.msg_frame, weight=3)
self.paned.add(self.nodes_frame, weight=4)
self.root.after(100, lambda: self._safe_set_sash(0.40))
self.iface: Optional[object] = None
self.connected_evt = threading.Event()
self._last_seen_overrides: Dict[str, float] = {}
self._last_sort_col: Optional[str] = "since"
self._last_sort_reverse: bool = True
self._telemetry: Dict[str, Dict[str, Any]] = {}
self._per_node_chats: Dict[str, "NodeChatWindow"] = {}
if pub is not None:
try:
pub.subscribe(self.on_connection_established, "meshtastic.connection.established")
pub.subscribe(self.on_connection_lost, "meshtastic.connection.lost")
pub.subscribe(self.on_receive, "meshtastic.receive")
pub.subscribe(self.on_node_updated, "meshtastic.node.updated")
except Exception as e:
print("pubsub subscribe failed:", e)
self.apply_theme("dark")
self._append("Ready. Connection -> Connect (TCP/USB/BLE)")
self._update_title_with_host()
# helpers ---------------------------------------------------------
def _open_browser_url(self, url: str):
_prefer_chrome(url)
def _update_title_with_host(self):
self.root.title("Meshtastic Client - %s:%s" % (self.host_var.get(), self.port_var.get()))
def _safe_set_sash(self, fraction: float = 0.5):
try:
w = self.paned.winfo_width() or self.paned.winfo_reqwidth()
self.paned.sashpos(0, int(w * fraction))
except Exception:
pass
def _node_label(self, node_id: str) -> str:
if not self.iface or not getattr(self.iface, "nodes", None):
return node_id
node = self.iface.nodes.get(node_id, {}) # type: ignore[attr-defined]
user = (node or {}).get("user") or {}
shortname = user.get("shortName") or ""
longname = user.get("longName") or ""
label = ("%s %s" % (shortname, longname)).strip()
if label:
return label
return node_id
# pubsub callbacks ------------------------------------------------
def on_connection_established(self, interface=None, **kwargs):
self.connected_evt.set()
self._append("[+] Connected")
# refresh nodes and channel list when we connect
self.refresh_nodes()
try:
self._update_channels_from_iface()
except Exception:
pass
def on_connection_lost(self, interface=None, **kwargs):
self.connected_evt.clear()
self._append("[-] Connection lost")
def on_node_updated(self, node=None, interface=None, **kwargs):
self.root.after(0, self.refresh_nodes)
def on_receive(self, packet=None, interface=None, **kwargs):
self.root.after(0, lambda: self._handle_receive(packet or {}))
# receive ---------------------------------------------------------
def _handle_receive(self, packet: dict):
decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {}
app_name = decoded.get("app", "")
portnum = decoded.get("portnum") or app_name
from_id = packet.get("fromId") or packet.get("from") or "UNKNOWN"
label = self._node_label(from_id) if hasattr(self, "_node_label") else from_id
tag_map = {
"TEXT_MESSAGE_APP": "MSG",
"TEXT_MESSAGE_COMPRESSED_APP": "MSG",
"POSITION_APP": "POS",
"TELEMETRY_APP": "TEL",
"NODEINFO_APP": "INFO",
"ROUTING_APP": "ROUT",
"MAP_REPORT_APP": "MAP",
"ADMIN_APP": "ADM",
"NEIGHBORINFO_APP": "NEI",
"STORE_FORWARD_APP": "SFWD",
"REMOTE_HARDWARE_APP": "RHW",
"PRIVATE_APP": "PRIV",
}
tag = tag_map.get(app_name or portnum, "INFO")
if app_name in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP"):
text = decoded.get("text") or decoded.get("payload", {}).get("text", "")
if text:
self._append(f"[MSG] {label}: {text}")
else:
self._append(f"[MSG] {label}")
if hasattr(self, "refresh_nodes"):
self.refresh_nodes()
decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {}
portnum = decoded.get("portnum")
sender = packet.get("fromId") or packet.get("from") or packet.get("fromIdShort")
if portnum == "TELEMETRY_APP" and telemetry_pb2 is not None and sender:
try:
payload = decoded.get("payload")
if isinstance(payload, (bytes, bytearray)):
tmsg = telemetry_pb2.Telemetry()
tmsg.ParseFromString(payload)
if tmsg.HasField("device_metrics"):
dm = tmsg.device_metrics
entry = self._telemetry.get(str(sender), {})
if dm.battery_level is not None:
entry["battery"] = dm.battery_level
if dm.voltage is not None:
entry["voltage"] = dm.voltage
self._telemetry[str(sender)] = entry
except Exception:
# ignore telemetry parse errors, they are non-fatal
pass
if sender:
self._last_seen_overrides[str(sender)] = time.time()
user = {}
if self.iface and getattr(self.iface, "nodes", None) and sender:
user = (self.iface.nodes.get(sender) or {}).get("user", {}) # type: ignore[attr-defined]
shortname = user.get("shortName") or ""
longname = user.get("longName") or ""
label = str(shortname or longname or sender or "Unknown").strip()
text = ""
p = decoded.get("payload", "")
if isinstance(p, (bytes, bytearray)):
try:
text = p.decode("utf-8", errors="ignore")
except Exception:
text = repr(p)
elif isinstance(p, str):
text = p
else:
t = decoded.get("text")
if isinstance(t, bytes):
text = t.decode("utf-8", errors="ignore")
elif isinstance(t, str):
text = t
rssi = packet.get("rxRssi")
if portnum == "TEXT_MESSAGE_APP":
self._append("[MSG] %s: %s (RSSI=%s)" % (label, text, rssi))
self._append_to_node_chat(str(sender), "%s" % (text,))
if isinstance(text, str) and text.strip().lower() == "ping":
self._send_pong(sender, label)
self.refresh_nodes()
def _send_pong(self, dest_id: Optional[str], label: str):
if not self.iface or not dest_id:
return
try:
self.iface.sendText("pong", destinationId=dest_id, wantAck=False)
self._append("[auto] pong -> %s" % label)
self._append_to_node_chat(str(dest_id), "[auto] pong")
except Exception as e:
self._append("[auto] pong failed: %s" % e)
# connection actions ----------------------------------------------
def set_ip_port(self):
win = tk.Toplevel(self.root)
win.title("Set IP/Port")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.grid(row=0, column=0, sticky="nsew")
win.columnconfigure(0, weight=1)
win.rowconfigure(0, weight=1)
ttk.Label(frm, text="Host/IP:").grid(row=0, column=0, sticky="w", pady=4)
ent_host = ttk.Entry(frm, textvariable=self.host_var, width=28)
ent_host.grid(row=0, column=1, sticky="ew", pady=4, padx=4)
ttk.Label(frm, text="Port:").grid(row=1, column=0, sticky="w", pady=4)
ent_port = ttk.Entry(frm, textvariable=self.port_var, width=10)
ent_port.grid(row=1, column=1, sticky="w", pady=4, padx=4)
frm.columnconfigure(1, weight=1)
def save():
h = self.host_var.get().strip()
try:
p = int(self.port_var.get())
except Exception:
messagebox.showerror("Port", "Port must be 1-65535")
return
if not h:
messagebox.showerror("Host", "Host cannot be empty")
return
if not (1 <= p <= 65535):
messagebox.showerror("Port", "Port must be 1-65535")
return
self.host_var.set(h)
self.port_var.set(p)
self._update_title_with_host()
win.destroy()
btnbar = ttk.Frame(frm)
btnbar.grid(row=2, column=0, columnspan=2, sticky="e")
ttk.Button(btnbar, text="Cancel", command=win.destroy).grid(row=0, column=0, padx=4)
ttk.Button(btnbar, text="Save", command=save).grid(row=0, column=1, padx=4)
def connect_tcp(self):
if self.iface:
return
host = self.host_var.get().strip()
try:
port = int(self.port_var.get())
except Exception:
messagebox.showerror("Port", "Invalid port")
return
self._append("Connecting TCP %s:%s ..." % (host, port))
def run():
try:
if TCPInterface is None:
raise RuntimeError("meshtastic.tcp_interface not installed")
self.iface = TCPInterface(hostname=host, portNumber=port)
self.connected_evt.wait(timeout=5)
except Exception as e:
self.root.after(0, lambda: messagebox.showerror("TCP connect failed", str(e)))
threading.Thread(target=run, daemon=True).start()
def connect_serial_dialog(self):
if SerialInterface is None:
messagebox.showerror("Unavailable", "meshtastic.serial_interface not installed.")
return
ports = []
if list_ports:
try:
ports = [p.device for p in list_ports.comports()]
except Exception:
ports = []
presets = ["(auto)"] + ports + ["COM4", "/dev/ttyUSB0"]
port = simpledialog.askstring("Serial", "Serial port (or leave '(auto)'):", initialvalue=presets[0])
if port is None:
return
port = port.strip()
if port.lower() == "(auto)" or port == "":
port = None
self._append("Connecting Serial %s ..." % (port or "(auto)"))
def run():
try:
self.iface = SerialInterface(devPath=port) if port else SerialInterface()
self.connected_evt.wait(timeout=5)
except Exception as e:
self.root.after(0, lambda: messagebox.showerror("Serial connect failed", str(e)))
threading.Thread(target=run, daemon=True).start()
def connect_ble_dialog(self):
if BLEInterface is None:
messagebox.showerror("Unavailable", "meshtastic.ble_interface not installed (needs bleak).")
return
self._append("Scanning BLE for Meshtastic devices ...")
try:
devices = BLEInterface.scan()
except Exception as e:
messagebox.showerror("BLE scan failed", str(e))
return
if not devices:
messagebox.showinfo("BLE", "No devices found.")
return
options = ["%d. %s [%s]" % (i + 1, getattr(d, "name", "") or "(unnamed)", getattr(d, "address", "?")) for i, d in enumerate(devices)]
choice = simpledialog.askinteger(
"Select BLE device",
"Enter number:\n" + "\n".join(options),
minvalue=1,
maxvalue=len(devices),
)
if not choice:
return
addr = getattr(devices[choice - 1], "address", None)
if not addr:
messagebox.showerror("BLE", "Selected device has no address.")
return
self._append("Connecting BLE %s ..." % addr)
def run():
try:
self.iface = BLEInterface(address=addr)
self.connected_evt.wait(timeout=8)
except Exception as e:
self.root.after(0, lambda: messagebox.showerror("BLE connect failed", str(e)))
threading.Thread(target=run, daemon=True).start()
def disconnect(self):
try:
if self.iface:
self.iface.close()
except Exception:
pass
self.iface = None
self.connected_evt.clear()
self._append("[*] Disconnected")
# send ------------------------------------------------------------
# channel selector helpers ----------------------------------------
def _reset_channel_choices(self):
"""Initialize channel selector with Public + To selected."""
self._channel_map = {}
options = []
label_pub = "Public (broadcast)"
self._channel_map[label_pub] = {"mode": "broadcast", "channelIndex": 0}
options.append(label_pub)
if hasattr(self, "cbo_channel"):
self.cbo_channel["values"] = options
if hasattr(self, "channel_var"):
self.channel_var.set(label_pub)
def _set_channel_choice(self, label: str):
"""Safely set the current channel choice, if it exists."""
try:
values = list(self.cbo_channel["values"])
except Exception:
return
if label not in values:
return
self.channel_var.set(label)
def _update_channels_from_iface(self):
"""
Populate channel selector with channels from the connected device.
We keep:
* "Public (broadcast)" -> broadcast on channel 0
And then append additional channels (1..N) from the radio as:
* "Ch <idx>: <name>" -> broadcast on that channel.
"""
iface = getattr(self, "iface", None)
if not iface:
return
local_node = getattr(iface, "localNode", None)
if not local_node:
return
# Try to request channels if we don't have them yet.
chans = getattr(local_node, "channels", None)
try:
if (not chans) and hasattr(local_node, "requestChannels"):
local_node.requestChannels()
time.sleep(1.5)
chans = getattr(local_node, "channels", None)
except Exception:
chans = getattr(local_node, "channels", None)
try:
options = list(self.cbo_channel["values"])
except Exception:
return
# If channel 0 has a name, update the "Public" label to show it.
try:
if chans and len(chans) > 0:
ch0 = chans[0]
try:
ch0_name = (getattr(ch0, "settings", None).name or "").strip()
except Exception:
try:
ch0_name = (ch0.settings.name or "").strip()
except Exception:
ch0_name = ""
if ch0_name:
# Find the existing public entry (mode=broadcast, channelIndex=0)
old_label = None
for lbl, meta in list(self._channel_map.items()):
if meta.get("mode") == "broadcast" and int(meta.get("channelIndex", 0) or 0) == 0:
old_label = lbl
break
if old_label:
new_label = f"Public (ch0: {ch0_name})"
if new_label != old_label:
# Update mapping
self._channel_map[new_label] = self._channel_map.pop(old_label)
# Update combobox options
options = [new_label if v == old_label else v for v in options]
# Keep current selection if it was pointing to the old label
if self.channel_var.get() == old_label:
self.channel_var.set(new_label)
except Exception:
# Failing to pretty-print channel 0 is not fatal; just continue.
pass
# Add remaining channels (1..N) as broadcast options.
for idx, ch in enumerate(chans or []):
if idx == 0:
# Skip channel 0 here it's already represented by "Public".
continue
try:
name = (getattr(ch, "settings", None).name or "").strip()
except Exception:
# older protobufs might expose fields differently
try:
name = (ch.settings.name or "").strip()
except Exception:
name = ""
if not name:
label = f"Ch {idx}"
else:
label = f"Ch {idx}: {name}"
if label in self._channel_map:
continue
self._channel_map[label] = {"mode": "broadcast_channel", "channelIndex": idx}
options.append(label)
try:
self.cbo_channel["values"] = options
except Exception:
pass
def send_message(self):
msg = self.ent_message.get().strip()
if not msg:
return
if not self.iface:
messagebox.showwarning("Not connected", "Connect first.")
return
try:
choice = self.channel_var.get() if hasattr(self, "channel_var") else ""
info = (self._channel_map.get(choice) if hasattr(self, "_channel_map") else None) or {
"mode": "broadcast",
"channelIndex": 0,
}
mode = info.get("mode", "broadcast")
ch_index = int(info.get("channelIndex", 0) or 0)
if mode == "selected":
# Direct message to the currently selected node
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("No selection", "Select a node first.")
return
dest = self._resolve_node_dest_id(nid)
if not dest:
messagebox.showerror("Send failed", "Cannot resolve destination for selected node.")
return
self.iface.sendText(msg, destinationId=dest, wantAck=False, channelIndex=ch_index)
self._append("[ME -> %s] %s" % (self._node_label(nid), msg))
else:
# Broadcast on chosen channel (public or private)
self.iface.sendText(msg, wantAck=False, channelIndex=ch_index)
label = self.channel_var.get() if hasattr(self, "channel_var") else ""
if mode == "broadcast_channel" and ch_index:
self._append("[ME ch%d] %s" % (ch_index, msg))
else:
self._append("[ME] %s" % msg)
self.ent_message.delete(0, "end")
except Exception as e:
messagebox.showerror("Send failed", str(e))
# nodes -----------------------------------------------------------
def _get_lastheard_epoch(self, node_id: str, node: Dict[str, Any]) -> Optional[float]:
raw = (node or {}).get("lastHeard")
ts_iface = None
if raw is not None:
try:
val = float(raw)
ts_iface = val / 1000.0 if val > 10000000000 else val
except Exception:
ts_iface = None
ts_local = self._last_seen_overrides.get(str(node_id))
if ts_iface and ts_local:
return max(ts_iface, ts_local)
return ts_iface or ts_local
def _extract_latlon(self, node: dict) -> tuple[float | None, float | None]:
pos = (node or {}).get("position") or {}
lat = pos.get("latitude") or pos.get("latitudeI") or pos.get("latitude_i")
lon = pos.get("longitude") or pos.get("longitudeI") or pos.get("longitude_i")
try:
if lat is not None:
lat = float(lat) * (1e-7 if abs(float(lat)) > 90 else 1.0)
if lon is not None:
lon = float(lon) * (1e-7 if abs(float(lon)) > 180 else 1.0)
except Exception:
lat = lon = None
return lat, lon
def _extract_speed_alt(self, node: dict) -> tuple[float | None, float | None]:
"""Return (speed_kmh, alt_m) if present in node.position, else (None, None)."""
pos = (node or {}).get("position") or {}
speed = (
pos.get("groundSpeedKmh")
or pos.get("groundSpeedKmhI")
or pos.get("groundSpeed")
or pos.get("ground_speed")
)
alt = (
pos.get("altitude")
or pos.get("altitudeM")
or pos.get("altitude_i")
or pos.get("altitudeI")
)
try:
if speed is not None:
speed = float(speed)
if alt is not None:
alt = float(alt)
except Exception:
speed, alt = None, None
return speed, alt
def _get_local_latlon(self) -> tuple[float | None, float | None]:
if not self.iface:
return (None, None)
try:
mi = getattr(self.iface, "myInfo", None)
nbn = getattr(self.iface, "nodesByNum", None)
if mi is not None and hasattr(mi, "my_node_num") and nbn:
n = nbn.get(mi.my_node_num) or {}
lat, lon = self._extract_latlon(n)
if lat is not None and lon is not None:
return lat, lon
except Exception:
pass
return (None, None)
def _haversine_km(self, lat1, lon1, lat2, lon2) -> float:
import math
R = 6371.0088
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlmb = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return R * c
def refresh_nodes(self):
if not self.iface or not getattr(self.iface, "nodes", None):
return
q = self.ent_search.get().strip().lower()
for iid in self.tv_nodes.get_children(""):
self.tv_nodes.delete(iid)
try:
nodes_snapshot = dict(self.iface.nodes or {}) # type: ignore[attr-defined]
except Exception:
nodes_snapshot = {}
base_lat, base_lon = self._get_local_latlon()
for node_id, node in nodes_snapshot.items():
user = (node or {}).get("user") or {}
shortname = user.get("shortName") or ""
longname = user.get("longName") or ""
hwmodel = user.get("hwModel") or ""
role = user.get("role") or ""
macaddr = user.get("macaddr") or ""
publickey = user.get("publicKey") or ""
unmsg = user.get("isUnmessagable") or user.get("isUnmessageable") or False
lastheard_epoch = self._get_lastheard_epoch(node_id, node)
since_str = _fmt_ago(lastheard_epoch)
hops = node.get("hopsAway")
lat, lon = self._extract_latlon(node)
if base_lat is not None and base_lon is not None and lat is not None and lon is not None:
try:
dist = self._haversine_km(base_lat, base_lon, lat, lon)
except Exception:
dist = None
else:
dist = None
dist_str = "%.1f" % dist if isinstance(dist, (int, float)) else "-"
speed, alt = self._extract_speed_alt(node)
speed_str = "%.1f" % speed if isinstance(speed, (int, float)) else "-"
alt_str = "%.0f" % alt if isinstance(alt, (int, float)) else "-"
telem = self._telemetry.get(node_id, {}) if hasattr(self, "_telemetry") else {}
bat = telem.get("battery")
volt = telem.get("voltage")
bat_str = "%.0f" % bat if isinstance(bat, (int, float)) else "-"
volt_str = "%.2f" % volt if isinstance(volt, (int, float)) else "-"
values = (
shortname,
longname,
since_str,
str(hops) if hops is not None else "-",
dist_str,
speed_str,
alt_str,
bat_str,
volt_str,
"%.0f" % (lastheard_epoch or 0),
hwmodel,
role,
macaddr,
publickey,
str(bool(unmsg)),
node_id,
)
if not q or any(q in str(v).lower() for v in values):
try:
self.tv_nodes.insert("", "end", iid=node_id, values=values)
except Exception:
self.tv_nodes.insert("", "end", values=values)
if self._last_sort_col:
self.sort_by_column(self._last_sort_col, self._last_sort_reverse)
self.nodes_frame.config(text="Nodes (%d)" % len(self.tv_nodes.get_children()))
def sort_by_column(self, col: str, reverse: bool = False):
self._last_sort_col = col
self._last_sort_reverse = reverse
col_to_sort = "lastheard" if col == "since" else col
numeric = {"lastheard", "distkm", "hops", "speed", "alt", "battery", "voltage"}
rows = []
for iid in self.tv_nodes.get_children(""):
val = self.tv_nodes.set(iid, col_to_sort)
if col_to_sort in numeric:
try:
val = float(val if val != "-" else 0.0)
except Exception:
val = 0.0
else:
val = val.casefold()
rows.append((val, iid))
rows.sort(key=lambda t: t[0], reverse=reverse)
for index, (_, iid) in enumerate(rows):
self.tv_nodes.move(iid, "", index)
self.tv_nodes.heading(col, command=lambda: self.sort_by_column(col, not reverse))
# THEME -----------------------------------------------------------
def apply_theme(self, mode: str = "light"):
self.current_theme = mode
is_dark = mode == "dark"
bg = "#1e1e1e" if is_dark else "#f5f5f5"
fg = "#ffffff" if is_dark else "#000000"
acc = "#2d2d2d" if is_dark else "#ffffff"
sel = "#555555" if is_dark else "#cce0ff"
# Root window background (client area)
try:
self.root.configure(bg=bg)
except Exception:
pass
style = ttk.Style(self.root)
try:
style.theme_use("clam")
except Exception:
pass
style.configure("TFrame", background=bg)
style.configure("TLabelframe", background=bg, foreground=fg)
style.configure("TLabelframe.Label", background=bg, foreground=fg)
style.configure("TLabel", background=bg, foreground=fg)
style.configure("TButton", background=acc, foreground=fg)
style.configure("TEntry", fieldbackground=acc, foreground=fg)
style.configure("TCombobox", fieldbackground=acc, background=acc, foreground=fg, arrowcolor=fg)
style.map(
"TCombobox",
fieldbackground=[("readonly", acc)],
foreground=[("readonly", fg)],
background=[("readonly", acc)],
)
style.configure("Treeview", background=acc, fieldbackground=acc, foreground=fg, borderwidth=0)
style.map("Treeview", background=[("selected", sel)], foreground=[("selected", fg)])
# Menubar itself (stored as self.menubar when created)
try:
if hasattr(self, "menubar") and self.menubar is not None:
self.menubar.configure(
background=bg,
foreground=fg,
activebackground=sel,
activeforeground=fg,
borderwidth=0,
relief="flat",
)
except Exception:
pass
try:
self.txt_messages.configure(
bg=acc,
fg=fg,
insertbackground=fg,
selectbackground=sel,
selectforeground=fg,
)
except Exception:
pass
try:
self.root.option_add("*Menu*background", bg)
self.root.option_add("*Menu*foreground", fg)
self.root.option_add("*Menu*activeBackground", sel)
self.root.option_add("*Menu*activeForeground", fg)
# Dark background for ttk.Combobox dropdown list
try:
self.root.option_add("*TCombobox*Listbox*background", acc)
self.root.option_add("*TCombobox*Listbox*foreground", fg)
except Exception:
pass
except Exception:
pass
def _style_toplevel(self, win: tk.Toplevel):
is_dark = self.current_theme == "dark"
bg = "#1e1e1e" if is_dark else "#f5f5f5"
win.configure(bg=bg)
# UTILS / CONTEXT ------------------------------------------------
def _append(self, text: str):
self.txt_messages.insert("end", text + "\n")
self.txt_messages.see("end")
def _get_selected_node_id(self) -> Optional[str]:
sel = self.tv_nodes.selection()
if not sel:
return None
return sel[0]
def _cm_traceroute(self):
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("Traceroute", "Select a node first.")
return
if not self.iface:
messagebox.showwarning("Traceroute", "Connect first.")
return
dest = self._resolve_node_dest_id(nid)
if not dest:
messagebox.showerror("Traceroute", "Cannot determine node ID for traceroute.")
return
self._append(f"[trace] Requesting traceroute to {self._node_label(nid)} ({dest})")
threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start()
def _cm_delete_node(self):
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("Delete node", "Select a node first.")
return
if not self.iface or not getattr(self.iface, "localNode", None):
messagebox.showwarning("Delete node", "Connect first.")
return
dest = self._resolve_node_dest_id(nid)
if not dest:
messagebox.showerror("Delete node", "Cannot determine node ID.")
return
label = self._node_label(nid)
if not messagebox.askyesno(
"Delete node",
f"Remove node {label} ({dest}) from the NodeDB on the connected radio?\n\n"
"The device might reboot after this."
):
return
try:
# Use python-meshtastic Node.removeNode API to remove from NodeDB
# https://python.meshtastic.org/node.html
self.iface.localNode.removeNode(dest) # type: ignore[attr-defined]
self._append(f"[admin] Requested delete of node {label} ({dest})")
except Exception as e:
messagebox.showerror("Delete node", f"Failed to delete node: {e}")
return
# Also remove from UI for this session
try:
self.tv_nodes.delete(nid)
self.nodes_frame.config(text="Nodes (%d)" % len(self.tv_nodes.get_children()))
except Exception:
pass
def _resolve_node_dest_id(self, nid: str) -> Optional[str]:
# `nid` is the Treeview item id; in this client it normally equals the user.id (!xxxx)
if nid.startswith("!") or nid.isdigit():
return nid
try:
if self.iface and getattr(self.iface, "nodes", None):
node = (self.iface.nodes.get(nid) or {}) # type: ignore[attr-defined]
user = (node or {}).get("user") or {}
node_id = user.get("id") or ""
if node_id:
return node_id
except Exception:
pass
if nid:
return "!" + nid if not nid.startswith("!") else nid
return None
def _do_traceroute(self, dest: str, hop_limit: int = 10, channel_index: int = 0):
# Prefer native python-meshtastic traceroute if dependencies are available; otherwise fall back to CLI.
if self.iface and mesh_pb2 is not None and portnums_pb2 is not None and _json_format is not None and hasattr(self.iface, "sendData"):
self._do_traceroute_via_interface(dest, hop_limit, channel_index)
else:
self._do_traceroute_via_cli(dest)
def _do_traceroute_via_interface(self, dest: str, hop_limit: int, channel_index: int):
evt = threading.Event()
result: Dict[str, Any] = {}
def _num_to_label(num: int) -> str:
try:
nbn = getattr(self.iface, "nodesByNum", None)
if nbn and num in nbn:
n = nbn[num]
user = (n or {}).get("user") or {}
sid = user.get("id") or f"!{num:08x}"
sn = user.get("shortName") or ""
ln = user.get("longName") or ""
label = (sn or ln or sid).strip()
return f"{label} ({sid})" if sid else label
except Exception:
pass
return f"!{int(num):08x}"
def _on_response(p: dict):
try:
rd = mesh_pb2.RouteDiscovery()
rd.ParseFromString(p["decoded"]["payload"])
as_dict = _json_format.MessageToDict(rd)
result["packet"] = p
result["data"] = as_dict
except Exception as e: # pragma: no cover - defensive
result["error"] = str(e)
finally:
evt.set()
try:
r = mesh_pb2.RouteDiscovery()
# Use the same TRACEROUTE_APP mechanism as the official Meshtastic clients
self.iface.sendData(
r,
destinationId=dest,
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=_on_response,
channelIndex=channel_index,
hopLimit=hop_limit,
)
except Exception as e:
self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}"))
return
if not evt.wait(30.0):
self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No traceroute response (timeout or unsupported)."))
return
if "error" in result:
self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}"))
return
p = result.get("packet") or {}
data = result.get("data") or {}
UNK = -128
try:
origin_num = int(p.get("to"))
dest_num = int(p.get("from"))
except Exception:
origin_num = None
dest_num = None
def _build_path(title: str, start_num: Optional[int], route_key: str, snr_key: str, end_num: Optional[int]) -> Optional[str]:
route_nums = []
for v in data.get(route_key, []):
try:
route_nums.append(int(v))
except Exception:
pass
snrs = []
for v in data.get(snr_key, []):
try:
snrs.append(int(v))
except Exception:
pass
if not start_num or not end_num:
return None
nodes = [start_num] + route_nums + [end_num]
if len(nodes) <= 1:
return None
parts = []
for idx, num in enumerate(nodes):
label = _num_to_label(num)
if idx == 0:
parts.append(label)
else:
snr_txt = "? dB"
if (idx - 1) < len(snrs):
v = snrs[idx - 1]
if v != UNK:
snr_txt = f"{v / 4.0:.2f} dB"
parts.append(f"{label} ({snr_txt})")
return title + "\n" + " -> ".join(parts)
lines = []
fwd = _build_path("Route towards destination:", origin_num, "route", "snrTowards", dest_num)
if fwd:
lines.append(fwd)
back = _build_path("Route back to us:", dest_num, "routeBack", "snrBack", origin_num)
if back:
lines.append(back)
if not lines:
self.root.after(0, lambda: messagebox.showinfo("Traceroute", "Traceroute completed but no route data available."))
return
text = "\n\n".join(lines)
self.root.after(0, lambda: self._show_traceroute_window(text))
def _do_traceroute_via_cli(self, dest: str):
host = (self.host_var.get() or "").strip() or HOST_DEFAULT
cmd = ["meshtastic", "--host", host, "--traceroute", dest]
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40)
except Exception as e: # pragma: no cover - environment specific
self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}"))
return
out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "")
if not out.strip():
self.root.after(0, lambda: messagebox.showinfo("Traceroute", "No output from meshtastic traceroute."))
return
self.root.after(0, lambda: self._show_traceroute_window(out))
def _show_traceroute_window(self, text: str):
win = tk.Toplevel(self.root)
win.title("Traceroute")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
txt = tk.Text(frm, wrap="word")
txt.pack(expand=True, fill="both")
is_dark = self.current_theme == "dark"
txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
txt.insert("1.0", text.strip() or "No traceroute data.")
txt.configure(state="disabled")
def _toggle_send_target(self):
# Legacy helper kept for compatibility, but no longer used.
# Double-clicking a node now opens the per-node chat window directly.
return
def _popup_node_menu(self, event):
iid = self.tv_nodes.identify_row(event.y)
if iid:
self.tv_nodes.selection_set(iid)
self.node_menu.tk_popup(event.x_root, event.y_root)
self.node_menu.grab_release()
def _cm_show_node_details(self):
self.show_raw_node(friendly=True)
def _cm_open_map(self):
nid = self._get_selected_node_id()
if not nid or not self.iface or not getattr(self.iface, "nodes", None):
messagebox.showinfo("Map", "No node selected.")
return
node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined]
lat, lon = self._extract_latlon(node)
if lat is None or lon is None:
messagebox.showinfo("Map", "Selected node has no GPS position.")
return
url = f"https://www.google.com/maps/search/?api=1&query={lat},{lon}"
self._open_browser_url(url)
def show_raw_node(self, friendly: bool = False):
nid = self._get_selected_node_id()
if not nid or not self.iface or not getattr(self.iface, "nodes", None):
messagebox.showinfo("Node", "No node selected.")
return
node = self.iface.nodes.get(nid, {}) # type: ignore[attr-defined]
win = tk.Toplevel(self.root)
win.title("Node: %s" % self._node_label(nid))
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
txt = tk.Text(frm, wrap="word")
txt.pack(expand=True, fill="both")
is_dark = self.current_theme == "dark"
txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
if friendly:
def fmt_val(v, indent=0):
pad = " " * indent
if isinstance(v, dict):
lines = []
for k, vv in v.items():
if isinstance(vv, (dict, list)):
lines.append(f"{pad}{k}:")
lines.append(fmt_val(vv, indent + 1))
else:
lines.append(f"{pad}{k}: {vv}")
return "\n".join(lines)
elif isinstance(v, list):
lines = []
for i, item in enumerate(v):
if isinstance(item, (dict, list)):
lines.append(f"{pad}- [{i}]")
lines.append(fmt_val(item, indent + 1))
else:
lines.append(f"{pad}- {item}")
return "\n".join(lines)
else:
return f"{pad}{v}"
user = (node or {}).get("user") or {}
pos = (node or {}).get("position") or {}
caps = (node or {}).get("capabilities") or {}
config = (node or {}).get("config") or {}
node_id = user.get("id") or node.get("id") or nid
macaddr = user.get("macaddr") or node.get("macaddr") or ""
publickey = user.get("publicKey") or node.get("publicKey") or ""
hw = user.get("hwModel", "")
lines = [
f"Name: {user.get('shortName', '')} {user.get('longName', '')}".strip(),
f"ID: {node_id}",
f"MAC: {macaddr}",
f"HW: {hw}",
f"Public key: {publickey}",
"",
f"Last heard: {_fmt_ago(self._get_lastheard_epoch(nid, node))}",
"",
"Position:",
fmt_val(pos, 1),
]
if caps:
lines.append("Capabilities:")
lines.append(fmt_val(caps, 1))
if config:
lines.append("Config:")
lines.append(fmt_val(config, 1))
lines.append("RAW fields:")
skip = {"user", "position", "capabilities", "config"}
other = {k: v for k, v in (node or {}).items() if k not in skip}
lines.append(fmt_val(other, 1))
txt.insert("1.0", "\n".join(lines))
else:
txt.insert("1.0", json.dumps(node, indent=2, default=str))
txt.configure(state="disabled")
def show_neighbors_window(self):
if not self.iface or not getattr(self.iface, "nodes", None):
messagebox.showinfo("Neighbors", "No interface or nodes available.")
return
win = tk.Toplevel(self.root)
win.title("Neighbor table")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
tree = ttk.Treeview(frm, columns=("from", "to", "snr", "lastheard"), show="headings")
for col, txt in (("from", "From node"), ("to", "To node"), ("snr", "SNR"), ("lastheard", "Last heard")):
tree.heading(col, text=txt)
tree.column(col, width=160 if col in ("from", "to") else 90, anchor="w")
tree.pack(side="left", expand=True, fill="both")
yscroll = ttk.Scrollbar(frm, orient="vertical", command=tree.yview)
tree.configure(yscrollcommand=yscroll.set)
yscroll.pack(side="right", fill="y")
try:
nodes_snapshot = dict(self.iface.nodes or {})
except Exception:
nodes_snapshot = {}
rows = 0
for node_id, node in nodes_snapshot.items():
from_label = self._node_label(node_id)
neighbors = (node or {}).get("neighbors") or []
if isinstance(neighbors, dict):
neighbors = neighbors.values()
for n in neighbors:
try:
to_id = (n or {}).get("nodeId") or (n or {}).get("id") or ""
to_label = self._node_label(str(to_id)) if to_id else str(to_id)
snr = (n or {}).get("snr") or (n or {}).get("snrDb")
last = (n or {}).get("lastHeard")
try:
last_epoch = float(last) if last is not None else None
except Exception:
last_epoch = None
last_str = _fmt_ago(last_epoch)
except Exception:
continue
tree.insert("", "end", values=(from_label, to_label, snr if snr is not None else "-", last_str))
rows += 1
if rows == 0:
messagebox.showinfo("Neighbors", "No neighbor information found on nodes.")
def show_radio_config_window(self):
if not self.iface or not getattr(self.iface, "localNode", None):
messagebox.showinfo("Radio config", "Connect to a device first.")
return
ln = self.iface.localNode
try:
if getattr(ln, "localConfig", None) is None and hasattr(ln, "waitForConfig"):
ln.waitForConfig("localConfig")
except Exception:
pass
cfg = getattr(ln, "localConfig", None)
mod = getattr(ln, "moduleConfig", None)
if cfg is None and mod is None:
messagebox.showinfo("Radio config", "No config available yet from device.")
return
win = tk.Toplevel(self.root)
win.title("Radio + module config (readonly)")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
txt = tk.Text(frm, wrap="none")
txt.pack(side="left", expand=True, fill="both")
yscroll = ttk.Scrollbar(frm, orient="vertical", command=txt.yview)
txt.configure(yscrollcommand=yscroll.set)
yscroll.pack(side="right", fill="y")
is_dark = self.current_theme == "dark"
txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
lines = []
if cfg is not None:
lines.append("localConfig:")
try:
lines.append(str(cfg))
except Exception:
lines.append(repr(cfg))
if mod is not None:
lines.append("\nmoduleConfig:")
try:
lines.append(str(mod))
except Exception:
lines.append(repr(mod))
txt.insert("1.0", "\n".join(lines))
txt.configure(state="disabled")
def show_channel_editor_window(self):
if not self.iface or not getattr(self.iface, "localNode", None):
messagebox.showinfo("Channels", "Connect to a device first.")
return
ln = self.iface.localNode
try:
if getattr(ln, "channels", None) in (None, {}) and hasattr(ln, "waitForConfig"):
ln.waitForConfig("channels")
except Exception:
pass
chans = getattr(ln, "channels", None)
if not chans:
messagebox.showinfo("Channels", "No channels available from device.")
return
win = tk.Toplevel(self.root)
win.title("Channel editor")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.grid(row=0, column=0, sticky="nsew")
win.rowconfigure(0, weight=1)
win.columnconfigure(0, weight=1)
listbox = tk.Listbox(frm, height=10)
listbox.grid(row=0, column=0, columnspan=2, sticky="nsew")
frm.rowconfigure(0, weight=1)
frm.columnconfigure(0, weight=1)
for idx, ch in enumerate(chans):
if ch is None:
label = f"Ch {idx} (empty)"
else:
try:
name = (getattr(ch, "settings", None).name or "").strip()
except Exception:
try:
name = (ch.settings.name or "").strip()
except Exception:
name = ""
label = f"{idx}: {name or '(no name)'}"
listbox.insert("end", label)
ttk.Label(frm, text="Channel name:").grid(row=1, column=0, sticky="w", pady=(6, 2))
name_var = tk.StringVar()
ent_name = ttk.Entry(frm, textvariable=name_var, width=40)
ent_name.grid(row=2, column=0, columnspan=2, sticky="ew")
def on_select(evt=None):
sel = listbox.curselection()
if not sel:
return
i = sel[0]
ch = chans[i]
if ch is None:
name_var.set("")
return
try:
nm = (getattr(ch, "settings", None).name or "").strip()
except Exception:
try:
nm = (ch.settings.name or "").strip()
except Exception:
nm = ""
name_var.set(nm)
listbox.bind("<<ListboxSelect>>", on_select)
def save_name():
sel = listbox.curselection()
if not sel:
messagebox.showinfo("Channel editor", "Select a channel first.")
return
i = sel[0]
ch = chans[i]
if ch is None:
messagebox.showerror("Channel editor", "Selected channel object is empty, cannot rename.")
return
new_name = name_var.get().strip()
try:
if hasattr(ch, "settings"):
ch.settings.name = new_name
elif hasattr(ch, "name"):
ch.name = new_name
ln.writeChannel(i)
self._append(f"[admin] Renamed channel {i} to '{new_name}'")
self._update_channels_from_iface()
listbox.delete(i)
label = f"{i}: {new_name or '(no name)'}"
listbox.insert(i, label)
except Exception as e:
messagebox.showerror("Channel editor", f"Failed to write channel: {e}")
btn_save = ttk.Button(frm, text="Save name to device", command=save_name)
btn_save.grid(row=3, column=0, sticky="w", pady=(6, 0))
def _cm_open_chat(self):
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("Chat", "Select a node first.")
return
self._open_node_chat(nid)
def _open_node_chat(self, nid: str):
key = str(nid)
if key in self._per_node_chats:
win = self._per_node_chats[key]
try:
win.top.deiconify()
win.top.lift()
except Exception:
pass
return
label = self._node_label(nid)
chat = NodeChatWindow(self, key, label)
self._per_node_chats[key] = chat
def _append_to_node_chat(self, node_id: str, line: str):
key = str(node_id)
if not key:
return
win = self._per_node_chats.get(key)
if not win:
return
win.append(line)
def _send_text_to_node(self, dest_id: str, msg: str) -> bool:
if not self.iface:
messagebox.showwarning("Send", "Connect first.")
return False
msg = (msg or "").strip()
if not msg:
return False
try:
choice = self.channel_var.get() if hasattr(self, "channel_var") else ""
info = (self._channel_map.get(choice) if hasattr(self, "_channel_map") else None) or {
"mode": "broadcast",
"channelIndex": 0,
}
ch_index = int(info.get("channelIndex", 0) or 0)
except Exception:
ch_index = 0
try:
self.iface.sendText(msg, destinationId=dest_id, wantAck=False, channelIndex=ch_index)
self._append("[ME -> %s] %s" % (self._node_label(dest_id), msg))
self._append_to_node_chat(dest_id, "[ME] " + msg)
return True
except Exception as e:
messagebox.showerror("Send failed", str(e))
return False
class NodeChatWindow:
def __init__(self, app: "MeshtasticGUI", node_id: str, label: str):
self.app = app
self.node_id = node_id
self.label = label
self.top = tk.Toplevel(app.root)
self.top.title(f"Chat: {label}")
app._style_toplevel(self.top)
self.top.protocol("WM_DELETE_WINDOW", self._on_close)
frm = ttk.Frame(self.top, padding=8)
frm.pack(expand=True, fill="both")
frm.rowconfigure(0, weight=1)
frm.columnconfigure(0, weight=1)
self.txt = tk.Text(frm, wrap="word")
self.txt.grid(row=0, column=0, columnspan=2, sticky="nsew")
yscroll = ttk.Scrollbar(frm, orient="vertical", command=self.txt.yview)
self.txt.configure(yscrollcommand=yscroll.set)
yscroll.grid(row=0, column=2, sticky="ns")
is_dark = app.current_theme == "dark"
self.txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
self.entry = ttk.Entry(frm)
self.entry.grid(row=1, column=0, sticky="nsew", pady=(6, 0))
self.entry.bind("<Return>", self._on_send)
btn = ttk.Button(frm, text="Send", command=self._on_send)
btn.grid(row=1, column=1, sticky="e", padx=(4, 0), pady=(6, 0))
def append(self, line: str):
try:
self.txt.insert("end", line + "\n")
self.txt.see("end")
except Exception:
pass
def _on_send(self, *_):
msg = self.entry.get().strip()
if not msg:
return
ok = self.app._send_text_to_node(self.node_id, msg)
if ok:
self.entry.delete(0, "end")
def _on_close(self):
key = str(self.node_id)
try:
if key in self.app._per_node_chats:
del self.app._per_node_chats[key]
except Exception:
pass
self.top.destroy()
def main():
app = MeshtasticGUI()
app.root.geometry("1500x820")
app.root.protocol("WM_DELETE_WINDOW", lambda: (app.disconnect(), app.root.destroy()))
app.root.mainloop()
if __name__ == "__main__":
main()