Add files via upload

This commit is contained in:
Knud Schrøder
2025-11-26 15:02:21 +01:00
committed by GitHub
parent 34144ca6b6
commit 3ffdeb5998
2 changed files with 605 additions and 47 deletions
Binary file not shown.
+605 -47
View File
@@ -27,13 +27,13 @@ except Exception:
BLEInterface = None
try:
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from meshtastic.protobuf import mesh_pb2, portnums_pb2, telemetry_pb2
import google.protobuf.json_format as _json_format
except Exception:
mesh_pb2 = None
portnums_pb2 = None
telemetry_pb2 = None
_json_format = None
try:
from serial.tools import list_ports
except Exception:
@@ -102,26 +102,30 @@ class MeshtasticGUI:
self.host_var = tk.StringVar(value=HOST_DEFAULT)
self.port_var = tk.IntVar(value=PORT_DEFAULT)
menubar = tk.Menu(self.root)
m_conn = tk.Menu(menubar, tearoff=False)
self.menubar = tk.Menu(self.root)
m_conn = tk.Menu(self.menubar, tearoff=False)
m_conn.add_command(label="Connect (TCP)", command=self.connect_tcp)
m_conn.add_command(label="Connect via USB/Serial...", command=self.connect_serial_dialog)
m_conn.add_command(label="Connect via Bluetooth...", command=self.connect_ble_dialog)
m_conn.add_command(label="Disconnect", command=self.disconnect)
m_conn.add_separator()
m_conn.add_command(label="Set IP/Port...", command=self.set_ip_port)
menubar.add_cascade(label="Connection", menu=m_conn)
self.menubar.add_cascade(label="Connection", menu=m_conn)
m_tools = tk.Menu(menubar, tearoff=False)
m_tools = tk.Menu(self.menubar, tearoff=False)
m_tools.add_command(label="Clear messages", command=lambda: self.txt_messages.delete("1.0", "end"))
menubar.add_cascade(label="Tools", menu=m_tools)
self.menubar.add_cascade(label="Tools", menu=m_tools)
m_view = tk.Menu(menubar, tearoff=False)
m_view = tk.Menu(self.menubar, tearoff=False)
m_view.add_command(label="Light theme", command=lambda: self.apply_theme("light"))
m_view.add_command(label="Dark theme", command=lambda: self.apply_theme("dark"))
menubar.add_cascade(label="View", menu=m_view)
m_view.add_separator()
m_view.add_command(label="Neighbor table", command=self.show_neighbors_window)
m_view.add_command(label="Radio config (view)", command=self.show_radio_config_window)
m_view.add_command(label="Channel editor", command=self.show_channel_editor_window)
self.menubar.add_cascade(label="View", menu=m_view)
m_links = tk.Menu(menubar, tearoff=False)
m_links = tk.Menu(self.menubar, tearoff=False)
m_links.add_command(label="Meshtastic client", command=lambda: self._open_browser_url("https://github.com/dk98174003/Meshtastic-Client"))
m_links.add_command(label="Meshtastic org", command=lambda: self._open_browser_url("https://meshtastic.org/"))
m_links.add_command(label="Meshtastic flasher (Chrome)", command=lambda: self._open_browser_url("https://flasher.meshtastic.org/"))
@@ -130,9 +134,9 @@ class MeshtasticGUI:
m_links.add_separator()
m_links.add_command(label="Meshtastic Facebook Danmark", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1553839535376876/"))
m_links.add_command(label="Meshtastic Facebook Nordjylland", command=lambda: self._open_browser_url("https://www.facebook.com/groups/1265866668302201/"))
menubar.add_cascade(label="Links", menu=m_links)
self.menubar.add_cascade(label="Links", menu=m_links)
self.root.config(menu=menubar)
self.root.config(menu=self.menubar)
self.rootframe = ttk.Frame(self.root)
self.rootframe.grid(row=0, column=0, sticky="nsew")
@@ -163,9 +167,13 @@ class MeshtasticGUI:
self.ent_message.bind("<Return>", lambda e: self.send_message())
self.btn_send = ttk.Button(self.send_frame, text="Send", command=self.send_message)
self.btn_send.grid(row=0, column=1, padx=4, sticky="nsew")
self.send_to_selected = tk.BooleanVar(value=False)
self.chk_to_selected = ttk.Checkbutton(self.send_frame, text="To selected", variable=self.send_to_selected)
self.chk_to_selected.grid(row=0, column=2, padx=4, sticky="w")
# channel selector (public / selected / private channels)
self.channel_var = tk.StringVar()
self._channel_map = {}
self.cbo_channel = ttk.Combobox(self.send_frame, textvariable=self.channel_var, state="readonly", width=22)
self._reset_channel_choices()
self.cbo_channel.grid(row=0, column=2, padx=4, sticky="w")
# nodes
self.nodes_frame = ttk.Labelframe(self.paned, text="Nodes (0)")
@@ -178,13 +186,13 @@ class MeshtasticGUI:
self.cols_all = (
"shortname", "longname", "since", "hops",
"distkm", "speed", "alt",
"distkm", "speed", "alt", "battery", "voltage",
"lastheard", "hwmodel", "role",
"macaddr", "publickey", "isunmessagable", "id"
)
self.cols_visible = (
"shortname", "longname", "since", "hops",
"distkm", "speed", "alt", "hwmodel", "role"
"distkm", "speed", "alt", "battery", "voltage", "hwmodel", "role"
)
self.tv_nodes = ttk.Treeview(
self.nodes_frame,
@@ -202,6 +210,8 @@ class MeshtasticGUI:
"distkm": "Dist (km)",
"speed": "Speed",
"alt": "Alt (m)",
"battery": "Batt %",
"voltage": "Volt",
"hwmodel": "HW",
"role": "Role",
"lastheard": "",
@@ -212,7 +222,6 @@ class MeshtasticGUI:
}
for key, text in headings.items():
self.tv_nodes.heading(key, text=text, command=lambda c=key: self.sort_by_column(c, False))
widths = {
"shortname": 90,
"longname": 200,
@@ -221,12 +230,14 @@ class MeshtasticGUI:
"distkm": 70,
"speed": 70,
"alt": 70,
"battery": 70,
"voltage": 80,
"hwmodel": 90,
"role": 110,
}
for key, w in widths.items():
try:
self.tv_nodes.column(key, width=w, anchor="w", stretch=(key not in ("since", "hops", "distkm", "speed", "alt")))
self.tv_nodes.column(key, width=w, anchor="w", stretch=(key not in ("since", "hops", "distkm", "speed", "alt", "battery", "voltage")))
except Exception:
pass
@@ -237,6 +248,7 @@ class MeshtasticGUI:
except Exception:
pass
# scrollbar for node list
yscroll_nodes = ttk.Scrollbar(self.nodes_frame, orient="vertical", command=self.tv_nodes.yview)
self.tv_nodes.configure(yscrollcommand=yscroll_nodes.set)
yscroll_nodes.grid(row=1, column=1, sticky="ns")
@@ -246,9 +258,12 @@ class MeshtasticGUI:
self.node_menu.add_command(label="Node info", command=self._cm_show_node_details)
self.node_menu.add_command(label="Map", command=self._cm_open_map)
self.node_menu.add_command(label="Traceroute", command=self._cm_traceroute)
self.node_menu.add_command(label="Chat with node", command=self._cm_open_chat)
self.node_menu.add_separator()
self.node_menu.add_command(label="Delete node", command=self._cm_delete_node)
self.tv_nodes.bind("<Button-3>", self._popup_node_menu)
self.tv_nodes.bind("<Double-1>", lambda e: self._toggle_send_target())
self.tv_nodes.bind("<Double-1>", lambda e: self._cm_open_chat())
self.paned.add(self.msg_frame, weight=3)
self.paned.add(self.nodes_frame, weight=4)
@@ -259,6 +274,8 @@ class MeshtasticGUI:
self._last_seen_overrides: Dict[str, float] = {}
self._last_sort_col: Optional[str] = "since"
self._last_sort_reverse: bool = True
self._telemetry: Dict[str, Dict[str, Any]] = {}
self._per_node_chats: Dict[str, "NodeChatWindow"] = {}
if pub is not None:
try:
@@ -269,7 +286,7 @@ class MeshtasticGUI:
except Exception as e:
print("pubsub subscribe failed:", e)
self.apply_theme("light")
self.apply_theme("dark")
self._append("Ready. Connection -> Connect (TCP/USB/BLE)")
self._update_title_with_host()
@@ -303,8 +320,12 @@ class MeshtasticGUI:
def on_connection_established(self, interface=None, **kwargs):
self.connected_evt.set()
self._append("[+] Connected")
# refresh nodes and channel list when we connect
self.refresh_nodes()
try:
self._update_channels_from_iface()
except Exception:
pass
def on_connection_lost(self, interface=None, **kwargs):
self.connected_evt.clear()
self._append("[-] Connection lost")
@@ -352,6 +373,24 @@ class MeshtasticGUI:
decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {}
portnum = decoded.get("portnum")
sender = packet.get("fromId") or packet.get("from") or packet.get("fromIdShort")
if portnum == "TELEMETRY_APP" and telemetry_pb2 is not None and sender:
try:
payload = decoded.get("payload")
if isinstance(payload, (bytes, bytearray)):
tmsg = telemetry_pb2.Telemetry()
tmsg.ParseFromString(payload)
if tmsg.HasField("device_metrics"):
dm = tmsg.device_metrics
entry = self._telemetry.get(str(sender), {})
if dm.battery_level is not None:
entry["battery"] = dm.battery_level
if dm.voltage is not None:
entry["voltage"] = dm.voltage
self._telemetry[str(sender)] = entry
except Exception:
# ignore telemetry parse errors, they are non-fatal
pass
if sender:
self._last_seen_overrides[str(sender)] = time.time()
@@ -382,6 +421,7 @@ class MeshtasticGUI:
if portnum == "TEXT_MESSAGE_APP":
self._append("[MSG] %s: %s (RSSI=%s)" % (label, text, rssi))
self._append_to_node_chat(str(sender), "%s" % (text,))
if isinstance(text, str) and text.strip().lower() == "ping":
self._send_pong(sender, label)
@@ -393,6 +433,7 @@ class MeshtasticGUI:
try:
self.iface.sendText("pong", destinationId=dest_id, wantAck=False)
self._append("[auto] pong -> %s" % label)
self._append_to_node_chat(str(dest_id), "[auto] pong")
except Exception as e:
self._append("[auto] pong failed: %s" % e)
@@ -457,8 +498,7 @@ class MeshtasticGUI:
self.iface = TCPInterface(hostname=host, portNumber=port)
self.connected_evt.wait(timeout=5)
except Exception as e:
err = str(e)
self.root.after(0, lambda err=err: messagebox.showerror("TCP connect failed", err))
self.root.after(0, lambda: messagebox.showerror("TCP connect failed", str(e)))
threading.Thread(target=run, daemon=True).start()
def connect_serial_dialog(self):
@@ -486,8 +526,7 @@ class MeshtasticGUI:
self.iface = SerialInterface(devPath=port) if port else SerialInterface()
self.connected_evt.wait(timeout=5)
except Exception as e:
err = str(e)
self.root.after(0, lambda err=err: messagebox.showerror("Serial connect failed", err))
self.root.after(0, lambda: messagebox.showerror("Serial connect failed", str(e)))
threading.Thread(target=run, daemon=True).start()
def connect_ble_dialog(self):
@@ -523,8 +562,7 @@ class MeshtasticGUI:
self.iface = BLEInterface(address=addr)
self.connected_evt.wait(timeout=8)
except Exception as e:
err = str(e)
self.root.after(0, lambda err=err: messagebox.showerror("BLE connect failed", err))
self.root.after(0, lambda: messagebox.showerror("BLE connect failed", str(e)))
threading.Thread(target=run, daemon=True).start()
def disconnect(self):
@@ -538,6 +576,124 @@ class MeshtasticGUI:
self._append("[*] Disconnected")
# send ------------------------------------------------------------
# channel selector helpers ----------------------------------------
def _reset_channel_choices(self):
"""Initialize channel selector with Public + To selected."""
self._channel_map = {}
options = []
label_pub = "Public (broadcast)"
self._channel_map[label_pub] = {"mode": "broadcast", "channelIndex": 0}
options.append(label_pub)
if hasattr(self, "cbo_channel"):
self.cbo_channel["values"] = options
if hasattr(self, "channel_var"):
self.channel_var.set(label_pub)
def _set_channel_choice(self, label: str):
"""Safely set the current channel choice, if it exists."""
try:
values = list(self.cbo_channel["values"])
except Exception:
return
if label not in values:
return
self.channel_var.set(label)
def _update_channels_from_iface(self):
"""
Populate channel selector with channels from the connected device.
We keep:
* "Public (broadcast)" -> broadcast on channel 0
And then append additional channels (1..N) from the radio as:
* "Ch <idx>: <name>" -> broadcast on that channel.
"""
iface = getattr(self, "iface", None)
if not iface:
return
local_node = getattr(iface, "localNode", None)
if not local_node:
return
# Try to request channels if we don't have them yet.
chans = getattr(local_node, "channels", None)
try:
if (not chans) and hasattr(local_node, "requestChannels"):
local_node.requestChannels()
time.sleep(1.5)
chans = getattr(local_node, "channels", None)
except Exception:
chans = getattr(local_node, "channels", None)
try:
options = list(self.cbo_channel["values"])
except Exception:
return
# If channel 0 has a name, update the "Public" label to show it.
try:
if chans and len(chans) > 0:
ch0 = chans[0]
try:
ch0_name = (getattr(ch0, "settings", None).name or "").strip()
except Exception:
try:
ch0_name = (ch0.settings.name or "").strip()
except Exception:
ch0_name = ""
if ch0_name:
# Find the existing public entry (mode=broadcast, channelIndex=0)
old_label = None
for lbl, meta in list(self._channel_map.items()):
if meta.get("mode") == "broadcast" and int(meta.get("channelIndex", 0) or 0) == 0:
old_label = lbl
break
if old_label:
new_label = f"Public (ch0: {ch0_name})"
if new_label != old_label:
# Update mapping
self._channel_map[new_label] = self._channel_map.pop(old_label)
# Update combobox options
options = [new_label if v == old_label else v for v in options]
# Keep current selection if it was pointing to the old label
if self.channel_var.get() == old_label:
self.channel_var.set(new_label)
except Exception:
# Failing to pretty-print channel 0 is not fatal; just continue.
pass
# Add remaining channels (1..N) as broadcast options.
for idx, ch in enumerate(chans or []):
if idx == 0:
# Skip channel 0 here it's already represented by "Public".
continue
try:
name = (getattr(ch, "settings", None).name or "").strip()
except Exception:
# older protobufs might expose fields differently
try:
name = (ch.settings.name or "").strip()
except Exception:
name = ""
if not name:
label = f"Ch {idx}"
else:
label = f"Ch {idx}: {name}"
if label in self._channel_map:
continue
self._channel_map[label] = {"mode": "broadcast_channel", "channelIndex": idx}
options.append(label)
try:
self.cbo_channel["values"] = options
except Exception:
pass
def send_message(self):
msg = self.ent_message.get().strip()
if not msg:
@@ -545,17 +701,37 @@ class MeshtasticGUI:
if not self.iface:
messagebox.showwarning("Not connected", "Connect first.")
return
try:
if self.send_to_selected.get():
choice = self.channel_var.get() if hasattr(self, "channel_var") else ""
info = (self._channel_map.get(choice) if hasattr(self, "_channel_map") else None) or {
"mode": "broadcast",
"channelIndex": 0,
}
mode = info.get("mode", "broadcast")
ch_index = int(info.get("channelIndex", 0) or 0)
if mode == "selected":
# Direct message to the currently selected node
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("No selection", "Select a node first.")
return
self.iface.sendText(msg, destinationId=nid, wantAck=False)
dest = self._resolve_node_dest_id(nid)
if not dest:
messagebox.showerror("Send failed", "Cannot resolve destination for selected node.")
return
self.iface.sendText(msg, destinationId=dest, wantAck=False, channelIndex=ch_index)
self._append("[ME -> %s] %s" % (self._node_label(nid), msg))
else:
self.iface.sendText(msg, wantAck=False)
self._append("[ME] %s" % msg)
# Broadcast on chosen channel (public or private)
self.iface.sendText(msg, wantAck=False, channelIndex=ch_index)
label = self.channel_var.get() if hasattr(self, "channel_var") else ""
if mode == "broadcast_channel" and ch_index:
self._append("[ME ch%d] %s" % (ch_index, msg))
else:
self._append("[ME] %s" % msg)
self.ent_message.delete(0, "end")
except Exception as e:
messagebox.showerror("Send failed", str(e))
@@ -680,6 +856,12 @@ class MeshtasticGUI:
speed_str = "%.1f" % speed if isinstance(speed, (int, float)) else "-"
alt_str = "%.0f" % alt if isinstance(alt, (int, float)) else "-"
telem = self._telemetry.get(node_id, {}) if hasattr(self, "_telemetry") else {}
bat = telem.get("battery")
volt = telem.get("voltage")
bat_str = "%.0f" % bat if isinstance(bat, (int, float)) else "-"
volt_str = "%.2f" % volt if isinstance(volt, (int, float)) else "-"
values = (
shortname,
longname,
@@ -688,6 +870,8 @@ class MeshtasticGUI:
dist_str,
speed_str,
alt_str,
bat_str,
volt_str,
"%.0f" % (lastheard_epoch or 0),
hwmodel,
role,
@@ -702,7 +886,6 @@ class MeshtasticGUI:
self.tv_nodes.insert("", "end", iid=node_id, values=values)
except Exception:
self.tv_nodes.insert("", "end", values=values)
if self._last_sort_col:
self.sort_by_column(self._last_sort_col, self._last_sort_reverse)
@@ -712,7 +895,7 @@ class MeshtasticGUI:
self._last_sort_col = col
self._last_sort_reverse = reverse
col_to_sort = "lastheard" if col == "since" else col
numeric = {"lastheard", "distkm", "hops", "speed", "alt"}
numeric = {"lastheard", "distkm", "hops", "speed", "alt", "battery", "voltage"}
rows = []
for iid in self.tv_nodes.get_children(""):
val = self.tv_nodes.set(iid, col_to_sort)
@@ -737,6 +920,13 @@ class MeshtasticGUI:
fg = "#ffffff" if is_dark else "#000000"
acc = "#2d2d2d" if is_dark else "#ffffff"
sel = "#555555" if is_dark else "#cce0ff"
# Root window background (client area)
try:
self.root.configure(bg=bg)
except Exception:
pass
style = ttk.Style(self.root)
try:
style.theme_use("clam")
@@ -748,10 +938,38 @@ class MeshtasticGUI:
style.configure("TLabel", background=bg, foreground=fg)
style.configure("TButton", background=acc, foreground=fg)
style.configure("TEntry", fieldbackground=acc, foreground=fg)
style.configure("TCombobox", fieldbackground=acc, background=acc, foreground=fg, arrowcolor=fg)
style.map(
"TCombobox",
fieldbackground=[("readonly", acc)],
foreground=[("readonly", fg)],
background=[("readonly", acc)],
)
style.configure("Treeview", background=acc, fieldbackground=acc, foreground=fg, borderwidth=0)
style.map("Treeview", background=[("selected", sel)], foreground=[("selected", fg)])
# Menubar itself (stored as self.menubar when created)
try:
self.txt_messages.configure(bg=acc, fg=fg, insertbackground=fg, selectbackground=sel, selectforeground=fg)
if hasattr(self, "menubar") and self.menubar is not None:
self.menubar.configure(
background=bg,
foreground=fg,
activebackground=sel,
activeforeground=fg,
borderwidth=0,
relief="flat",
)
except Exception:
pass
try:
self.txt_messages.configure(
bg=acc,
fg=fg,
insertbackground=fg,
selectbackground=sel,
selectforeground=fg,
)
except Exception:
pass
try:
@@ -759,6 +977,12 @@ class MeshtasticGUI:
self.root.option_add("*Menu*foreground", fg)
self.root.option_add("*Menu*activeBackground", sel)
self.root.option_add("*Menu*activeForeground", fg)
# Dark background for ttk.Combobox dropdown list
try:
self.root.option_add("*TCombobox*Listbox*background", acc)
self.root.option_add("*TCombobox*Listbox*foreground", fg)
except Exception:
pass
except Exception:
pass
@@ -794,6 +1018,42 @@ class MeshtasticGUI:
self._append(f"[trace] Requesting traceroute to {self._node_label(nid)} ({dest})")
threading.Thread(target=self._do_traceroute, args=(dest,), daemon=True).start()
def _cm_delete_node(self):
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("Delete node", "Select a node first.")
return
if not self.iface or not getattr(self.iface, "localNode", None):
messagebox.showwarning("Delete node", "Connect first.")
return
dest = self._resolve_node_dest_id(nid)
if not dest:
messagebox.showerror("Delete node", "Cannot determine node ID.")
return
label = self._node_label(nid)
if not messagebox.askyesno(
"Delete node",
f"Remove node {label} ({dest}) from the NodeDB on the connected radio?\n\n"
"The device might reboot after this."
):
return
try:
# Use python-meshtastic Node.removeNode API to remove from NodeDB
# https://python.meshtastic.org/node.html
self.iface.localNode.removeNode(dest) # type: ignore[attr-defined]
self._append(f"[admin] Requested delete of node {label} ({dest})")
except Exception as e:
messagebox.showerror("Delete node", f"Failed to delete node: {e}")
return
# Also remove from UI for this session
try:
self.tv_nodes.delete(nid)
self.nodes_frame.config(text="Nodes (%d)" % len(self.tv_nodes.get_children()))
except Exception:
pass
def _resolve_node_dest_id(self, nid: str) -> Optional[str]:
# `nid` is the Treeview item id; in this client it normally equals the user.id (!xxxx)
if nid.startswith("!") or nid.isdigit():
@@ -862,8 +1122,7 @@ class MeshtasticGUI:
hopLimit=hop_limit,
)
except Exception as e:
err = str(e)
self.root.after(0, lambda err=err: messagebox.showerror("Traceroute", f"Failed to send traceroute: {err}"))
self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to send traceroute: {e}"))
return
if not evt.wait(30.0):
@@ -871,8 +1130,7 @@ class MeshtasticGUI:
return
if "error" in result:
err = result.get("error")
self.root.after(0, lambda err=err: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {err}"))
self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to decode traceroute: {result['error']}"))
return
p = result.get("packet") or {}
@@ -939,8 +1197,7 @@ class MeshtasticGUI:
try:
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=40)
except Exception as e: # pragma: no cover - environment specific
err = str(e)
self.root.after(0, lambda err=err: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {err}"))
self.root.after(0, lambda: messagebox.showerror("Traceroute", f"Failed to run meshtastic CLI: {e}"))
return
out = (proc.stdout or "") + ("\n" + (proc.stderr or "") if proc.stderr else "")
if not out.strip():
@@ -965,11 +1222,9 @@ class MeshtasticGUI:
txt.insert("1.0", text.strip() or "No traceroute data.")
txt.configure(state="disabled")
def _toggle_send_target(self):
nid = self._get_selected_node_id()
self.send_to_selected.set(bool(nid))
if nid:
self._append("[target] will send to %s" % self._node_label(nid))
# Legacy helper kept for compatibility, but no longer used.
# Double-clicking a node now opens the per-node chat window directly.
return
def _popup_node_menu(self, event):
iid = self.tv_nodes.identify_row(event.y)
if iid:
@@ -1072,9 +1327,312 @@ class MeshtasticGUI:
txt.insert("1.0", "\n".join(lines))
else:
txt.insert("1.0", json.dumps(node, indent=2, default=str))
txt.configure(state="disabled")
def show_neighbors_window(self):
if not self.iface or not getattr(self.iface, "nodes", None):
messagebox.showinfo("Neighbors", "No interface or nodes available.")
return
win = tk.Toplevel(self.root)
win.title("Neighbor table")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
tree = ttk.Treeview(frm, columns=("from", "to", "snr", "lastheard"), show="headings")
for col, txt in (("from", "From node"), ("to", "To node"), ("snr", "SNR"), ("lastheard", "Last heard")):
tree.heading(col, text=txt)
tree.column(col, width=160 if col in ("from", "to") else 90, anchor="w")
tree.pack(side="left", expand=True, fill="both")
yscroll = ttk.Scrollbar(frm, orient="vertical", command=tree.yview)
tree.configure(yscrollcommand=yscroll.set)
yscroll.pack(side="right", fill="y")
try:
nodes_snapshot = dict(self.iface.nodes or {})
except Exception:
nodes_snapshot = {}
rows = 0
for node_id, node in nodes_snapshot.items():
from_label = self._node_label(node_id)
neighbors = (node or {}).get("neighbors") or []
if isinstance(neighbors, dict):
neighbors = neighbors.values()
for n in neighbors:
try:
to_id = (n or {}).get("nodeId") or (n or {}).get("id") or ""
to_label = self._node_label(str(to_id)) if to_id else str(to_id)
snr = (n or {}).get("snr") or (n or {}).get("snrDb")
last = (n or {}).get("lastHeard")
try:
last_epoch = float(last) if last is not None else None
except Exception:
last_epoch = None
last_str = _fmt_ago(last_epoch)
except Exception:
continue
tree.insert("", "end", values=(from_label, to_label, snr if snr is not None else "-", last_str))
rows += 1
if rows == 0:
messagebox.showinfo("Neighbors", "No neighbor information found on nodes.")
def show_radio_config_window(self):
if not self.iface or not getattr(self.iface, "localNode", None):
messagebox.showinfo("Radio config", "Connect to a device first.")
return
ln = self.iface.localNode
try:
if getattr(ln, "localConfig", None) is None and hasattr(ln, "waitForConfig"):
ln.waitForConfig("localConfig")
except Exception:
pass
cfg = getattr(ln, "localConfig", None)
mod = getattr(ln, "moduleConfig", None)
if cfg is None and mod is None:
messagebox.showinfo("Radio config", "No config available yet from device.")
return
win = tk.Toplevel(self.root)
win.title("Radio + module config (readonly)")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.pack(expand=True, fill="both")
txt = tk.Text(frm, wrap="none")
txt.pack(side="left", expand=True, fill="both")
yscroll = ttk.Scrollbar(frm, orient="vertical", command=txt.yview)
txt.configure(yscrollcommand=yscroll.set)
yscroll.pack(side="right", fill="y")
is_dark = self.current_theme == "dark"
txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
lines = []
if cfg is not None:
lines.append("localConfig:")
try:
lines.append(str(cfg))
except Exception:
lines.append(repr(cfg))
if mod is not None:
lines.append("\nmoduleConfig:")
try:
lines.append(str(mod))
except Exception:
lines.append(repr(mod))
txt.insert("1.0", "\n".join(lines))
txt.configure(state="disabled")
def show_channel_editor_window(self):
if not self.iface or not getattr(self.iface, "localNode", None):
messagebox.showinfo("Channels", "Connect to a device first.")
return
ln = self.iface.localNode
try:
if getattr(ln, "channels", None) in (None, {}) and hasattr(ln, "waitForConfig"):
ln.waitForConfig("channels")
except Exception:
pass
chans = getattr(ln, "channels", None)
if not chans:
messagebox.showinfo("Channels", "No channels available from device.")
return
win = tk.Toplevel(self.root)
win.title("Channel editor")
self._style_toplevel(win)
frm = ttk.Frame(win, padding=8)
frm.grid(row=0, column=0, sticky="nsew")
win.rowconfigure(0, weight=1)
win.columnconfigure(0, weight=1)
listbox = tk.Listbox(frm, height=10)
listbox.grid(row=0, column=0, columnspan=2, sticky="nsew")
frm.rowconfigure(0, weight=1)
frm.columnconfigure(0, weight=1)
for idx, ch in enumerate(chans):
if ch is None:
label = f"Ch {idx} (empty)"
else:
try:
name = (getattr(ch, "settings", None).name or "").strip()
except Exception:
try:
name = (ch.settings.name or "").strip()
except Exception:
name = ""
label = f"{idx}: {name or '(no name)'}"
listbox.insert("end", label)
ttk.Label(frm, text="Channel name:").grid(row=1, column=0, sticky="w", pady=(6, 2))
name_var = tk.StringVar()
ent_name = ttk.Entry(frm, textvariable=name_var, width=40)
ent_name.grid(row=2, column=0, columnspan=2, sticky="ew")
def on_select(evt=None):
sel = listbox.curselection()
if not sel:
return
i = sel[0]
ch = chans[i]
if ch is None:
name_var.set("")
return
try:
nm = (getattr(ch, "settings", None).name or "").strip()
except Exception:
try:
nm = (ch.settings.name or "").strip()
except Exception:
nm = ""
name_var.set(nm)
listbox.bind("<<ListboxSelect>>", on_select)
def save_name():
sel = listbox.curselection()
if not sel:
messagebox.showinfo("Channel editor", "Select a channel first.")
return
i = sel[0]
ch = chans[i]
if ch is None:
messagebox.showerror("Channel editor", "Selected channel object is empty, cannot rename.")
return
new_name = name_var.get().strip()
try:
if hasattr(ch, "settings"):
ch.settings.name = new_name
elif hasattr(ch, "name"):
ch.name = new_name
ln.writeChannel(i)
self._append(f"[admin] Renamed channel {i} to '{new_name}'")
self._update_channels_from_iface()
listbox.delete(i)
label = f"{i}: {new_name or '(no name)'}"
listbox.insert(i, label)
except Exception as e:
messagebox.showerror("Channel editor", f"Failed to write channel: {e}")
btn_save = ttk.Button(frm, text="Save name to device", command=save_name)
btn_save.grid(row=3, column=0, sticky="w", pady=(6, 0))
def _cm_open_chat(self):
nid = self._get_selected_node_id()
if not nid:
messagebox.showinfo("Chat", "Select a node first.")
return
self._open_node_chat(nid)
def _open_node_chat(self, nid: str):
key = str(nid)
if key in self._per_node_chats:
win = self._per_node_chats[key]
try:
win.top.deiconify()
win.top.lift()
except Exception:
pass
return
label = self._node_label(nid)
chat = NodeChatWindow(self, key, label)
self._per_node_chats[key] = chat
def _append_to_node_chat(self, node_id: str, line: str):
key = str(node_id)
if not key:
return
win = self._per_node_chats.get(key)
if not win:
return
win.append(line)
def _send_text_to_node(self, dest_id: str, msg: str) -> bool:
if not self.iface:
messagebox.showwarning("Send", "Connect first.")
return False
msg = (msg or "").strip()
if not msg:
return False
try:
choice = self.channel_var.get() if hasattr(self, "channel_var") else ""
info = (self._channel_map.get(choice) if hasattr(self, "_channel_map") else None) or {
"mode": "broadcast",
"channelIndex": 0,
}
ch_index = int(info.get("channelIndex", 0) or 0)
except Exception:
ch_index = 0
try:
self.iface.sendText(msg, destinationId=dest_id, wantAck=False, channelIndex=ch_index)
self._append("[ME -> %s] %s" % (self._node_label(dest_id), msg))
self._append_to_node_chat(dest_id, "[ME] " + msg)
return True
except Exception as e:
messagebox.showerror("Send failed", str(e))
return False
class NodeChatWindow:
def __init__(self, app: "MeshtasticGUI", node_id: str, label: str):
self.app = app
self.node_id = node_id
self.label = label
self.top = tk.Toplevel(app.root)
self.top.title(f"Chat: {label}")
app._style_toplevel(self.top)
self.top.protocol("WM_DELETE_WINDOW", self._on_close)
frm = ttk.Frame(self.top, padding=8)
frm.pack(expand=True, fill="both")
frm.rowconfigure(0, weight=1)
frm.columnconfigure(0, weight=1)
self.txt = tk.Text(frm, wrap="word")
self.txt.grid(row=0, column=0, columnspan=2, sticky="nsew")
yscroll = ttk.Scrollbar(frm, orient="vertical", command=self.txt.yview)
self.txt.configure(yscrollcommand=yscroll.set)
yscroll.grid(row=0, column=2, sticky="ns")
is_dark = app.current_theme == "dark"
self.txt.configure(
bg=("#2d2d2d" if is_dark else "#ffffff"),
fg=("#ffffff" if is_dark else "#000000"),
insertbackground=("#ffffff" if is_dark else "#000000"),
)
self.entry = ttk.Entry(frm)
self.entry.grid(row=1, column=0, sticky="nsew", pady=(6, 0))
self.entry.bind("<Return>", self._on_send)
btn = ttk.Button(frm, text="Send", command=self._on_send)
btn.grid(row=1, column=1, sticky="e", padx=(4, 0), pady=(6, 0))
def append(self, line: str):
try:
self.txt.insert("end", line + "\n")
self.txt.see("end")
except Exception:
pass
def _on_send(self, *_):
msg = self.entry.get().strip()
if not msg:
return
ok = self.app._send_text_to_node(self.node_id, msg)
if ok:
self.entry.delete(0, "end")
def _on_close(self):
key = str(self.node_id)
try:
if key in self.app._per_node_chats:
del self.app._per_node_chats[key]
except Exception:
pass
self.top.destroy()
def main():
app = MeshtasticGUI()
app.root.geometry("1500x820")
@@ -1083,4 +1641,4 @@ def main():
if __name__ == "__main__":
main()
main()