Refactoring

This commit is contained in:
pe1hvh
2026-02-05 18:10:53 +01:00
parent 35f46651dd
commit 5bfd103e97
37 changed files with 1940 additions and 1952 deletions
BIN
View File
Binary file not shown.
-108
View File
@@ -1,108 +0,0 @@
# MeshCore GUI v4.0 — Changelog
## Probleem dat is opgelost
Bij het opstarten van de GUI bufferde het systeem RX_LOG-pakketten die tijdens de
initialisatiefase binnenkwamen (send_appstart, send_device_query, get_contacts).
Deze pakketten werden later foutief gekoppeld aan het eerste echte channel message,
wat resulteerde in onmogelijke route-informatie (bijv. "22 of 7 repeaters identified").
Daarnaast gebruikte de oude code SNR-matching om RX_LOG-entries te koppelen aan
channel messages. Dit werkte niet betrouwbaar omdat het companion protocol en
RX_LOG verschillende SNR-waarden rapporteren voor hetzelfde fysieke pakket
(bijv. companion=13.5, RX_LOG=14.0).
## Drie defensieve lagen
### Laag 1: Startup buffer clearing (`ble_worker.py:126-135`)
Na `start_auto_message_fetching()` worden de RX path buffer én het SharedData
archive geleegd. Alle pakketten die tijdens de init-fase zijn binnengekomen
worden weggegooid voordat het eerste echte channel message kan arriveren.
```python
await self.mc.start_auto_message_fetching()
self._rx_path_buffer.clear()
self.shared.clear_rx_archive()
```
### Laag 2: Path_len sanity checks (alle matching paden)
In elke matching-methode wordt gecontroleerd of het aantal hashes in een RX_LOG
entry niet wild afwijkt van het door het companion protocol gerapporteerde hop
count. De margin is configureerbaar via `PATH_LEN_SANITY_MARGIN = 5`.
Aanwezig in:
- Forward matching (`ble_worker.py:366`)
- Retroactive matching (`ble_worker.py:304`)
- Archive matching (`shared_data.py:354`)
Voorbeeld: een 7-hop bericht accepteert maximaal 12 hashes (7 + 5).
Een entry met 22 hashes wordt geweigerd.
### Laag 3: Display-time guard (`route_page.py:89`)
Laatste vangnet bij het renderen van de route-pagina. Als het aantal resolved
hops meer is dan 2× de path_len, wordt de match als false positive beschouwd
en verworpen.
```python
if msg_path_len > 0 and resolved_hops > 2 * msg_path_len:
resolved_hops = 0
route['path_nodes'] = []
```
## Andere wijzigingen
- **Temporal matching i.p.v. SNR** — Alle RX_LOG correlatie gebruikt nu
tijdproximiteit (binnen 3 seconden) in plaats van SNR-vergelijking.
SNR wordt nog steeds weergegeven in de UI maar niet meer gebruikt voor matching.
- **`PATH_LEN_SANITY_MARGIN`** als configureerbare constante in `config.py`
## Bestanden gewijzigd
| Bestand | Wijziging |
|---------|-----------|
| `meshcore_gui.py` | Versie → 4.0 |
| `meshcore_gui/__init__.py` | `__version__` → "4.0" |
| `meshcore_gui/config.py` | + `PATH_LEN_SANITY_MARGIN = 5` |
| `meshcore_gui/ble_worker.py` | Startup clear, temporal matching, path_len checks |
| `meshcore_gui/shared_data.py` | `clear_rx_archive()`, `find_rx_path()` met path_len check |
| `meshcore_gui/route_page.py` | Display-time sanity guard |
| `meshcore_gui/route_builder.py` | Geeft `msg_path_len` door aan archive lookup |
| `meshcore_gui/protocols.py` | + `clear_rx_archive()` in Writer protocol |
## Installatie
Vervang je huidige bestanden:
```bash
# Backup
cp -r meshcore_gui meshcore_gui.bak
cp meshcore_gui.py meshcore_gui.py.bak
# Vervang
cp -r meshcore-gui-v4.0/meshcore_gui ./
cp meshcore-gui-v4.0/meshcore_gui.py ./
```
## Verwacht gedrag na update
Met `--debug-on` zie je nu bij opstarten:
```
DEBUG: Startup buffer+archive cleared — only post-init packets will be matched
BLE: Ready!
```
En bij een 7-hop bericht met correcte match:
```
DEBUG: Forward match: dt=0.42s, hashes=7
```
In plaats van het oude gedrag:
```
DEBUG: No RX_LOG match: msg_snr=13.5, buffer_snrs=[14.0, 14.0, ...]
DEBUG: RX archive match: snr=14.0, hashes=[...22 total], time_diff=0.81s
```
-652
View File
@@ -1,652 +0,0 @@
"""
BLE communication worker for MeshCore GUI.
Runs in a separate thread with its own asyncio event loop. Connects to
the MeshCore device, subscribes to events, and processes commands sent
from the GUI via the SharedData command queue.
Single-source architecture
~~~~~~~~~~~~~~~~~~~~~~~~~~
When a LoRa packet arrives the companion firmware pushes two events:
1. ``RX_LOG_DATA`` — the *raw* LoRa frame with header, path hashes
and encrypted payload.
2. ``CHANNEL_MSG_RECV`` — the *decrypted* message text but **no** path
hashes (only the hop count ``path_len``).
This module uses ``meshcoredecoder`` to fully decode the raw packet
from (1): message_hash, path_hashes, sender name, message text and
channel index are all extracted from that **single frame**.
The ``CHANNEL_MSG_RECV`` event (2) serves only as a fallback for
packets that could not be decrypted from the raw frame (e.g. missing
channel key).
Deduplication is done via ``message_hash``: if the same hash has
already been processed from RX_LOG_DATA, the CHANNEL_MSG_RECV event
is silently dropped.
There is **no temporal correlation**, no ring buffer, no archive, and
no sanity-margin heuristics.
"""
import asyncio
import threading
import time
from datetime import datetime
from typing import Dict, List, Optional, Set
from meshcore import MeshCore, EventType
from meshcore_gui.config import (
BOT_CHANNEL, BOT_COOLDOWN_SECONDS, BOT_KEYWORDS, BOT_NAME,
CHANNELS_CONFIG, debug_print,
)
from meshcore_gui.packet_parser import PacketDecoder, PayloadType
from meshcore_gui.protocols import SharedDataWriter
# Maximum number of message_hashes kept for deduplication.
# Oldest entries are evicted first. 200 is generous for the
# typical message rate of a mesh network.
_SEEN_HASHES_MAX = 200
class BLEWorker:
"""
BLE communication worker that runs in a separate thread.
Attributes:
address: BLE MAC address of the device
shared: SharedDataWriter for thread-safe communication
mc: MeshCore instance after connection
running: Boolean to control the worker loop
"""
def __init__(self, address: str, shared: SharedDataWriter) -> None:
self.address = address
self.shared = shared
self.mc: Optional[MeshCore] = None
self.running = True
# Packet decoder (channel keys loaded at startup)
self._decoder = PacketDecoder()
# BOT: timestamp of last reply (cooldown enforcement)
self._bot_last_reply: float = 0.0
# Deduplication: message_hash values already processed via
# RX_LOG_DATA decode. When CHANNEL_MSG_RECV arrives for the
# same packet, it is silently dropped.
#
# Two dedup strategies:
# 1. message_hash (from decoded packet)
# 2. content key (sender:channel:text) — because CHANNEL_MSG_RECV
# does NOT include message_hash in its payload
self._seen_hashes: Set[str] = set()
self._seen_hashes_order: List[str] = []
self._seen_content: Set[str] = set()
self._seen_content_order: List[str] = []
# ------------------------------------------------------------------
# Thread lifecycle
# ------------------------------------------------------------------
def start(self) -> None:
"""Start the worker in a new daemon thread."""
thread = threading.Thread(target=self._run, daemon=True)
thread.start()
debug_print("BLE worker thread started")
def _run(self) -> None:
"""Entry point for the worker thread."""
asyncio.run(self._async_main())
async def _async_main(self) -> None:
"""Connect, then process commands in an infinite loop."""
await self._connect()
if self.mc:
while self.running:
await self._process_commands()
await asyncio.sleep(0.1)
# ------------------------------------------------------------------
# Connection
# ------------------------------------------------------------------
async def _connect(self) -> None:
"""Connect to the BLE device and load initial data."""
self.shared.set_status(f"🔄 Connecting to {self.address}...")
try:
print(f"BLE: Connecting to {self.address}...")
self.mc = await MeshCore.create_ble(self.address)
print("BLE: Connected!")
await asyncio.sleep(1)
# Subscribe to events
self.mc.subscribe(EventType.CHANNEL_MSG_RECV, self._on_channel_msg)
self.mc.subscribe(EventType.CONTACT_MSG_RECV, self._on_contact_msg)
self.mc.subscribe(EventType.RX_LOG_DATA, self._on_rx_log)
await self._load_data()
await self._load_channel_keys()
await self.mc.start_auto_message_fetching()
self.shared.set_connected(True)
self.shared.set_status("✅ Connected")
print("BLE: Ready!")
except Exception as e:
print(f"BLE: Connection error: {e}")
self.shared.set_status(f"{e}")
async def _load_data(self) -> None:
"""
Load device data with retry mechanism.
Tries send_appstart and send_device_query each up to 5 times.
Channels come from hardcoded config.
"""
# send_appstart
self.shared.set_status("🔄 Device info...")
for i in range(5):
debug_print(f"send_appstart attempt {i + 1}")
r = await self.mc.commands.send_appstart()
if r.type != EventType.ERROR:
print(f"BLE: send_appstart OK: {r.payload.get('name')}")
self.shared.update_from_appstart(r.payload)
break
await asyncio.sleep(0.3)
# send_device_query
for i in range(5):
debug_print(f"send_device_query attempt {i + 1}")
r = await self.mc.commands.send_device_query()
if r.type != EventType.ERROR:
print(f"BLE: send_device_query OK: {r.payload.get('ver')}")
self.shared.update_from_device_query(r.payload)
break
await asyncio.sleep(0.3)
# Channels (hardcoded — BLE get_channel is unreliable)
self.shared.set_status("🔄 Channels...")
self.shared.set_channels(CHANNELS_CONFIG)
print(f"BLE: Channels loaded: {[c['name'] for c in CHANNELS_CONFIG]}")
# Contacts
self.shared.set_status("🔄 Contacts...")
r = await self.mc.commands.get_contacts()
if r.type != EventType.ERROR:
self.shared.set_contacts(r.payload)
print(f"BLE: Contacts loaded: {len(r.payload)} contacts")
async def _load_channel_keys(self) -> None:
"""
Load channel decryption keys for packet decoding.
Strategy per channel:
1. Try ``get_channel(idx)`` from the device (returns the
authoritative 16-byte ``channel_secret``).
2. If that fails, derive the key from the channel name via
``SHA-256(name)[:16]``. This is correct for channels whose
name starts with ``#`` (like ``#test``). For other channels
the derived key may be wrong, but decryption will simply fail
gracefully.
"""
self.shared.set_status("🔄 Channel keys...")
for ch in CHANNELS_CONFIG:
idx = ch['idx']
name = ch['name']
loaded = False
# Strategy 1: get_channel from device (3 retries)
for attempt in range(3):
try:
r = await self.mc.commands.get_channel(idx)
if r.type != EventType.ERROR:
secret = r.payload.get('channel_secret')
if secret and isinstance(secret, bytes) and len(secret) >= 16:
self._decoder.add_channel_key(idx, secret[:16])
print(
f"BLE: Channel key [{idx}] '{name}' "
f"loaded from device"
)
loaded = True
break
except Exception as exc:
debug_print(
f"get_channel({idx}) attempt {attempt + 1} "
f"error: {exc}"
)
await asyncio.sleep(0.3)
# Strategy 2: derive from name
if not loaded:
self._decoder.add_channel_key_from_name(idx, name)
print(
f"BLE: Channel key [{idx}] '{name}' "
f"derived from name (fallback)"
)
print(
f"BLE: PacketDecoder ready — "
f"has_keys={self._decoder.has_keys}"
)
# ------------------------------------------------------------------
# Deduplication
# ------------------------------------------------------------------
def _mark_seen(self, message_hash: str) -> None:
"""Record a message_hash as processed. Evicts old entries."""
if message_hash in self._seen_hashes:
return
self._seen_hashes.add(message_hash)
self._seen_hashes_order.append(message_hash)
while len(self._seen_hashes_order) > _SEEN_HASHES_MAX:
oldest = self._seen_hashes_order.pop(0)
self._seen_hashes.discard(oldest)
def _mark_content_seen(self, sender: str, channel, text: str) -> None:
"""Record a content key as processed. Evicts old entries."""
key = f"{channel}:{sender}:{text}"
if key in self._seen_content:
return
self._seen_content.add(key)
self._seen_content_order.append(key)
while len(self._seen_content_order) > _SEEN_HASHES_MAX:
oldest = self._seen_content_order.pop(0)
self._seen_content.discard(oldest)
def _is_seen(self, message_hash: str) -> bool:
"""Check if a message_hash has already been processed."""
return message_hash in self._seen_hashes
def _is_content_seen(self, sender: str, channel, text: str) -> bool:
"""Check if a content key has already been processed."""
key = f"{channel}:{sender}:{text}"
return key in self._seen_content
# ------------------------------------------------------------------
# Command handling
# ------------------------------------------------------------------
async def _process_commands(self) -> None:
"""Process all commands queued by the GUI."""
while True:
cmd = self.shared.get_next_command()
if cmd is None:
break
await self._handle_command(cmd)
async def _handle_command(self, cmd: Dict) -> None:
"""
Process a single command from the GUI.
Supported actions: send_message, send_dm, send_advert, refresh.
"""
action = cmd.get('action')
if action == 'send_message':
channel = cmd.get('channel', 0)
text = cmd.get('text', '')
is_bot = cmd.get('_bot', False)
if text and self.mc:
await self.mc.commands.send_chan_msg(channel, text)
# Bot replies appear via the radio echo (RX_LOG),
# so only add manual messages to the message list.
if not is_bot:
self.shared.add_message({
'time': datetime.now().strftime('%H:%M:%S'),
'sender': 'Me',
'text': text,
'channel': channel,
'direction': 'out',
'sender_pubkey': '',
'path_hashes': [],
})
debug_print(
f"{'BOT' if is_bot else 'Sent'} message to "
f"channel {channel}: {text[:30]}"
)
elif action == 'send_advert':
if self.mc:
await self.mc.commands.send_advert(flood=True)
self.shared.set_status("📢 Advert sent")
debug_print("Advert sent")
elif action == 'send_dm':
pubkey = cmd.get('pubkey', '')
text = cmd.get('text', '')
contact_name = cmd.get('contact_name', pubkey[:8])
if text and pubkey and self.mc:
await self.mc.commands.send_msg(pubkey, text)
self.shared.add_message({
'time': datetime.now().strftime('%H:%M:%S'),
'sender': 'Me',
'text': text,
'channel': None,
'direction': 'out',
'sender_pubkey': pubkey,
'path_hashes': [],
})
debug_print(f"Sent DM to {contact_name}: {text[:30]}")
elif action == 'refresh':
if self.mc:
debug_print("Refresh requested")
await self._load_data()
# ------------------------------------------------------------------
# BOT — keyword-triggered auto-reply
# ------------------------------------------------------------------
def _bot_check_and_queue(
self,
sender: str,
text: str,
channel_idx,
snr,
path_len: int,
path_hashes: Optional[List[str]] = None,
) -> None:
"""Queue a BOT reply if all guards pass.
Guards:
1. BOT is enabled (checkbox in GUI)
2. Message is on the configured BOT_CHANNEL
3. Sender is not the BOT itself (prevent self-reply)
4. Sender name does not end with 'Bot' (prevent bot-to-bot loops)
5. Cooldown period has elapsed
6. Message text contains a recognised keyword
"""
# Guard 1: BOT enabled?
if not self.shared.is_bot_enabled():
return
# Guard 2: correct channel?
if channel_idx != BOT_CHANNEL:
return
# Guard 3: skip own messages (use BOT_NAME as identifier, not device name)
if sender == 'Me' or (text and text.startswith(BOT_NAME)):
return
# Guard 4: skip other bots (name ends with "Bot")
if sender and sender.rstrip().lower().endswith('bot'):
debug_print(f"BOT: skipping message from other bot '{sender}'")
return
# Guard 5: cooldown
now = time.time()
if now - self._bot_last_reply < BOT_COOLDOWN_SECONDS:
debug_print("BOT: cooldown active, skipping")
return
# Guard 6: keyword match (case-insensitive, first match wins)
text_lower = (text or '').lower()
matched_template = None
for keyword, template in BOT_KEYWORDS.items():
if keyword in text_lower:
matched_template = template
break
if matched_template is None:
return
# Build path string: "path(N); A>B" or "path(0)"
path_str = self._format_path(path_len, path_hashes)
# Build reply
snr_str = f"{snr:.1f}" if snr is not None else "?"
reply = matched_template.format(
bot=BOT_NAME,
sender=sender or "?",
snr=snr_str,
path=path_str,
)
# Update cooldown timestamp
self._bot_last_reply = now
# Queue as internal command — picked up by _process_commands
self.shared.put_command({
'action': 'send_message',
'channel': BOT_CHANNEL,
'text': reply,
'_bot': True,
})
debug_print(f"BOT: queued reply to '{sender}': {reply}")
def _format_path(
self, path_len: int, path_hashes: Optional[List[str]],
) -> str:
"""Format path info as ``path(N); 8D>A8`` or ``path(0)``.
Shows raw 1-byte hashes in uppercase hex.
"""
if not path_len:
return "path(0)"
if not path_hashes:
return f"path({path_len})"
hop_names = [h.upper() for h in path_hashes if h and len(h) >= 2]
if hop_names:
return f"path({path_len}); {'>'.join(hop_names)}"
return f"path({path_len})"
# ------------------------------------------------------------------
# Event callbacks
# ------------------------------------------------------------------
def _on_rx_log(self, event) -> None:
"""Callback for RX log data — the single source of truth.
Decodes the raw LoRa frame via ``meshcoredecoder``. For
GroupText packets this yields message_hash, path_hashes,
sender, text and channel_idx — all from **one** frame.
The decoded message is added to SharedData directly. The
message_hash is recorded so that the duplicate
``CHANNEL_MSG_RECV`` event is suppressed.
"""
payload = event.payload
# Always add to the RX log display
self.shared.add_rx_log({
'time': datetime.now().strftime('%H:%M:%S'),
'snr': payload.get('snr', 0),
'rssi': payload.get('rssi', 0),
'payload_type': payload.get('payload_type', '?'),
'hops': payload.get('path_len', 0),
})
# Decode the raw packet
payload_hex = payload.get('payload', '')
if not payload_hex:
return
decoded = self._decoder.decode(payload_hex)
if decoded is None:
return
# Only process decrypted GroupText packets as messages
if (decoded.payload_type == PayloadType.GroupText
and decoded.is_decrypted):
# Mark as seen so CHANNEL_MSG_RECV is suppressed
self._mark_seen(decoded.message_hash)
self._mark_content_seen(
decoded.sender, decoded.channel_idx, decoded.text,
)
# Look up sender pubkey from contact name
sender_pubkey = ''
if decoded.sender:
match = self.shared.get_contact_by_name(decoded.sender)
if match:
sender_pubkey, _contact = match
# Extract SNR from the RX_LOG event
snr = payload.get('snr')
if snr is not None:
try:
snr = float(snr)
except (ValueError, TypeError):
snr = None
self.shared.add_message({
'time': datetime.now().strftime('%H:%M:%S'),
'sender': decoded.sender,
'text': decoded.text,
'channel': decoded.channel_idx,
'direction': 'in',
'snr': snr,
'path_len': decoded.path_length,
'sender_pubkey': sender_pubkey,
'path_hashes': decoded.path_hashes,
'message_hash': decoded.message_hash,
})
debug_print(
f"RX_LOG → message: hash={decoded.message_hash}, "
f"sender={decoded.sender!r}, "
f"ch={decoded.channel_idx}, "
f"path={decoded.path_hashes}"
)
# BOT: check for keyword and queue reply
self._bot_check_and_queue(
sender=decoded.sender,
text=decoded.text,
channel_idx=decoded.channel_idx,
snr=snr,
path_len=decoded.path_length,
path_hashes=decoded.path_hashes,
)
def _on_channel_msg(self, event) -> None:
"""Callback for channel messages — fallback only.
If the same packet was already decoded from ``RX_LOG_DATA``
(checked via ``message_hash``), this event is suppressed.
Otherwise — e.g. when the channel key is missing or decryption
failed — this adds the message without path data.
"""
payload = event.payload
debug_print(f"Channel msg payload keys: {list(payload.keys())}")
debug_print(f"Channel msg payload: {payload}")
# --- Check for duplicate via message_hash ---
msg_hash = payload.get('message_hash', '')
if msg_hash and self._is_seen(msg_hash):
debug_print(
f"Channel msg suppressed (hash match): "
f"hash={msg_hash}"
)
return
# --- Extract sender name from text field ---
# Channel text format: "SenderName: message body"
raw_text = payload.get('text', '')
sender = ''
msg_text = raw_text
if ': ' in raw_text:
name_part, body_part = raw_text.split(': ', 1)
sender = name_part.strip()
msg_text = body_part
elif raw_text:
msg_text = raw_text
# --- Check for duplicate via content ---
ch_idx = payload.get('channel_idx')
if self._is_content_seen(sender, ch_idx, msg_text):
debug_print(
f"Channel msg suppressed (content match): "
f"sender={sender!r}, ch={ch_idx}, text={msg_text[:30]!r}"
)
return
debug_print(
f"Channel msg (fallback): sender={sender!r}, "
f"text={msg_text[:40]!r}"
)
# --- Look up sender contact by name to obtain pubkey ---
sender_pubkey = ''
if sender:
match = self.shared.get_contact_by_name(sender)
if match:
sender_pubkey, _contact = match
# Extract SNR
msg_snr = payload.get('SNR') or payload.get('snr')
if msg_snr is not None:
try:
msg_snr = float(msg_snr)
except (ValueError, TypeError):
msg_snr = None
self.shared.add_message({
'time': datetime.now().strftime('%H:%M:%S'),
'sender': sender,
'text': msg_text,
'channel': payload.get('channel_idx'),
'direction': 'in',
'snr': msg_snr,
'path_len': payload.get('path_len', 0),
'sender_pubkey': sender_pubkey,
'path_hashes': [], # No path data from companion event
'message_hash': msg_hash,
})
# BOT: check for keyword and queue reply (fallback path)
self._bot_check_and_queue(
sender=sender,
text=msg_text,
channel_idx=payload.get('channel_idx'),
snr=msg_snr,
path_len=payload.get('path_len', 0),
)
def _on_contact_msg(self, event) -> None:
"""Callback for received DMs; resolves sender name via pubkey."""
payload = event.payload
pubkey = payload.get('pubkey_prefix', '')
sender = ''
debug_print(f"DM payload keys: {list(payload.keys())}")
debug_print(f"DM payload: {payload}")
if pubkey:
sender = self.shared.get_contact_name_by_prefix(pubkey)
if not sender:
sender = pubkey[:8] if pubkey else ''
self.shared.add_message({
'time': datetime.now().strftime('%H:%M:%S'),
'sender': sender,
'text': payload.get('text', ''),
'channel': None,
'direction': 'in',
'snr': payload.get('SNR') or payload.get('snr'),
'path_len': payload.get('path_len', 0),
'sender_pubkey': pubkey,
'path_hashes': [], # DMs use out_path from contact record
})
debug_print(
f"DM received from {sender}: "
f"{payload.get('text', '')[:30]}"
)
-81
View File
@@ -1,81 +0,0 @@
"""
Configuration and shared constants for MeshCore GUI.
Contains:
- Debug flag and debug_print helper
- Channel configuration
- Contact type mappings
The DEBUG flag defaults to False and can be activated at startup
with the ``--debug-on`` command-line option.
"""
from typing import Dict, List
# ==============================================================================
# DEBUG
# ==============================================================================
DEBUG = False
def debug_print(msg: str) -> None:
"""
Print debug message if DEBUG mode is enabled.
Args:
msg: The message to print
"""
if DEBUG:
print(f"DEBUG: {msg}")
# ==============================================================================
# CHANNELS
# ==============================================================================
# Hardcoded channels configuration.
# Determine your channels with meshcli:
# meshcli -d <BLE_ADDRESS>
# > get_channels
# Output: 0: Public [...], 1: #test [...], etc.
CHANNELS_CONFIG: List[Dict] = [
{'idx': 0, 'name': 'Public'},
{'idx': 1, 'name': '#test'},
{'idx': 2, 'name': '#zwolle'},
{'idx': 3, 'name': 'RahanSom'},
]
# ==============================================================================
# CONTACT TYPE MAPPINGS
# ==============================================================================
TYPE_ICONS: Dict[int, str] = {0: "", 1: "📱", 2: "📡", 3: "🏠"}
TYPE_NAMES: Dict[int, str] = {0: "-", 1: "CLI", 2: "REP", 3: "ROOM"}
TYPE_LABELS: Dict[int, str] = {0: "-", 1: "Companion", 2: "Repeater", 3: "Room Server"}
# ==============================================================================
# BOT
# ==============================================================================
# Channel index the bot listens on (must match CHANNELS_CONFIG).
BOT_CHANNEL: int = 1 # #test
# Display name prepended to every bot reply.
BOT_NAME: str = "Zwolle Bot"
# Minimum seconds between two bot replies (prevents reply-storms).
BOT_COOLDOWN_SECONDS: float = 5.0
# Keyword → reply template mapping.
# Available variables: {bot}, {sender}, {snr}, {path}
# The bot checks whether the incoming message text *contains* the keyword
# (case-insensitive). First match wins.
BOT_KEYWORDS: Dict[str, str] = {
'test': '{bot}: {sender}, rcvd | SNR {snr} | {path}',
'ping': '{bot}: Pong!',
'help': '{bot}: test, ping, help',
}
-440
View File
@@ -1,440 +0,0 @@
"""
Main dashboard page for MeshCore GUI.
Contains the three-column layout with device info, contacts, map,
messaging, filters and RX log. The 500 ms update timer lives here.
"""
from typing import Dict, List
import logging
from nicegui import ui
from meshcore_gui.config import TYPE_ICONS, TYPE_NAMES
from meshcore_gui.protocols import SharedDataReader
# Suppress the harmless "Client has been deleted" warning that NiceGUI
# emits when a browser tab is refreshed while a ui.timer is active.
class _DeletedClientFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
return 'Client has been deleted' not in record.getMessage()
logging.getLogger('nicegui').addFilter(_DeletedClientFilter())
class DashboardPage:
"""
Main dashboard rendered at ``/``.
Args:
shared: SharedDataReader for data access and command dispatch
"""
def __init__(self, shared: SharedDataReader) -> None:
self._shared = shared
# UI element references
self._status_label = None
self._device_label = None
self._channel_select = None
self._channels_filter_container = None
self._channel_filters: Dict = {}
self._bot_checkbox = None
self._contacts_container = None
self._map_widget = None
self._messages_container = None
self._rxlog_table = None
self._msg_input = None
# Map markers tracking
self._markers: List = []
# Channel data for message display
self._last_channels: List[Dict] = []
# Local first-render flag — each new browser tab gets its own
# DashboardPage instance and must render device/contacts once.
self._initialized: bool = False
# ------------------------------------------------------------------
# Public
# ------------------------------------------------------------------
def render(self) -> None:
"""Build the complete dashboard layout and start the timer."""
# Reset per-tab state — same DashboardPage instance is reused
# across browser reconnects, so force a fresh first-render.
self._initialized = False
self._markers.clear()
self._channel_filters = {}
self._last_channels = []
ui.dark_mode(False)
# Header
with ui.header().classes('bg-blue-600 text-white'):
ui.label('🔗 MeshCore').classes('text-xl font-bold')
ui.space()
self._status_label = ui.label('Starting...').classes('text-sm')
# Three columns
with ui.row().classes('w-full h-full gap-2 p-2'):
with ui.column().classes('w-64 gap-2'):
self._render_device_panel()
self._render_contacts_panel()
with ui.column().classes('flex-grow gap-2'):
self._render_map_panel()
self._render_input_panel()
self._render_channels_filter()
self._render_messages_panel()
with ui.column().classes('w-64 gap-2'):
self._render_actions_panel()
self._render_rxlog_panel()
# 500 ms update timer
ui.timer(0.5, self._update_ui)
# ------------------------------------------------------------------
# Panel builders
# ------------------------------------------------------------------
def _render_device_panel(self) -> None:
with ui.card().classes('w-full'):
ui.label('📡 Device').classes('font-bold text-gray-600')
self._device_label = ui.label('Connecting...').classes(
'text-sm whitespace-pre-line'
)
def _render_contacts_panel(self) -> None:
with ui.card().classes('w-full'):
ui.label('👥 Contacts').classes('font-bold text-gray-600')
self._contacts_container = ui.column().classes(
'w-full gap-1 max-h-96 overflow-y-auto'
)
def _render_map_panel(self) -> None:
with ui.card().classes('w-full'):
self._map_widget = ui.leaflet(
center=(52.5, 6.0), zoom=9
).classes('w-full h-72')
def _render_input_panel(self) -> None:
with ui.card().classes('w-full'):
with ui.row().classes('w-full items-center gap-2'):
self._msg_input = ui.input(
placeholder='Message...'
).classes('flex-grow')
self._channel_select = ui.select(
options={0: '[0] Public'}, value=0
).classes('w-32')
ui.button(
'Send', on_click=self._send_message
).classes('bg-blue-500 text-white')
def _render_channels_filter(self) -> None:
with ui.card().classes('w-full'):
with ui.row().classes('w-full items-center gap-4 justify-center'):
ui.label('📻 Filter:').classes('text-sm text-gray-600')
self._channels_filter_container = ui.row().classes('gap-4')
def _render_messages_panel(self) -> None:
with ui.card().classes('w-full'):
ui.label('💬 Messages').classes('font-bold text-gray-600')
self._messages_container = ui.column().classes(
'w-full h-40 overflow-y-auto gap-0 text-sm font-mono '
'bg-gray-50 p-2 rounded'
)
def _render_actions_panel(self) -> None:
with ui.card().classes('w-full'):
ui.label('⚡ Actions').classes('font-bold text-gray-600')
with ui.row().classes('gap-2'):
ui.button('🔄 Refresh', on_click=self._cmd_refresh)
ui.button('📢 Advert', on_click=self._cmd_send_advert)
def _render_rxlog_panel(self) -> None:
with ui.card().classes('w-full'):
ui.label('📊 RX Log').classes('font-bold text-gray-600')
self._rxlog_table = ui.table(
columns=[
{'name': 'time', 'label': 'Time', 'field': 'time'},
{'name': 'snr', 'label': 'SNR', 'field': 'snr'},
{'name': 'type', 'label': 'Type', 'field': 'type'},
],
rows=[],
).props('dense flat').classes('text-xs max-h-48 overflow-y-auto')
# ------------------------------------------------------------------
# Timer-driven UI update
# ------------------------------------------------------------------
def _update_ui(self) -> None:
"""Periodic UI refresh — called every 500 ms."""
try:
if not self._status_label or not self._device_label:
return
data = self._shared.get_snapshot()
is_first = not self._initialized
self._status_label.text = data['status']
if data['device_updated'] or is_first:
self._update_device_info(data)
if data['channels_updated'] or is_first:
self._update_channels(data)
if data['contacts_updated'] or is_first:
self._update_contacts(data)
if data['contacts'] and (
data['contacts_updated'] or not self._markers or is_first
):
self._update_map(data)
self._refresh_messages(data)
if data['rxlog_updated'] and self._rxlog_table:
self._update_rxlog(data)
self._shared.clear_update_flags()
if is_first and data['channels'] and data['contacts']:
self._initialized = True
self._shared.mark_gui_initialized()
except Exception as e:
err = str(e).lower()
if "deleted" not in err and "client" not in err:
print(f"GUI update error: {e}")
# ------------------------------------------------------------------
# Data → UI updaters
# ------------------------------------------------------------------
def _update_device_info(self, data: Dict) -> None:
lines = []
if data['name']:
lines.append(f"📡 {data['name']}")
if data['public_key']:
lines.append(f"🔑 {data['public_key'][:16]}...")
if data['radio_freq']:
lines.append(f"📻 {data['radio_freq']:.3f} MHz")
lines.append(f"⚙️ SF{data['radio_sf']} / {data['radio_bw']} kHz")
if data['tx_power']:
lines.append(f"⚡ TX: {data['tx_power']} dBm")
if data['adv_lat'] and data['adv_lon']:
lines.append(f"📍 {data['adv_lat']:.4f}, {data['adv_lon']:.4f}")
if data['firmware_version']:
lines.append(f"🏷️ {data['firmware_version']}")
self._device_label.text = "\n".join(lines) if lines else "Loading..."
def _update_channels(self, data: Dict) -> None:
if not self._channels_filter_container or not data['channels']:
return
self._channels_filter_container.clear()
self._channel_filters = {}
with self._channels_filter_container:
# BOT toggle — controls auto-reply, not display filtering
self._bot_checkbox = ui.checkbox(
'🤖 BOT',
value=data.get('bot_enabled', False),
on_change=lambda e: self._shared.set_bot_enabled(e.value),
)
# Visual separator
ui.label('').classes('text-gray-300')
# Display filters: DM + channels
cb_dm = ui.checkbox('DM', value=True)
self._channel_filters['DM'] = cb_dm
for ch in data['channels']:
cb = ui.checkbox(f"[{ch['idx']}] {ch['name']}", value=True)
self._channel_filters[ch['idx']] = cb
self._last_channels = data['channels']
if self._channel_select and data['channels']:
opts = {
ch['idx']: f"[{ch['idx']}] {ch['name']}"
for ch in data['channels']
}
self._channel_select.options = opts
if self._channel_select.value not in opts:
self._channel_select.value = list(opts.keys())[0]
self._channel_select.update()
def _update_contacts(self, data: Dict) -> None:
if not self._contacts_container:
return
self._contacts_container.clear()
with self._contacts_container:
for key, contact in data['contacts'].items():
ctype = contact.get('type', 0)
icon = TYPE_ICONS.get(ctype, '')
name = contact.get('adv_name', key[:12])
type_name = TYPE_NAMES.get(ctype, '-')
lat = contact.get('adv_lat', 0)
lon = contact.get('adv_lon', 0)
has_loc = lat != 0 or lon != 0
tooltip = (
f"{name}\nType: {type_name}\n"
f"Key: {key[:16]}...\nClick to send DM"
)
if has_loc:
tooltip += f"\nLat: {lat:.4f}\nLon: {lon:.4f}"
with ui.row().classes(
'w-full items-center gap-2 p-1 '
'hover:bg-gray-100 rounded cursor-pointer'
).on('click', lambda e, k=key, n=name: self._open_dm_dialog(k, n)):
ui.label(icon).classes('text-sm')
ui.label(name[:15]).classes(
'text-sm flex-grow truncate'
).tooltip(tooltip)
ui.label(type_name).classes('text-xs text-gray-500')
if has_loc:
ui.label('📍').classes('text-xs')
def _update_map(self, data: Dict) -> None:
if not self._map_widget:
return
for marker in self._markers:
try:
self._map_widget.remove_layer(marker)
except Exception:
pass
self._markers.clear()
if data['adv_lat'] and data['adv_lon']:
m = self._map_widget.marker(
latlng=(data['adv_lat'], data['adv_lon'])
)
self._markers.append(m)
self._map_widget.set_center((data['adv_lat'], data['adv_lon']))
for key, contact in data['contacts'].items():
lat = contact.get('adv_lat', 0)
lon = contact.get('adv_lon', 0)
if lat != 0 or lon != 0:
m = self._map_widget.marker(latlng=(lat, lon))
self._markers.append(m)
def _update_rxlog(self, data: Dict) -> None:
rows = [
{
'time': e['time'],
'snr': f"{e['snr']:.1f}",
'type': e['payload_type'],
}
for e in data['rx_log'][:20]
]
self._rxlog_table.rows = rows
self._rxlog_table.update()
def _refresh_messages(self, data: Dict) -> None:
if not self._messages_container:
return
channel_names = {ch['idx']: ch['name'] for ch in self._last_channels}
filtered = []
for orig_idx, msg in enumerate(data['messages']):
ch = msg['channel']
if ch is None:
if self._channel_filters.get('DM') and not self._channel_filters['DM'].value:
continue
else:
if ch in self._channel_filters and not self._channel_filters[ch].value:
continue
filtered.append((orig_idx, msg))
self._messages_container.clear()
with self._messages_container:
for orig_idx, msg in reversed(filtered[-50:]):
direction = '' if msg['direction'] == 'out' else ''
ch = msg['channel']
ch_label = (
f"[{channel_names.get(ch, f'ch{ch}')}]"
if ch is not None
else '[DM]'
)
sender = msg.get('sender', '')
path_len = msg.get('path_len', 0)
has_path = bool(msg.get('path_hashes'))
if msg['direction'] == 'in' and path_len > 0:
hop_tag = f' [{path_len}h{"" if has_path else ""}]'
else:
hop_tag = ''
if sender:
line = f"{msg['time']} {direction} {ch_label}{hop_tag} {sender}: {msg['text']}"
else:
line = f"{msg['time']} {direction} {ch_label}{hop_tag} {msg['text']}"
ui.label(line).classes(
'text-xs leading-tight cursor-pointer '
'hover:bg-blue-50 rounded px-1'
).on('click', lambda e, i=orig_idx: ui.navigate.to(
f'/route/{i}', new_tab=True
))
# ------------------------------------------------------------------
# DM dialog
# ------------------------------------------------------------------
def _open_dm_dialog(self, pubkey: str, contact_name: str) -> None:
with ui.dialog() as dialog, ui.card().classes('w-96'):
ui.label(f'💬 DM to {contact_name}').classes('font-bold text-lg')
msg_input = ui.input(placeholder='Type your message...').classes('w-full')
with ui.row().classes('w-full justify-end gap-2 mt-4'):
ui.button('Cancel', on_click=dialog.close).props('flat')
def send_dm():
text = msg_input.value
if text:
self._shared.put_command({
'action': 'send_dm',
'pubkey': pubkey,
'text': text,
'contact_name': contact_name,
})
dialog.close()
ui.button('Send', on_click=send_dm).classes('bg-blue-500 text-white')
dialog.open()
# ------------------------------------------------------------------
# Command helpers
# ------------------------------------------------------------------
def _send_message(self) -> None:
text = self._msg_input.value
channel = self._channel_select.value
if text:
self._shared.put_command({
'action': 'send_message',
'channel': channel,
'text': text,
})
self._msg_input.value = ''
def _cmd_send_advert(self) -> None:
self._shared.put_command({'action': 'send_advert'})
def _cmd_refresh(self) -> None:
self._shared.put_command({'action': 'refresh'})
-306
View File
@@ -1,306 +0,0 @@
"""
Route visualization page for MeshCore GUI.
Standalone NiceGUI page that opens in a new browser tab when a user
clicks on a message. Shows a Leaflet map with the message route,
a hop count summary, and a details table.
"""
from typing import Dict
from nicegui import ui
from meshcore_gui.config import TYPE_LABELS, debug_print
from meshcore_gui.route_builder import RouteBuilder
from meshcore_gui.protocols import SharedDataReadAndLookup
class RoutePage:
"""
Route visualization page rendered at ``/route/{msg_index}``.
Args:
shared: SharedDataReadAndLookup for data access and contact lookups
"""
def __init__(self, shared: SharedDataReadAndLookup) -> None:
self._shared = shared
self._builder = RouteBuilder(shared)
# ------------------------------------------------------------------
# Public
# ------------------------------------------------------------------
def render(self, msg_index: int) -> None:
"""
Render the route page for a specific message.
Args:
msg_index: Index into SharedData.messages list
"""
data = self._shared.get_snapshot()
# Validate
if msg_index < 0 or msg_index >= len(data['messages']):
ui.label('❌ Message not found').classes('text-xl p-8')
return
msg = data['messages'][msg_index]
route = self._builder.build(msg, data)
sender = msg.get('sender', 'Unknown')
ui.page_title(f'Route — {sender}')
ui.dark_mode(False)
# Header
with ui.header().classes('bg-blue-600 text-white'):
ui.label('🗺️ MeshCore Route').classes('text-xl font-bold')
with ui.column().classes('w-full max-w-4xl mx-auto p-4 gap-4'):
self._render_message_info(msg)
self._render_hop_summary(msg, route)
self._render_map(data, route)
self._render_route_table(msg, data, route)
# ------------------------------------------------------------------
# Private — sub-sections
# ------------------------------------------------------------------
@staticmethod
def _render_message_info(msg: Dict) -> None:
"""Message header with sender name and text."""
sender = msg.get('sender', 'Unknown')
direction = '→ Sent' if msg['direction'] == 'out' else '← Received'
ui.label(f'Message Route — {sender} ({direction})').classes('font-bold text-lg')
ui.label(
f"{msg['time']} {sender}: "
f"{msg['text'][:120]}"
).classes('text-sm text-gray-600')
@staticmethod
def _render_hop_summary(msg: Dict, route: Dict) -> None:
"""Hop count banner with SNR and path source."""
msg_path_len = route['msg_path_len']
resolved_hops = len(route['path_nodes'])
path_source = route.get('path_source', 'none')
expected_repeaters = max(msg_path_len - 1, 0)
with ui.card().classes('w-full'):
with ui.row().classes('items-center gap-4'):
if msg['direction'] == 'in':
if msg_path_len == 0:
ui.label('📡 Direct (0 hops)').classes(
'text-lg font-bold text-green-600'
)
else:
hop_text = '1 hop' if msg_path_len == 1 else f'{msg_path_len} hops'
ui.label(f'📡 {hop_text}').classes(
'text-lg font-bold text-blue-600'
)
else:
ui.label('📡 Outgoing message').classes(
'text-lg font-bold text-gray-600'
)
if route['snr'] is not None:
ui.label(
f'📶 SNR: {route["snr"]:.1f} dB'
).classes('text-sm text-gray-600')
# Resolution status
if expected_repeaters > 0 and resolved_hops > 0:
source_label = (
'from received packet'
if path_source == 'rx_log'
else 'from stored contact route'
)
rpt = 'repeater' if expected_repeaters == 1 else 'repeaters'
ui.label(
f'{resolved_hops} of {expected_repeaters} '
f'{rpt} identified '
f'({source_label})'
).classes('text-xs text-gray-500 mt-1')
elif msg_path_len > 0 and resolved_hops == 0:
ui.label(
f'{msg_path_len} '
f'hop{"s" if msg_path_len != 1 else ""}'
f'repeater identities not resolved'
).classes('text-xs text-gray-500 mt-1')
@staticmethod
def _render_map(data: Dict, route: Dict) -> None:
"""Leaflet map with route markers and polylines.
Lines are only drawn between nodes that are **adjacent** in the
route and both have GPS coordinates. A node without coordinates
breaks the line so that no false connections are shown.
"""
with ui.card().classes('w-full'):
if not route['has_locations']:
ui.label(
'📍 No location data available for map display'
).classes('text-gray-500 italic p-4')
return
center_lat = data['adv_lat'] or 52.5
center_lon = data['adv_lon'] or 6.0
route_map = ui.leaflet(
center=(center_lat, center_lon), zoom=10
).classes('w-full h-96')
# --- Build ordered list of positions (or None) ---
ordered = []
# Sender
if route['sender']:
s = route['sender']
if s['lat'] or s['lon']:
ordered.append((s['lat'], s['lon']))
else:
ordered.append(None)
else:
ordered.append(None)
# Repeaters
for node in route['path_nodes']:
if node['lat'] or node['lon']:
ordered.append((node['lat'], node['lon']))
else:
ordered.append(None)
# Own position (receiver)
if data['adv_lat'] or data['adv_lon']:
ordered.append((data['adv_lat'], data['adv_lon']))
else:
ordered.append(None)
# --- Place markers for all nodes with coordinates ---
all_points = [p for p in ordered if p is not None]
for lat, lon in all_points:
route_map.marker(latlng=(lat, lon))
# --- Draw line between all located nodes (skip unknowns) ---
# Nodes without coordinates are simply skipped so the line
# connects sender → known repeaters → receiver without gaps.
if len(all_points) >= 2:
route_map.generic_layer(
name='polyline',
args=[all_points, {'color': '#2563eb', 'weight': 3}],
)
# Center map on all located nodes
if all_points:
lats = [p[0] for p in all_points]
lons = [p[1] for p in all_points]
route_map.set_center(
(sum(lats) / len(lats), sum(lons) / len(lons))
)
@staticmethod
def _render_route_table(msg: Dict, data: Dict, route: Dict) -> None:
"""Route details table with sender, hops and receiver."""
msg_path_len = route['msg_path_len']
resolved_hops = len(route['path_nodes'])
path_source = route.get('path_source', 'none')
with ui.card().classes('w-full'):
ui.label('📋 Route Details').classes('font-bold text-gray-600')
rows = []
# Sender
if route['sender']:
s = route['sender']
has_loc = s['lat'] != 0 or s['lon'] != 0
rows.append({
'hop': 'Start',
'name': s['name'],
'hash': s.get('pubkey', '')[:2].upper() if s.get('pubkey') else '-',
'type': TYPE_LABELS.get(s['type'], '-'),
'location': f"{s['lat']:.4f}, {s['lon']:.4f}" if has_loc else '-',
'role': '📱 Sender',
})
else:
sender_pubkey = msg.get('sender_pubkey', '')
rows.append({
'hop': 'Start',
'name': msg.get('sender', 'Unknown'),
'hash': sender_pubkey[:2].upper() if sender_pubkey else '-',
'type': '-',
'location': '-',
'role': '📱 Sender',
})
# Resolved repeaters (from RX_LOG or out_path)
for i, node in enumerate(route['path_nodes']):
has_loc = node['lat'] != 0 or node['lon'] != 0
rows.append({
'hop': str(i + 1),
'name': node['name'],
'hash': node.get('pubkey', '')[:2].upper() if node.get('pubkey') else '-',
'type': TYPE_LABELS.get(node['type'], '-'),
'location': f"{node['lat']:.4f}, {node['lon']:.4f}" if has_loc else '-',
'role': '📡 Repeater',
})
# Placeholder rows when no path data was resolved
if not route['path_nodes'] and msg_path_len > 0:
for i in range(msg_path_len):
rows.append({
'hop': str(i + 1),
'name': '-',
'hash': '-',
'type': '-',
'location': '-',
'role': '📡 Repeater',
})
# Own position
self_has_loc = data['adv_lat'] != 0 or data['adv_lon'] != 0
rows.append({
'hop': 'End',
'name': data['name'] or 'Me',
'hash': '-',
'type': 'Companion',
'location': f"{data['adv_lat']:.4f}, {data['adv_lon']:.4f}" if self_has_loc else '-',
'role': '📱 Receiver' if msg['direction'] == 'in' else '📱 Sender',
})
ui.table(
columns=[
{'name': 'hop', 'label': 'Hop', 'field': 'hop', 'align': 'center'},
{'name': 'role', 'label': 'Role', 'field': 'role'},
{'name': 'name', 'label': 'Name', 'field': 'name'},
{'name': 'hash', 'label': 'ID', 'field': 'hash', 'align': 'center'},
{'name': 'type', 'label': 'Type', 'field': 'type'},
{'name': 'location', 'label': 'Location', 'field': 'location'},
],
rows=rows,
).props('dense flat bordered').classes('w-full')
# Footnote based on path_source
if msg_path_len == 0 and msg['direction'] == 'in':
ui.label(
'️ Direct message — no intermediate hops.'
).classes('text-xs text-gray-400 italic mt-2')
elif path_source == 'rx_log':
ui.label(
'️ Path extracted from received LoRa packet (RX_LOG). '
'Each ID is the first byte of a node\'s public key.'
).classes('text-xs text-gray-400 italic mt-2')
elif path_source == 'contact_out_path':
ui.label(
'️ Path from sender\'s stored contact route (out_path). '
'Last known route, not necessarily this message\'s path.'
).classes('text-xs text-gray-400 italic mt-2')
elif msg_path_len > 0 and resolved_hops == 0:
ui.label(
'️ Repeater identities could not be resolved. '
'RX_LOG correlation may have missed the raw packet, '
'and sender has no stored out_path.'
).classes('text-xs text-gray-400 italic mt-2')
elif msg['direction'] == 'out':
ui.label(
'️ Hop information is only available for received messages.'
).classes('text-xs text-gray-400 italic mt-2')
@@ -11,7 +11,7 @@ Usage:
python meshcore_gui.py <BLE_ADDRESS> --debug-on
Author: PE1HVH
Version: 4.0
Version: 5.0
SPDX-License-Identifier: MIT
Copyright: (c) 2026 PE1HVH
"""
@@ -20,7 +20,7 @@ import sys
from nicegui import ui
# Allow overriding DEBUG and CHANNELS_CONFIG before anything imports them
# Allow overriding DEBUG before anything imports it
import meshcore_gui.config as config
try:
@@ -30,10 +30,10 @@ except ImportError:
print("Install with: pip install meshcore")
sys.exit(1)
from meshcore_gui.ble_worker import BLEWorker
from meshcore_gui.main_page import DashboardPage
from meshcore_gui.route_page import RoutePage
from meshcore_gui.shared_data import SharedData
from meshcore_gui.ble.worker import BLEWorker
from meshcore_gui.core.shared_data import SharedData
from meshcore_gui.gui.dashboard import DashboardPage
from meshcore_gui.gui.route_page import RoutePage
# Global instances (needed by NiceGUI page decorators)
BIN
View File
Binary file not shown.
@@ -5,4 +5,4 @@ A graphical user interface for MeshCore mesh network devices,
communicating via Bluetooth Low Energy (BLE).
"""
__version__ = "4.0"
__version__ = "5.0"
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
MeshCore GUI - Threaded BLE Edition
MeshCore GUI Threaded BLE Edition
====================================
Entry point. Parses arguments, wires up the components, registers
@@ -9,9 +9,10 @@ NiceGUI pages and starts the server.
Usage:
python meshcore_gui.py <BLE_ADDRESS>
python meshcore_gui.py <BLE_ADDRESS> --debug-on
python -m meshcore_gui <BLE_ADDRESS>
Author: PE1HVH
Version: 4.0
Version: 5.0
SPDX-License-Identifier: MIT
Copyright: (c) 2026 PE1HVH
"""
@@ -20,7 +21,7 @@ import sys
from nicegui import ui
# Allow overriding DEBUG and CHANNELS_CONFIG before anything imports them
# Allow overriding DEBUG before anything imports it
import meshcore_gui.config as config
try:
@@ -30,10 +31,10 @@ except ImportError:
print("Install with: pip install meshcore")
sys.exit(1)
from meshcore_gui.ble_worker import BLEWorker
from meshcore_gui.main_page import DashboardPage
from meshcore_gui.route_page import RoutePage
from meshcore_gui.shared_data import SharedData
from meshcore_gui.ble.worker import BLEWorker
from meshcore_gui.core.shared_data import SharedData
from meshcore_gui.gui.dashboard import DashboardPage
from meshcore_gui.gui.route_page import RoutePage
# Global instances (needed by NiceGUI page decorators)
+3
View File
@@ -0,0 +1,3 @@
"""
BLE infrastructure layer — device connection, commands and events.
"""
+113
View File
@@ -0,0 +1,113 @@
"""
BLE command handlers for MeshCore GUI.
Extracted from ``BLEWorker`` so that each command is an isolated unit
of work. New commands can be registered without modifying existing
code (Open/Closed Principle).
"""
from datetime import datetime
from typing import Dict, Optional
from meshcore import MeshCore
from meshcore_gui.config import debug_print
from meshcore_gui.core.models import Message
from meshcore_gui.core.protocols import SharedDataWriter
class CommandHandler:
"""Dispatches and executes commands sent from the GUI.
Args:
mc: Connected MeshCore instance.
shared: SharedDataWriter for storing results.
"""
def __init__(self, mc: MeshCore, shared: SharedDataWriter) -> None:
self._mc = mc
self._shared = shared
# Handler registry — add new commands here (OCP)
self._handlers: Dict[str, object] = {
'send_message': self._cmd_send_message,
'send_dm': self._cmd_send_dm,
'send_advert': self._cmd_send_advert,
'refresh': self._cmd_refresh,
}
async def process_all(self) -> None:
"""Drain the command queue and dispatch each command."""
while True:
cmd = self._shared.get_next_command()
if cmd is None:
break
await self._dispatch(cmd)
async def _dispatch(self, cmd: Dict) -> None:
action = cmd.get('action')
handler = self._handlers.get(action)
if handler:
await handler(cmd)
else:
debug_print(f"Unknown command action: {action}")
# ------------------------------------------------------------------
# Individual command handlers
# ------------------------------------------------------------------
async def _cmd_send_message(self, cmd: Dict) -> None:
channel = cmd.get('channel', 0)
text = cmd.get('text', '')
is_bot = cmd.get('_bot', False)
if text:
await self._mc.commands.send_chan_msg(channel, text)
if not is_bot:
self._shared.add_message(Message(
time=datetime.now().strftime('%H:%M:%S'),
sender='Me',
text=text,
channel=channel,
direction='out',
))
debug_print(
f"{'BOT' if is_bot else 'Sent'} message to "
f"channel {channel}: {text[:30]}"
)
async def _cmd_send_dm(self, cmd: Dict) -> None:
pubkey = cmd.get('pubkey', '')
text = cmd.get('text', '')
contact_name = cmd.get('contact_name', pubkey[:8])
if text and pubkey:
await self._mc.commands.send_msg(pubkey, text)
self._shared.add_message(Message(
time=datetime.now().strftime('%H:%M:%S'),
sender='Me',
text=text,
channel=None,
direction='out',
sender_pubkey=pubkey,
))
debug_print(f"Sent DM to {contact_name}: {text[:30]}")
async def _cmd_send_advert(self, cmd: Dict) -> None:
await self._mc.commands.send_advert(flood=True)
self._shared.set_status("📢 Advert sent")
debug_print("Advert sent")
async def _cmd_refresh(self, cmd: Dict) -> None:
debug_print("Refresh requested")
# Delegate to the worker's _load_data via a callback
if self._load_data_callback:
await self._load_data_callback()
# ------------------------------------------------------------------
# Callback for refresh (set by BLEWorker after construction)
# ------------------------------------------------------------------
_load_data_callback = None
def set_load_data_callback(self, callback) -> None:
"""Register the worker's ``_load_data`` coroutine for refresh."""
self._load_data_callback = callback
+216
View File
@@ -0,0 +1,216 @@
"""
BLE event callbacks for MeshCore GUI.
Handles ``CHANNEL_MSG_RECV``, ``CONTACT_MSG_RECV`` and ``RX_LOG_DATA``
events from the MeshCore library. Extracted from ``BLEWorker`` so the
worker only deals with connection lifecycle.
"""
from datetime import datetime
from typing import Dict, Optional
from meshcore_gui.config import debug_print
from meshcore_gui.core.models import Message, RxLogEntry
from meshcore_gui.core.protocols import SharedDataWriter
from meshcore_gui.ble.packet_decoder import PacketDecoder, PayloadType
from meshcore_gui.services.bot import MeshBot
from meshcore_gui.services.dedup import DualDeduplicator
class EventHandler:
"""Processes BLE events and writes results to shared data.
Args:
shared: SharedDataWriter for storing messages and RX log.
decoder: PacketDecoder for raw LoRa packet decryption.
dedup: DualDeduplicator for message deduplication.
bot: MeshBot for auto-reply logic.
"""
def __init__(
self,
shared: SharedDataWriter,
decoder: PacketDecoder,
dedup: DualDeduplicator,
bot: MeshBot,
) -> None:
self._shared = shared
self._decoder = decoder
self._dedup = dedup
self._bot = bot
# ------------------------------------------------------------------
# RX_LOG_DATA — the single source of truth for path info
# ------------------------------------------------------------------
def on_rx_log(self, event) -> None:
"""Handle RX log data events."""
payload = event.payload
self._shared.add_rx_log(RxLogEntry(
time=datetime.now().strftime('%H:%M:%S'),
snr=payload.get('snr', 0),
rssi=payload.get('rssi', 0),
payload_type=payload.get('payload_type', '?'),
hops=payload.get('path_len', 0),
))
payload_hex = payload.get('payload', '')
if not payload_hex:
return
decoded = self._decoder.decode(payload_hex)
if decoded is None:
return
if decoded.payload_type == PayloadType.GroupText and decoded.is_decrypted:
self._dedup.mark_hash(decoded.message_hash)
self._dedup.mark_content(
decoded.sender, decoded.channel_idx, decoded.text,
)
sender_pubkey = ''
if decoded.sender:
match = self._shared.get_contact_by_name(decoded.sender)
if match:
sender_pubkey, _contact = match
snr = self._extract_snr(payload)
self._shared.add_message(Message(
time=datetime.now().strftime('%H:%M:%S'),
sender=decoded.sender,
text=decoded.text,
channel=decoded.channel_idx,
direction='in',
snr=snr,
path_len=decoded.path_length,
sender_pubkey=sender_pubkey,
path_hashes=decoded.path_hashes,
message_hash=decoded.message_hash,
))
debug_print(
f"RX_LOG → message: hash={decoded.message_hash}, "
f"sender={decoded.sender!r}, ch={decoded.channel_idx}, "
f"path={decoded.path_hashes}"
)
self._bot.check_and_reply(
sender=decoded.sender,
text=decoded.text,
channel_idx=decoded.channel_idx,
snr=snr,
path_len=decoded.path_length,
path_hashes=decoded.path_hashes,
)
# ------------------------------------------------------------------
# CHANNEL_MSG_RECV — fallback when RX_LOG decode missed it
# ------------------------------------------------------------------
def on_channel_msg(self, event) -> None:
"""Handle channel message events."""
payload = event.payload
debug_print(f"Channel msg payload keys: {list(payload.keys())}")
# Dedup via hash
msg_hash = payload.get('message_hash', '')
if msg_hash and self._dedup.is_hash_seen(msg_hash):
debug_print(f"Channel msg suppressed (hash): {msg_hash}")
return
# Parse sender from "SenderName: message body" format
raw_text = payload.get('text', '')
sender, msg_text = '', raw_text
if ': ' in raw_text:
name_part, body_part = raw_text.split(': ', 1)
sender = name_part.strip()
msg_text = body_part
elif raw_text:
msg_text = raw_text
# Dedup via content
ch_idx = payload.get('channel_idx')
if self._dedup.is_content_seen(sender, ch_idx, msg_text):
debug_print(f"Channel msg suppressed (content): {sender!r}")
return
debug_print(
f"Channel msg (fallback): sender={sender!r}, "
f"text={msg_text[:40]!r}"
)
sender_pubkey = ''
if sender:
match = self._shared.get_contact_by_name(sender)
if match:
sender_pubkey, _contact = match
snr = self._extract_snr(payload)
self._shared.add_message(Message(
time=datetime.now().strftime('%H:%M:%S'),
sender=sender,
text=msg_text,
channel=ch_idx,
direction='in',
snr=snr,
path_len=payload.get('path_len', 0),
sender_pubkey=sender_pubkey,
path_hashes=[],
message_hash=msg_hash,
))
self._bot.check_and_reply(
sender=sender,
text=msg_text,
channel_idx=ch_idx,
snr=snr,
path_len=payload.get('path_len', 0),
)
# ------------------------------------------------------------------
# CONTACT_MSG_RECV — DMs
# ------------------------------------------------------------------
def on_contact_msg(self, event) -> None:
"""Handle direct message events."""
payload = event.payload
pubkey = payload.get('pubkey_prefix', '')
debug_print(f"DM payload keys: {list(payload.keys())}")
sender = ''
if pubkey:
sender = self._shared.get_contact_name_by_prefix(pubkey)
if not sender:
sender = pubkey[:8] if pubkey else ''
self._shared.add_message(Message(
time=datetime.now().strftime('%H:%M:%S'),
sender=sender,
text=payload.get('text', ''),
channel=None,
direction='in',
snr=self._extract_snr(payload),
path_len=payload.get('path_len', 0),
sender_pubkey=pubkey,
))
debug_print(f"DM received from {sender}: {payload.get('text', '')[:30]}")
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
@staticmethod
def _extract_snr(payload: Dict) -> Optional[float]:
"""Extract SNR from a payload dict (handles 'SNR' and 'snr' keys)."""
raw = payload.get('SNR') or payload.get('snr')
if raw is not None:
try:
return float(raw)
except (ValueError, TypeError):
pass
return None
+199
View File
@@ -0,0 +1,199 @@
"""
BLE communication worker for MeshCore GUI.
Runs in a separate thread with its own asyncio event loop. Connects
to the MeshCore device, wires up collaborators, and runs the command
processing loop.
Responsibilities deliberately kept narrow (SRP):
- Thread lifecycle and asyncio loop
- BLE connection and initial data loading
- Wiring CommandHandler and EventHandler
Command execution → :mod:`meshcore_gui.ble.commands`
Event handling → :mod:`meshcore_gui.ble.events`
Packet decoding → :mod:`meshcore_gui.ble.packet_decoder`
Bot logic → :mod:`meshcore_gui.services.bot`
Deduplication → :mod:`meshcore_gui.services.dedup`
"""
import asyncio
import threading
from typing import Optional
from meshcore import MeshCore, EventType
from meshcore_gui.config import CHANNELS_CONFIG, debug_print
from meshcore_gui.core.protocols import SharedDataWriter
from meshcore_gui.ble.commands import CommandHandler
from meshcore_gui.ble.events import EventHandler
from meshcore_gui.ble.packet_decoder import PacketDecoder
from meshcore_gui.services.bot import BotConfig, MeshBot
from meshcore_gui.services.dedup import DualDeduplicator
class BLEWorker:
"""BLE communication worker that runs in a separate thread.
Args:
address: BLE MAC address (e.g. ``"literal:AA:BB:CC:DD:EE:FF"``).
shared: SharedDataWriter for thread-safe communication.
"""
def __init__(self, address: str, shared: SharedDataWriter) -> None:
self.address = address
self.shared = shared
self.mc: Optional[MeshCore] = None
self.running = True
# Collaborators (created eagerly, wired after connection)
self._decoder = PacketDecoder()
self._dedup = DualDeduplicator(max_size=200)
self._bot = MeshBot(
config=BotConfig(),
command_sink=shared.put_command,
enabled_check=shared.is_bot_enabled,
)
# ------------------------------------------------------------------
# Thread lifecycle
# ------------------------------------------------------------------
def start(self) -> None:
"""Start the worker in a new daemon thread."""
thread = threading.Thread(target=self._run, daemon=True)
thread.start()
debug_print("BLE worker thread started")
def _run(self) -> None:
asyncio.run(self._async_main())
async def _async_main(self) -> None:
await self._connect()
if self.mc:
while self.running:
await self._cmd_handler.process_all()
await asyncio.sleep(0.1)
# ------------------------------------------------------------------
# Connection
# ------------------------------------------------------------------
async def _connect(self) -> None:
self.shared.set_status(f"🔄 Connecting to {self.address}...")
try:
print(f"BLE: Connecting to {self.address}...")
self.mc = await MeshCore.create_ble(self.address)
print("BLE: Connected!")
await asyncio.sleep(1)
# Wire collaborators now that mc is available
self._evt_handler = EventHandler(
shared=self.shared,
decoder=self._decoder,
dedup=self._dedup,
bot=self._bot,
)
self._cmd_handler = CommandHandler(mc=self.mc, shared=self.shared)
self._cmd_handler.set_load_data_callback(self._load_data)
# Subscribe to events
self.mc.subscribe(EventType.CHANNEL_MSG_RECV, self._evt_handler.on_channel_msg)
self.mc.subscribe(EventType.CONTACT_MSG_RECV, self._evt_handler.on_contact_msg)
self.mc.subscribe(EventType.RX_LOG_DATA, self._evt_handler.on_rx_log)
await self._load_data()
await self._load_channel_keys()
await self.mc.start_auto_message_fetching()
self.shared.set_connected(True)
self.shared.set_status("✅ Connected")
print("BLE: Ready!")
except Exception as e:
print(f"BLE: Connection error: {e}")
self.shared.set_status(f"{e}")
# ------------------------------------------------------------------
# Initial data loading
# ------------------------------------------------------------------
async def _load_data(self) -> None:
"""Load device info, channels and contacts."""
# send_appstart (retries)
self.shared.set_status("🔄 Device info...")
for i in range(5):
debug_print(f"send_appstart attempt {i + 1}")
r = await self.mc.commands.send_appstart()
if r.type != EventType.ERROR:
print(f"BLE: send_appstart OK: {r.payload.get('name')}")
self.shared.update_from_appstart(r.payload)
break
await asyncio.sleep(0.3)
# send_device_query (retries)
for i in range(5):
debug_print(f"send_device_query attempt {i + 1}")
r = await self.mc.commands.send_device_query()
if r.type != EventType.ERROR:
print(f"BLE: send_device_query OK: {r.payload.get('ver')}")
self.shared.update_from_device_query(r.payload)
break
await asyncio.sleep(0.3)
# Channels (hardcoded — BLE get_channel is unreliable)
self.shared.set_status("🔄 Channels...")
self.shared.set_channels(CHANNELS_CONFIG)
print(f"BLE: Channels loaded: {[c['name'] for c in CHANNELS_CONFIG]}")
# Contacts
self.shared.set_status("🔄 Contacts...")
r = await self.mc.commands.get_contacts()
if r.type != EventType.ERROR:
self.shared.set_contacts(r.payload)
print(f"BLE: Contacts loaded: {len(r.payload)} contacts")
async def _load_channel_keys(self) -> None:
"""Load channel decryption keys from device or derive from name.
Channels that cannot be confirmed on the device are logged with
a warning. Sending and receiving on unconfirmed channels will
likely fail because the device does not know about them.
"""
self.shared.set_status("🔄 Channel keys...")
confirmed: list[str] = []
missing: list[str] = []
for ch in CHANNELS_CONFIG:
idx, name = ch['idx'], ch['name']
loaded = False
for attempt in range(3):
try:
r = await self.mc.commands.get_channel(idx)
if r.type != EventType.ERROR:
secret = r.payload.get('channel_secret')
if secret and isinstance(secret, bytes) and len(secret) >= 16:
self._decoder.add_channel_key(idx, secret[:16])
print(f"BLE: ✅ Channel [{idx}] '{name}' — key loaded from device")
confirmed.append(f"[{idx}] {name}")
loaded = True
break
except Exception as exc:
debug_print(f"get_channel({idx}) attempt {attempt + 1} error: {exc}")
await asyncio.sleep(0.3)
if not loaded:
self._decoder.add_channel_key_from_name(idx, name)
missing.append(f"[{idx}] {name}")
print(f"BLE: ⚠️ Channel [{idx}] '{name}' — NOT found on device (key derived from name)")
if missing:
print(f"BLE: ⚠️ Channels not confirmed on device: {', '.join(missing)}")
print(f"BLE: ⚠️ Sending/receiving on these channels may not work.")
print(f"BLE: ⚠️ Check your device config with: meshcli -d <BLE_ADDRESS> → get_channels")
print(f"BLE: PacketDecoder ready — has_keys={self._decoder.has_keys}")
print(f"BLE: Confirmed: {', '.join(confirmed) if confirmed else 'none'}")
print(f"BLE: Unconfirmed: {', '.join(missing) if missing else 'none'}")
+43
View File
@@ -0,0 +1,43 @@
"""
Application configuration for MeshCore GUI.
Contains only global runtime settings and the channel table.
Bot configuration lives in :mod:`meshcore_gui.services.bot`.
UI display constants live in :mod:`meshcore_gui.gui.constants`.
The ``DEBUG`` flag defaults to False and can be activated at startup
with the ``--debug-on`` command-line option.
"""
from typing import Dict, List
# ==============================================================================
# DEBUG
# ==============================================================================
DEBUG: bool = False
def debug_print(msg: str) -> None:
"""Print a debug message when ``DEBUG`` is enabled."""
if DEBUG:
print(f"DEBUG: {msg}")
# ==============================================================================
# CHANNELS
# ==============================================================================
# Hardcoded channels configuration.
# Determine your channels with meshcli:
# meshcli -d <BLE_ADDRESS>
# > get_channels
# Output: 0: Public [...], 1: #test [...], etc.
CHANNELS_CONFIG: List[Dict] = [
{'idx': 0, 'name': 'Public'},
{'idx': 1, 'name': '#test'},
{'idx': 2, 'name': '#zwolle'},
{'idx': 3, 'name': 'RahanSom'},
{'idx': 4, 'name': '#bot'},
]
+16
View File
@@ -0,0 +1,16 @@
"""
Core domain layer — models, protocols and shared data store.
Re-exports the most commonly used names so consumers can write::
from meshcore_gui.core import SharedData, Message, RxLogEntry
"""
from meshcore_gui.core.models import ( # noqa: F401
Contact,
DeviceInfo,
Message,
RouteNode,
RxLogEntry,
)
from meshcore_gui.core.shared_data import SharedData # noqa: F401
+174
View File
@@ -0,0 +1,174 @@
"""
Domain model for MeshCore GUI.
Typed dataclasses that replace untyped Dict objects throughout the
codebase. Each class represents a core domain concept. All classes
are immutable-friendly (frozen is not used because SharedData mutates
collections, but fields are not reassigned after construction).
Migration note
~~~~~~~~~~~~~~
``SharedData.get_snapshot()`` still returns a plain dict for backward
compatibility with the NiceGUI timer loop. Inside that dict, however,
``messages`` and ``rx_log`` are now lists of dataclass instances.
UI code can access attributes directly (``msg.sender``) or fall back
to ``dataclasses.asdict(msg)`` if a plain dict is needed.
"""
from dataclasses import dataclass, field
from typing import List, Optional
# ---------------------------------------------------------------------------
# Message
# ---------------------------------------------------------------------------
@dataclass
class Message:
"""A channel message or direct message (DM).
Attributes:
time: Formatted timestamp (HH:MM:SS).
sender: Display name of the sender.
text: Message body.
channel: Channel index, or ``None`` for a DM.
direction: ``'in'`` for received, ``'out'`` for sent.
snr: Signal-to-noise ratio (dB), if available.
path_len: Hop count from the LoRa frame header.
sender_pubkey: Full public key of the sender (hex string).
path_hashes: List of 2-char hex strings, one per repeater.
message_hash: Deterministic packet identifier (hex string).
"""
time: str
sender: str
text: str
channel: Optional[int]
direction: str
snr: Optional[float] = None
path_len: int = 0
sender_pubkey: str = ""
path_hashes: List[str] = field(default_factory=list)
message_hash: str = ""
# ---------------------------------------------------------------------------
# Contact
# ---------------------------------------------------------------------------
@dataclass
class Contact:
"""A known mesh network node.
Attributes:
pubkey: Full public key (hex string).
adv_name: Advertised display name.
type: Node type (0=unknown, 1=CLI, 2=REP, 3=ROOM).
adv_lat: Advertised latitude (0.0 if unknown).
adv_lon: Advertised longitude (0.0 if unknown).
out_path: Hex string of stored route (2 hex chars per hop).
out_path_len: Number of hops in ``out_path``.
"""
pubkey: str
adv_name: str = ""
type: int = 0
adv_lat: float = 0.0
adv_lon: float = 0.0
out_path: str = ""
out_path_len: int = 0
@staticmethod
def from_dict(pubkey: str, d: dict) -> "Contact":
"""Create a Contact from a meshcore contacts dict entry."""
return Contact(
pubkey=pubkey,
adv_name=d.get("adv_name", ""),
type=d.get("type", 0),
adv_lat=d.get("adv_lat", 0.0),
adv_lon=d.get("adv_lon", 0.0),
out_path=d.get("out_path", ""),
out_path_len=d.get("out_path_len", 0),
)
# ---------------------------------------------------------------------------
# DeviceInfo
# ---------------------------------------------------------------------------
@dataclass
class DeviceInfo:
"""Radio device identification and configuration.
Attributes:
name: Device display name.
public_key: Device public key (hex string).
radio_freq: Radio frequency in MHz.
radio_sf: LoRa spreading factor.
radio_bw: Bandwidth in kHz.
tx_power: Transmit power in dBm.
adv_lat: Advertised latitude.
adv_lon: Advertised longitude.
firmware_version: Firmware version string.
"""
name: str = ""
public_key: str = ""
radio_freq: float = 0.0
radio_sf: int = 0
radio_bw: float = 0.0
tx_power: int = 0
adv_lat: float = 0.0
adv_lon: float = 0.0
firmware_version: str = ""
# ---------------------------------------------------------------------------
# RxLogEntry
# ---------------------------------------------------------------------------
@dataclass
class RxLogEntry:
"""A single RX log entry from the radio.
Attributes:
time: Formatted timestamp (HH:MM:SS).
snr: Signal-to-noise ratio (dB).
rssi: Received signal strength (dBm).
payload_type: Packet type identifier.
hops: Number of hops (path_len from frame header).
"""
time: str
snr: float = 0.0
rssi: float = 0.0
payload_type: str = "?"
hops: int = 0
# ---------------------------------------------------------------------------
# RouteNode
# ---------------------------------------------------------------------------
@dataclass
class RouteNode:
"""A node in a message route (sender, repeater or receiver).
Attributes:
name: Display name (or ``'-'`` if unknown).
lat: Latitude (0.0 if unknown).
lon: Longitude (0.0 if unknown).
type: Node type (0=unknown, 1=CLI, 2=REP, 3=ROOM).
pubkey: Public key or 2-char hash (hex string).
"""
name: str
lat: float = 0.0
lon: float = 0.0
type: int = 0
pubkey: str = ""
@property
def has_location(self) -> bool:
"""True if the node has GPS coordinates."""
return self.lat != 0 or self.lon != 0
@@ -9,10 +9,29 @@ and the Dependency Inversion Principle (DIP).
Consumers depend on these protocols rather than on the concrete
SharedData class, which makes the contracts explicit and enables
testing with lightweight stubs.
v4.1 changes
~~~~~~~~~~~~~
- Added ``CommandSink`` protocol for bot and command dispatch.
- ``SharedDataWriter.add_message`` now accepts a ``Message`` dataclass.
- ``SharedDataWriter.add_rx_log`` now accepts an ``RxLogEntry`` dataclass.
"""
from typing import Dict, List, Optional, Protocol, runtime_checkable
from meshcore_gui.core.models import Message, RxLogEntry
# ----------------------------------------------------------------------
# CommandSink — used by MeshBot and GUI pages
# ----------------------------------------------------------------------
@runtime_checkable
class CommandSink(Protocol):
"""Enqueue commands for the BLE worker."""
def put_command(self, cmd: Dict) -> None: ...
# ----------------------------------------------------------------------
# Writer — used by BLEWorker
@@ -33,8 +52,8 @@ class SharedDataWriter(Protocol):
def set_connected(self, connected: bool) -> None: ...
def set_contacts(self, contacts_dict: Dict) -> None: ...
def set_channels(self, channels: List[Dict]) -> None: ...
def add_message(self, msg: Dict) -> None: ...
def add_rx_log(self, entry: Dict) -> None: ...
def add_message(self, msg: Message) -> None: ...
def add_rx_log(self, entry: RxLogEntry) -> None: ...
def get_next_command(self) -> Optional[Dict]: ...
def get_contact_name_by_prefix(self, pubkey_prefix: str) -> str: ...
def get_contact_by_name(self, name: str) -> Optional[tuple]: ...
@@ -43,7 +62,7 @@ class SharedDataWriter(Protocol):
# ----------------------------------------------------------------------
# Reader — used by DashboardPage and RoutePage
# Reader — used by DashboardPage
# ----------------------------------------------------------------------
@runtime_checkable
@@ -70,9 +89,7 @@ class ContactLookup(Protocol):
"""Contact lookup interface used by RouteBuilder.
RouteBuilder needs to resolve public key prefixes and names
to contact records. Path hashes are always available in the
message dict (decoded from the raw packet), so no archive
lookup is needed.
to contact records.
"""
def get_contact_by_prefix(self, pubkey_prefix: str) -> Optional[Dict]: ...
@@ -5,67 +5,46 @@ SharedData is the central data store shared between the BLE worker thread
and the GUI main thread. All access goes through methods that acquire a
threading.Lock so both threads can safely read and write.
Single-source architecture
~~~~~~~~~~~~~~~~~~~~~~~~~~
Path data (repeater hashes) is embedded in each message dict at creation
time decoded from the raw LoRa packet via ``meshcoredecoder``. There
is no temporal archive or deferred matching.
v4.1 changes
~~~~~~~~~~~~~
- ``messages`` is now ``List[Message]`` (was ``List[Dict]``).
- ``rx_log`` is now ``List[RxLogEntry]`` (was ``List[Dict]``).
- ``DeviceInfo`` dataclass replaces loose scalar fields.
- ``get_snapshot()`` returns typed objects; UI code accesses attributes
directly (``msg.sender``) instead of dict keys (``msg['sender']``).
"""
import queue
import threading
from dataclasses import asdict
from typing import Dict, List, Optional, Tuple
from meshcore_gui.config import debug_print
from meshcore_gui.core.models import DeviceInfo, Message, RxLogEntry
class SharedData:
"""
Thread-safe container for shared data between BLE worker and GUI.
Attributes:
lock: Threading lock for thread-safe access
name: Device name
public_key: Device public key
radio_freq: Radio frequency in MHz
radio_sf: Spreading factor
radio_bw: Bandwidth in kHz
tx_power: Transmit power in dBm
adv_lat: Advertised latitude
adv_lon: Advertised longitude
firmware_version: Firmware version string
connected: Whether device is connected
status: Status text for UI
contacts: Dict of contacts {key: {adv_name, type, lat, lon, }}
channels: List of channels [{idx, name}, ]
messages: List of messages
rx_log: List of RX log entries
Implements all four Protocol interfaces defined in ``protocols.py``.
"""
def __init__(self) -> None:
"""Initialize SharedData with empty values and flags set to True."""
self.lock = threading.Lock()
# Device info
self.name: str = ""
self.public_key: str = ""
self.radio_freq: float = 0.0
self.radio_sf: int = 0
self.radio_bw: float = 0.0
self.tx_power: int = 0
self.adv_lat: float = 0.0
self.adv_lon: float = 0.0
self.firmware_version: str = ""
# Device info (typed)
self.device = DeviceInfo()
# Connection status
self.connected: bool = False
self.status: str = "Starting..."
# Data collections
# Data collections (typed)
self.contacts: Dict = {}
self.channels: List[Dict] = []
self.messages: List[Dict] = []
self.rx_log: List[Dict] = []
self.messages: List[Message] = []
self.rx_log: List[RxLogEntry] = []
# Command queue (GUI → BLE)
self.cmd_queue: queue.Queue = queue.Queue()
@@ -89,35 +68,36 @@ class SharedData:
def update_from_appstart(self, payload: Dict) -> None:
"""Update device info from send_appstart response."""
with self.lock:
self.name = payload.get('name', self.name)
self.public_key = payload.get('public_key', self.public_key)
self.radio_freq = payload.get('radio_freq', self.radio_freq)
self.radio_sf = payload.get('radio_sf', self.radio_sf)
self.radio_bw = payload.get('radio_bw', self.radio_bw)
self.tx_power = payload.get('tx_power', self.tx_power)
self.adv_lat = payload.get('adv_lat', self.adv_lat)
self.adv_lon = payload.get('adv_lon', self.adv_lon)
d = self.device
d.name = payload.get('name', d.name)
d.public_key = payload.get('public_key', d.public_key)
d.radio_freq = payload.get('radio_freq', d.radio_freq)
d.radio_sf = payload.get('radio_sf', d.radio_sf)
d.radio_bw = payload.get('radio_bw', d.radio_bw)
d.tx_power = payload.get('tx_power', d.tx_power)
d.adv_lat = payload.get('adv_lat', d.adv_lat)
d.adv_lon = payload.get('adv_lon', d.adv_lon)
self.device_updated = True
debug_print(f"Device info updated: {self.name}")
debug_print(f"Device info updated: {d.name}")
def update_from_device_query(self, payload: Dict) -> None:
"""Update firmware version from send_device_query response."""
with self.lock:
self.firmware_version = payload.get('ver', self.firmware_version)
self.device.firmware_version = payload.get(
'ver', self.device.firmware_version,
)
self.device_updated = True
debug_print(f"Firmware version: {self.firmware_version}")
debug_print(f"Firmware version: {self.device.firmware_version}")
# ------------------------------------------------------------------
# Status
# ------------------------------------------------------------------
def set_status(self, status: str) -> None:
"""Update status text."""
with self.lock:
self.status = status
def set_connected(self, connected: bool) -> None:
"""Update connection status."""
with self.lock:
self.connected = connected
@@ -126,13 +106,11 @@ class SharedData:
# ------------------------------------------------------------------
def set_bot_enabled(self, enabled: bool) -> None:
"""Toggle the BOT on or off."""
with self.lock:
self.bot_enabled = enabled
debug_print(f"BOT {'enabled' if enabled else 'disabled'}")
def is_bot_enabled(self) -> bool:
"""Return whether the BOT is currently enabled."""
with self.lock:
return self.bot_enabled
@@ -141,16 +119,9 @@ class SharedData:
# ------------------------------------------------------------------
def put_command(self, cmd: Dict) -> None:
"""Enqueue a command for the BLE worker."""
self.cmd_queue.put(cmd)
def get_next_command(self) -> Optional[Dict]:
"""
Dequeue the next command, or return None if the queue is empty.
Returns:
Command dictionary, or None.
"""
try:
return self.cmd_queue.get_nowait()
except queue.Empty:
@@ -161,39 +132,29 @@ class SharedData:
# ------------------------------------------------------------------
def set_contacts(self, contacts_dict: Dict) -> None:
"""Replace the contacts dictionary."""
with self.lock:
self.contacts = contacts_dict.copy()
self.contacts_updated = True
debug_print(f"Contacts updated: {len(self.contacts)} contacts")
def set_channels(self, channels: List[Dict]) -> None:
"""Replace the channels list."""
with self.lock:
self.channels = channels.copy()
self.channels_updated = True
debug_print(f"Channels updated: {[c['name'] for c in channels]}")
def add_message(self, msg: Dict) -> None:
"""
Add a message to the messages list (max 100).
Args:
msg: Message dict with keys: time, sender, text, channel,
direction, path_len, path_hashes, message_hash,
sender_pubkey, snr
"""
def add_message(self, msg: Message) -> None:
"""Add a Message to the store (max 100)."""
with self.lock:
self.messages.append(msg)
if len(self.messages) > 100:
self.messages.pop(0)
debug_print(
f"Message added: {msg.get('sender', '?')}: "
f"{msg.get('text', '')[:30]}"
f"Message added: {msg.sender}: {msg.text[:30]}"
)
def add_rx_log(self, entry: Dict) -> None:
"""Add an RX log entry (max 50, newest first)."""
def add_rx_log(self, entry: RxLogEntry) -> None:
"""Add an RxLogEntry (max 50, newest first)."""
with self.lock:
self.rx_log.insert(0, entry)
if len(self.rx_log) > 50:
@@ -205,24 +166,34 @@ class SharedData:
# ------------------------------------------------------------------
def get_snapshot(self) -> Dict:
"""Create a complete snapshot of all data for the GUI."""
"""Create a complete snapshot of all data for the GUI.
Returns a plain dict with typed objects inside. The
``messages`` and ``rx_log`` values are lists of dataclass
instances (not dicts).
"""
with self.lock:
d = self.device
return {
'name': self.name,
'public_key': self.public_key,
'radio_freq': self.radio_freq,
'radio_sf': self.radio_sf,
'radio_bw': self.radio_bw,
'tx_power': self.tx_power,
'adv_lat': self.adv_lat,
'adv_lon': self.adv_lon,
'firmware_version': self.firmware_version,
# DeviceInfo fields (flat for backward compat)
'name': d.name,
'public_key': d.public_key,
'radio_freq': d.radio_freq,
'radio_sf': d.radio_sf,
'radio_bw': d.radio_bw,
'tx_power': d.tx_power,
'adv_lat': d.adv_lat,
'adv_lon': d.adv_lon,
'firmware_version': d.firmware_version,
# Status
'connected': self.connected,
'status': self.status,
# Collections (typed copies)
'contacts': self.contacts.copy(),
'channels': self.channels.copy(),
'messages': self.messages.copy(),
'rx_log': self.rx_log.copy(),
# Flags
'device_updated': self.device_updated,
'contacts_updated': self.contacts_updated,
'channels_updated': self.channels_updated,
@@ -232,7 +203,6 @@ class SharedData:
}
def clear_update_flags(self) -> None:
"""Reset all update flags to False."""
with self.lock:
self.device_updated = False
self.contacts_updated = False
@@ -240,7 +210,6 @@ class SharedData:
self.rxlog_updated = False
def mark_gui_initialized(self) -> None:
"""Mark that the GUI has completed its first render."""
with self.lock:
self.gui_initialized = True
debug_print("GUI marked as initialized")
@@ -250,18 +219,8 @@ class SharedData:
# ------------------------------------------------------------------
def get_contact_by_prefix(self, pubkey_prefix: str) -> Optional[Dict]:
"""
Look up a contact by public key prefix.
Used by route visualization to resolve pubkey prefixes (from
messages and out_path) to full contact records.
Returns:
Copy of the contact dictionary, or None if not found.
"""
if not pubkey_prefix:
return None
with self.lock:
for key, contact in self.contacts.items():
if key.startswith(pubkey_prefix) or pubkey_prefix.startswith(key):
@@ -269,60 +228,34 @@ class SharedData:
return None
def get_contact_name_by_prefix(self, pubkey_prefix: str) -> str:
"""
Look up a contact name by public key prefix.
Returns:
The contact's adv_name, or the first 8 chars of the prefix
if not found, or empty string if prefix is empty.
"""
if not pubkey_prefix:
return ""
with self.lock:
for key, contact in self.contacts.items():
if key.startswith(pubkey_prefix):
name = contact.get('adv_name', '')
if name:
return name
return pubkey_prefix[:8]
# ------------------------------------------------------------------
# Contact lookup by name
# ------------------------------------------------------------------
def get_contact_by_name(self, name: str) -> Optional[Tuple[str, Dict]]:
"""
Look up a contact by advertised name.
Tries in order: exact match case-insensitive startswith
(either direction, to handle truncated names).
Returns:
``(pubkey, contact_dict)`` tuple, or ``None`` if no match.
"""
if not name:
return None
with self.lock:
# Strategy 1: exact match
for key, contact in self.contacts.items():
if contact.get('adv_name', '') == name:
return (key, contact.copy())
# Strategy 2: case-insensitive
name_lower = name.lower()
for key, contact in self.contacts.items():
if contact.get('adv_name', '').lower() == name_lower:
return (key, contact.copy())
# Strategy 3: one name starts with the other
# Strategy 3: prefix match
for key, contact in self.contacts.items():
adv = contact.get('adv_name', '')
if not adv:
continue
if name.startswith(adv) or adv.startswith(name):
return (key, contact.copy())
return None
+3
View File
@@ -0,0 +1,3 @@
"""
Presentation layer — NiceGUI pages and panels.
"""
+11
View File
@@ -0,0 +1,11 @@
"""
Display constants for the GUI layer.
Contact type → icon/name/label mappings used by multiple panels.
"""
from typing import Dict
TYPE_ICONS: Dict[int, str] = {0: "", 1: "📱", 2: "📡", 3: "🏠"}
TYPE_NAMES: Dict[int, str] = {0: "-", 1: "CLI", 2: "REP", 3: "ROOM"}
TYPE_LABELS: Dict[int, str] = {0: "-", 1: "Companion", 2: "Repeater", 3: "Room Server"}
+165
View File
@@ -0,0 +1,165 @@
"""
Main dashboard page for MeshCore GUI.
Thin orchestrator that owns the layout and the 500 ms update timer.
All visual content is delegated to individual panel classes in
:mod:`meshcore_gui.gui.panels`.
"""
import logging
from nicegui import ui
from meshcore_gui.core.protocols import SharedDataReader
from meshcore_gui.gui.panels import (
ActionsPanel,
ContactsPanel,
DevicePanel,
FilterPanel,
InputPanel,
MapPanel,
MessagesPanel,
RxLogPanel,
)
# Suppress the harmless "Client has been deleted" warning that NiceGUI
# emits when a browser tab is refreshed while a ui.timer is active.
class _DeletedClientFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
return 'Client has been deleted' not in record.getMessage()
logging.getLogger('nicegui').addFilter(_DeletedClientFilter())
class DashboardPage:
"""Main dashboard rendered at ``/``.
Args:
shared: SharedDataReader for data access and command dispatch.
"""
def __init__(self, shared: SharedDataReader) -> None:
self._shared = shared
# Panels (created fresh on each render)
self._device: DevicePanel | None = None
self._contacts: ContactsPanel | None = None
self._map: MapPanel | None = None
self._input: InputPanel | None = None
self._filter: FilterPanel | None = None
self._messages: MessagesPanel | None = None
self._actions: ActionsPanel | None = None
self._rxlog: RxLogPanel | None = None
# Header status label
self._status_label = None
# Local first-render flag
self._initialized: bool = False
# ------------------------------------------------------------------
# Public
# ------------------------------------------------------------------
def render(self) -> None:
"""Build the complete dashboard layout and start the timer."""
self._initialized = False
# Create panel instances
put_cmd = self._shared.put_command
self._device = DevicePanel()
self._contacts = ContactsPanel(put_cmd)
self._map = MapPanel()
self._input = InputPanel(put_cmd)
self._filter = FilterPanel(self._shared.set_bot_enabled)
self._messages = MessagesPanel()
self._actions = ActionsPanel(put_cmd)
self._rxlog = RxLogPanel()
ui.dark_mode(False)
# Header
with ui.header().classes('bg-blue-600 text-white'):
ui.label('🔗 MeshCore').classes('text-xl font-bold')
ui.space()
self._status_label = ui.label('Starting...').classes('text-sm')
# Three-column layout
with ui.row().classes('w-full h-full gap-2 p-2'):
# Left column
with ui.column().classes('w-64 gap-2'):
self._device.render()
self._contacts.render()
# Centre column
with ui.column().classes('flex-grow gap-2'):
self._map.render()
self._input.render()
self._filter.render()
self._messages.render()
# Right column
with ui.column().classes('w-64 gap-2'):
self._actions.render()
self._rxlog.render()
# Start update timer
ui.timer(0.5, self._update_ui)
# ------------------------------------------------------------------
# Timer-driven UI update
# ------------------------------------------------------------------
def _update_ui(self) -> None:
try:
if not self._status_label:
return
data = self._shared.get_snapshot()
is_first = not self._initialized
# Always update status
self._status_label.text = data['status']
# Device info
if data['device_updated'] or is_first:
self._device.update(data)
# Channels → filter checkboxes + input dropdown
if data['channels_updated'] or is_first:
self._filter.update(data)
self._input.update_channel_options(data['channels'])
# Contacts
if data['contacts_updated'] or is_first:
self._contacts.update(data)
# Map
if data['contacts'] and (
data['contacts_updated'] or not self._map.has_markers or is_first
):
self._map.update(data)
# Messages (always — for live filter changes)
self._messages.update(
data,
self._filter.channel_filters,
self._filter.last_channels,
)
# RX Log
if data['rxlog_updated']:
self._rxlog.update(data)
# Clear flags and mark initialised
self._shared.clear_update_flags()
if is_first and data['channels'] and data['contacts']:
self._initialized = True
self._shared.mark_gui_initialized()
except Exception as e:
err = str(e).lower()
if "deleted" not in err and "client" not in err:
print(f"GUI update error: {e}")
+16
View File
@@ -0,0 +1,16 @@
"""
Individual dashboard panels — each panel is a single-responsibility class.
Re-exports all panels for convenient importing::
from meshcore_gui.gui.panels import DevicePanel, ContactsPanel, ...
"""
from meshcore_gui.gui.panels.device_panel import DevicePanel # noqa: F401
from meshcore_gui.gui.panels.contacts_panel import ContactsPanel # noqa: F401
from meshcore_gui.gui.panels.map_panel import MapPanel # noqa: F401
from meshcore_gui.gui.panels.input_panel import InputPanel # noqa: F401
from meshcore_gui.gui.panels.filter_panel import FilterPanel # noqa: F401
from meshcore_gui.gui.panels.messages_panel import MessagesPanel # noqa: F401
from meshcore_gui.gui.panels.actions_panel import ActionsPanel # noqa: F401
from meshcore_gui.gui.panels.rxlog_panel import RxLogPanel # noqa: F401
+29
View File
@@ -0,0 +1,29 @@
"""Actions panel — refresh and advertise buttons."""
from typing import Callable, Dict
from nicegui import ui
class ActionsPanel:
"""Action buttons in the right column.
Args:
put_command: Callable to enqueue a command dict for the BLE worker.
"""
def __init__(self, put_command: Callable[[Dict], None]) -> None:
self._put_command = put_command
def render(self) -> None:
with ui.card().classes('w-full'):
ui.label('⚡ Actions').classes('font-bold text-gray-600')
with ui.row().classes('gap-2'):
ui.button('🔄 Refresh', on_click=self._refresh)
ui.button('📢 Advert', on_click=self._advert)
def _refresh(self) -> None:
self._put_command({'action': 'refresh'})
def _advert(self) -> None:
self._put_command({'action': 'send_advert'})
+87
View File
@@ -0,0 +1,87 @@
"""Contacts panel — list of known mesh nodes with click-to-DM."""
from typing import Callable, Dict
from nicegui import ui
from meshcore_gui.gui.constants import TYPE_ICONS, TYPE_NAMES
class ContactsPanel:
"""Displays contacts in the left column. Click opens a DM dialog.
Args:
put_command: Callable to enqueue a command dict for the BLE worker.
"""
def __init__(self, put_command: Callable[[Dict], None]) -> None:
self._put_command = put_command
self._container = None
def render(self) -> None:
with ui.card().classes('w-full'):
ui.label('👥 Contacts').classes('font-bold text-gray-600')
self._container = ui.column().classes(
'w-full gap-1 max-h-96 overflow-y-auto'
)
def update(self, data: Dict) -> None:
if not self._container:
return
self._container.clear()
with self._container:
for key, contact in data['contacts'].items():
ctype = contact.get('type', 0)
icon = TYPE_ICONS.get(ctype, '')
name = contact.get('adv_name', key[:12])
type_name = TYPE_NAMES.get(ctype, '-')
lat = contact.get('adv_lat', 0)
lon = contact.get('adv_lon', 0)
has_loc = lat != 0 or lon != 0
tooltip = (
f"{name}\nType: {type_name}\n"
f"Key: {key[:16]}...\nClick to send DM"
)
if has_loc:
tooltip += f"\nLat: {lat:.4f}\nLon: {lon:.4f}"
with ui.row().classes(
'w-full items-center gap-2 p-1 '
'hover:bg-gray-100 rounded cursor-pointer'
).on('click', lambda e, k=key, n=name: self._open_dm_dialog(k, n)):
ui.label(icon).classes('text-sm')
ui.label(name[:15]).classes(
'text-sm flex-grow truncate'
).tooltip(tooltip)
ui.label(type_name).classes('text-xs text-gray-500')
if has_loc:
ui.label('📍').classes('text-xs')
# ------------------------------------------------------------------
# DM dialog
# ------------------------------------------------------------------
def _open_dm_dialog(self, pubkey: str, contact_name: str) -> None:
with ui.dialog() as dialog, ui.card().classes('w-96'):
ui.label(f'💬 DM to {contact_name}').classes('font-bold text-lg')
msg_input = ui.input(placeholder='Type your message...').classes('w-full')
with ui.row().classes('w-full justify-end gap-2 mt-4'):
ui.button('Cancel', on_click=dialog.close).props('flat')
def send_dm():
text = msg_input.value
if text:
self._put_command({
'action': 'send_dm',
'pubkey': pubkey,
'text': text,
'contact_name': contact_name,
})
dialog.close()
ui.button('Send', on_click=send_dm).classes('bg-blue-500 text-white')
dialog.open()
+40
View File
@@ -0,0 +1,40 @@
"""Device information panel — radio name, frequency, location, firmware."""
from typing import Dict
from nicegui import ui
class DevicePanel:
"""Displays device info in the left column."""
def __init__(self) -> None:
self._label = None
def render(self) -> None:
with ui.card().classes('w-full'):
ui.label('📡 Device').classes('font-bold text-gray-600')
self._label = ui.label('Connecting...').classes(
'text-sm whitespace-pre-line'
)
def update(self, data: Dict) -> None:
if not self._label:
return
lines = []
if data['name']:
lines.append(f"📡 {data['name']}")
if data['public_key']:
lines.append(f"🔑 {data['public_key'][:16]}...")
if data['radio_freq']:
lines.append(f"📻 {data['radio_freq']:.3f} MHz")
lines.append(f"⚙️ SF{data['radio_sf']} / {data['radio_bw']} kHz")
if data['tx_power']:
lines.append(f"⚡ TX: {data['tx_power']} dBm")
if data['adv_lat'] and data['adv_lon']:
lines.append(f"📍 {data['adv_lat']:.4f}, {data['adv_lon']:.4f}")
if data['firmware_version']:
lines.append(f"🏷️ {data['firmware_version']}")
self._label.text = "\n".join(lines) if lines else "Loading..."
+61
View File
@@ -0,0 +1,61 @@
"""Filter panel — channel filter checkboxes and bot toggle."""
from typing import Callable, Dict, List
from nicegui import ui
class FilterPanel:
"""Channel filter checkboxes and bot on/off toggle.
Args:
set_bot_enabled: Callable to toggle the bot in SharedData.
"""
def __init__(self, set_bot_enabled: Callable[[bool], None]) -> None:
self._set_bot_enabled = set_bot_enabled
self._container = None
self._bot_checkbox = None
self._channel_filters: Dict = {}
self._last_channels: List[Dict] = []
@property
def channel_filters(self) -> Dict:
"""Current filter checkboxes (key: channel idx or ``'DM'``)."""
return self._channel_filters
@property
def last_channels(self) -> List[Dict]:
"""Channel list from the most recent update."""
return self._last_channels
def render(self) -> None:
with ui.card().classes('w-full'):
with ui.row().classes('w-full items-center gap-4 justify-center'):
ui.label('📻 Filter:').classes('text-sm text-gray-600')
self._container = ui.row().classes('gap-4')
def update(self, data: Dict) -> None:
"""Rebuild checkboxes when channel data changes."""
if not self._container or not data['channels']:
return
self._container.clear()
self._channel_filters = {}
with self._container:
self._bot_checkbox = ui.checkbox(
'🤖 BOT',
value=data.get('bot_enabled', False),
on_change=lambda e: self._set_bot_enabled(e.value),
)
ui.label('').classes('text-gray-300')
cb_dm = ui.checkbox('DM', value=True)
self._channel_filters['DM'] = cb_dm
for ch in data['channels']:
cb = ui.checkbox(f"[{ch['idx']}] {ch['name']}", value=True)
self._channel_filters[ch['idx']] = cb
self._last_channels = data['channels']
+59
View File
@@ -0,0 +1,59 @@
"""Input panel — message input field, channel selector and send button."""
from typing import Callable, Dict, List
from nicegui import ui
class InputPanel:
"""Message composition panel in the centre column.
Args:
put_command: Callable to enqueue a command dict for the BLE worker.
"""
def __init__(self, put_command: Callable[[Dict], None]) -> None:
self._put_command = put_command
self._msg_input = None
self._channel_select = None
@property
def channel_select(self):
"""Expose channel_select so FilterPanel can update its options."""
return self._channel_select
def render(self) -> None:
with ui.card().classes('w-full'):
with ui.row().classes('w-full items-center gap-2'):
self._msg_input = ui.input(
placeholder='Message...'
).classes('flex-grow')
self._channel_select = ui.select(
options={0: '[0] Public'}, value=0
).classes('w-32')
ui.button(
'Send', on_click=self._send_message
).classes('bg-blue-500 text-white')
def update_channel_options(self, channels: List[Dict]) -> None:
"""Update the channel dropdown options."""
if not self._channel_select or not channels:
return
opts = {ch['idx']: f"[{ch['idx']}] {ch['name']}" for ch in channels}
self._channel_select.options = opts
if self._channel_select.value not in opts:
self._channel_select.value = list(opts.keys())[0]
self._channel_select.update()
def _send_message(self) -> None:
text = self._msg_input.value
channel = self._channel_select.value
if text:
self._put_command({
'action': 'send_message',
'channel': channel,
'text': text,
})
self._msg_input.value = ''
+49
View File
@@ -0,0 +1,49 @@
"""Map panel — Leaflet map with own position and contact markers."""
from typing import Dict, List
from nicegui import ui
class MapPanel:
"""Interactive Leaflet map in the centre column."""
def __init__(self) -> None:
self._map = None
self._markers: List = []
@property
def has_markers(self) -> bool:
return bool(self._markers)
def render(self) -> None:
with ui.card().classes('w-full'):
self._map = ui.leaflet(
center=(52.5, 6.0), zoom=9
).classes('w-full h-72')
def update(self, data: Dict) -> None:
if not self._map:
return
# Remove old markers
for marker in self._markers:
try:
self._map.remove_layer(marker)
except Exception:
pass
self._markers.clear()
# Own position
if data['adv_lat'] and data['adv_lon']:
m = self._map.marker(latlng=(data['adv_lat'], data['adv_lon']))
self._markers.append(m)
self._map.set_center((data['adv_lat'], data['adv_lon']))
# Contact markers
for key, contact in data['contacts'].items():
lat = contact.get('adv_lat', 0)
lon = contact.get('adv_lon', 0)
if lat != 0 or lon != 0:
m = self._map.marker(latlng=(lat, lon))
self._markers.append(m)
+89
View File
@@ -0,0 +1,89 @@
"""Messages panel — filtered message display with route navigation."""
from typing import Dict, List
from nicegui import ui
from meshcore_gui.core.models import Message
class MessagesPanel:
"""Displays filtered messages in the centre column.
Messages are filtered based on channel checkboxes managed by
:class:`~meshcore_gui.gui.panels.filter_panel.FilterPanel`.
"""
def __init__(self) -> None:
self._container = None
def render(self) -> None:
with ui.card().classes('w-full'):
ui.label('💬 Messages').classes('font-bold text-gray-600')
self._container = ui.column().classes(
'w-full h-40 overflow-y-auto gap-0 text-sm font-mono '
'bg-gray-50 p-2 rounded'
)
def update(
self,
data: Dict,
channel_filters: Dict,
last_channels: List[Dict],
) -> None:
"""Refresh messages applying current filter state.
Args:
data: Snapshot dict from SharedData.
channel_filters: ``{channel_idx: checkbox, 'DM': checkbox}``
from FilterPanel.
last_channels: Channel list from FilterPanel.
"""
if not self._container:
return
channel_names = {ch['idx']: ch['name'] for ch in last_channels}
messages: List[Message] = data['messages']
# Apply filters
filtered = []
for orig_idx, msg in enumerate(messages):
if msg.channel is None:
if channel_filters.get('DM') and not channel_filters['DM'].value:
continue
else:
if msg.channel in channel_filters and not channel_filters[msg.channel].value:
continue
filtered.append((orig_idx, msg))
# Rebuild
self._container.clear()
with self._container:
for orig_idx, msg in reversed(filtered[-50:]):
direction = '' if msg.direction == 'out' else ''
ch_label = (
f"[{channel_names.get(msg.channel, f'ch{msg.channel}')}]"
if msg.channel is not None
else '[DM]'
)
path_len = msg.path_len
has_path = bool(msg.path_hashes)
if msg.direction == 'in' and path_len > 0:
hop_tag = f' [{path_len}h{"" if has_path else ""}]'
else:
hop_tag = ''
if msg.sender:
line = f"{msg.time} {direction} {ch_label}{hop_tag} {msg.sender}: {msg.text}"
else:
line = f"{msg.time} {direction} {ch_label}{hop_tag} {msg.text}"
ui.label(line).classes(
'text-xs leading-tight cursor-pointer '
'hover:bg-blue-50 rounded px-1'
).on('click', lambda e, i=orig_idx: ui.navigate.to(
f'/route/{i}', new_tab=True
))
+41
View File
@@ -0,0 +1,41 @@
"""RX log panel — table of recently received packets."""
from typing import Dict, List
from nicegui import ui
from meshcore_gui.core.models import RxLogEntry
class RxLogPanel:
"""RX log table in the right column."""
def __init__(self) -> None:
self._table = None
def render(self) -> None:
with ui.card().classes('w-full'):
ui.label('📊 RX Log').classes('font-bold text-gray-600')
self._table = ui.table(
columns=[
{'name': 'time', 'label': 'Time', 'field': 'time'},
{'name': 'snr', 'label': 'SNR', 'field': 'snr'},
{'name': 'type', 'label': 'Type', 'field': 'type'},
],
rows=[],
).props('dense flat').classes('text-xs max-h-48 overflow-y-auto')
def update(self, data: Dict) -> None:
if not self._table:
return
entries: List[RxLogEntry] = data['rx_log'][:20]
rows = [
{
'time': e.time,
'snr': f"{e.snr:.1f}",
'type': e.payload_type,
}
for e in entries
]
self._table.rows = rows
self._table.update()
@@ -4,15 +4,22 @@ Route visualization page for MeshCore GUI.
Standalone NiceGUI page that opens in a new browser tab when a user
clicks on a message. Shows a Leaflet map with the message route,
a hop count summary, and a details table.
v4.1 changes
~~~~~~~~~~~~~
- Uses :class:`~meshcore_gui.models.Message` and
:class:`~meshcore_gui.models.RouteNode` instead of plain dicts.
"""
from typing import Dict
from typing import Dict, List
from nicegui import ui
from meshcore_gui.config import TYPE_LABELS, debug_print
from meshcore_gui.route_builder import RouteBuilder
from meshcore_gui.protocols import SharedDataReadAndLookup
from meshcore_gui.gui.constants import TYPE_LABELS
from meshcore_gui.config import debug_print
from meshcore_gui.core.models import Message, RouteNode
from meshcore_gui.services.route_builder import RouteBuilder
from meshcore_gui.core.protocols import SharedDataReadAndLookup
class RoutePage:
@@ -32,27 +39,19 @@ class RoutePage:
# ------------------------------------------------------------------
def render(self, msg_index: int) -> None:
"""
Render the route page for a specific message.
Args:
msg_index: Index into SharedData.messages list
"""
data = self._shared.get_snapshot()
messages: List[Message] = data['messages']
# Validate
if msg_index < 0 or msg_index >= len(data['messages']):
if msg_index < 0 or msg_index >= len(messages):
ui.label('❌ Message not found').classes('text-xl p-8')
return
msg = data['messages'][msg_index]
msg = messages[msg_index]
route = self._builder.build(msg, data)
sender = msg.get('sender', 'Unknown')
ui.page_title(f'Route — {sender}')
ui.page_title(f'Route — {msg.sender or "Unknown"}')
ui.dark_mode(False)
# Header
with ui.header().classes('bg-blue-600 text-white'):
ui.label('🗺️ MeshCore Route').classes('text-xl font-bold')
@@ -68,27 +67,25 @@ class RoutePage:
# ------------------------------------------------------------------
@staticmethod
def _render_message_info(msg: Dict) -> None:
"""Message header with sender name and text."""
sender = msg.get('sender', 'Unknown')
direction = '→ Sent' if msg['direction'] == 'out' else '← Received'
def _render_message_info(msg: Message) -> None:
sender = msg.sender or 'Unknown'
direction = '→ Sent' if msg.direction == 'out' else '← Received'
ui.label(f'Message Route — {sender} ({direction})').classes('font-bold text-lg')
ui.label(
f"{msg['time']} {sender}: "
f"{msg['text'][:120]}"
f"{msg.time} {sender}: {msg.text[:120]}"
).classes('text-sm text-gray-600')
@staticmethod
def _render_hop_summary(msg: Dict, route: Dict) -> None:
"""Hop count banner with SNR and path source."""
def _render_hop_summary(msg: Message, route: Dict) -> None:
msg_path_len = route['msg_path_len']
resolved_hops = len(route['path_nodes'])
path_nodes: List[RouteNode] = route['path_nodes']
resolved_hops = len(path_nodes)
path_source = route.get('path_source', 'none')
expected_repeaters = max(msg_path_len - 1, 0)
with ui.card().classes('w-full'):
with ui.row().classes('items-center gap-4'):
if msg['direction'] == 'in':
if msg.direction == 'in':
if msg_path_len == 0:
ui.label('📡 Direct (0 hops)').classes(
'text-lg font-bold text-green-600'
@@ -108,7 +105,6 @@ class RoutePage:
f'📶 SNR: {route["snr"]:.1f} dB'
).classes('text-sm text-gray-600')
# Resolution status
if expected_repeaters > 0 and resolved_hops > 0:
source_label = (
'from received packet'
@@ -118,8 +114,7 @@ class RoutePage:
rpt = 'repeater' if expected_repeaters == 1 else 'repeaters'
ui.label(
f'{resolved_hops} of {expected_repeaters} '
f'{rpt} identified '
f'({source_label})'
f'{rpt} identified ({source_label})'
).classes('text-xs text-gray-500 mt-1')
elif msg_path_len > 0 and resolved_hops == 0:
ui.label(
@@ -130,12 +125,7 @@ class RoutePage:
@staticmethod
def _render_map(data: Dict, route: Dict) -> None:
"""Leaflet map with route markers and polylines.
Lines are only drawn between nodes that are **adjacent** in the
route and both have GPS coordinates. A node without coordinates
breaks the line so that no false connections are shown.
"""
"""Leaflet map with route markers and polylines."""
with ui.card().classes('w-full'):
if not route['has_locations']:
ui.label(
@@ -150,47 +140,34 @@ class RoutePage:
center=(center_lat, center_lon), zoom=10
).classes('w-full h-96')
# --- Build ordered list of positions (or None) ---
# Build ordered list of positions (or None)
ordered = []
# Sender
if route['sender']:
s = route['sender']
if s['lat'] or s['lon']:
ordered.append((s['lat'], s['lon']))
else:
ordered.append(None)
sender: RouteNode = route['sender']
if sender:
ordered.append((sender.lat, sender.lon) if sender.has_location else None)
else:
ordered.append(None)
# Repeaters
for node in route['path_nodes']:
if node['lat'] or node['lon']:
ordered.append((node['lat'], node['lon']))
else:
ordered.append(None)
ordered.append((node.lat, node.lon) if node.has_location else None)
# Own position (receiver)
if data['adv_lat'] or data['adv_lon']:
ordered.append((data['adv_lat'], data['adv_lon']))
self_node: RouteNode = route['self_node']
if self_node.has_location:
ordered.append((self_node.lat, self_node.lon))
else:
ordered.append(None)
# --- Place markers for all nodes with coordinates ---
all_points = [p for p in ordered if p is not None]
for lat, lon in all_points:
route_map.marker(latlng=(lat, lon))
# --- Draw line between all located nodes (skip unknowns) ---
# Nodes without coordinates are simply skipped so the line
# connects sender → known repeaters → receiver without gaps.
if len(all_points) >= 2:
route_map.generic_layer(
name='polyline',
args=[all_points, {'color': '#2563eb', 'weight': 3}],
)
# Center map on all located nodes
if all_points:
lats = [p[0] for p in all_points]
lons = [p[1] for p in all_points]
@@ -199,10 +176,10 @@ class RoutePage:
)
@staticmethod
def _render_route_table(msg: Dict, data: Dict, route: Dict) -> None:
"""Route details table with sender, hops and receiver."""
def _render_route_table(msg: Message, data: Dict, route: Dict) -> None:
msg_path_len = route['msg_path_len']
resolved_hops = len(route['path_nodes'])
path_nodes: List[RouteNode] = route['path_nodes']
resolved_hops = len(path_nodes)
path_source = route.get('path_source', 'none')
with ui.card().classes('w-full'):
@@ -211,61 +188,55 @@ class RoutePage:
rows = []
# Sender
if route['sender']:
s = route['sender']
has_loc = s['lat'] != 0 or s['lon'] != 0
sender: RouteNode = route['sender']
if sender:
rows.append({
'hop': 'Start',
'name': s['name'],
'hash': s.get('pubkey', '')[:2].upper() if s.get('pubkey') else '-',
'type': TYPE_LABELS.get(s['type'], '-'),
'location': f"{s['lat']:.4f}, {s['lon']:.4f}" if has_loc else '-',
'name': sender.name,
'hash': sender.pubkey[:2].upper() if sender.pubkey else '-',
'type': TYPE_LABELS.get(sender.type, '-'),
'location': f"{sender.lat:.4f}, {sender.lon:.4f}" if sender.has_location else '-',
'role': '📱 Sender',
})
else:
sender_pubkey = msg.get('sender_pubkey', '')
rows.append({
'hop': 'Start',
'name': msg.get('sender', 'Unknown'),
'hash': sender_pubkey[:2].upper() if sender_pubkey else '-',
'name': msg.sender or 'Unknown',
'hash': msg.sender_pubkey[:2].upper() if msg.sender_pubkey else '-',
'type': '-',
'location': '-',
'role': '📱 Sender',
})
# Resolved repeaters (from RX_LOG or out_path)
for i, node in enumerate(route['path_nodes']):
has_loc = node['lat'] != 0 or node['lon'] != 0
# Repeaters
for i, node in enumerate(path_nodes):
rows.append({
'hop': str(i + 1),
'name': node['name'],
'hash': node.get('pubkey', '')[:2].upper() if node.get('pubkey') else '-',
'type': TYPE_LABELS.get(node['type'], '-'),
'location': f"{node['lat']:.4f}, {node['lon']:.4f}" if has_loc else '-',
'name': node.name,
'hash': node.pubkey[:2].upper() if node.pubkey else '-',
'type': TYPE_LABELS.get(node.type, '-'),
'location': f"{node.lat:.4f}, {node.lon:.4f}" if node.has_location else '-',
'role': '📡 Repeater',
})
# Placeholder rows when no path data was resolved
if not route['path_nodes'] and msg_path_len > 0:
# Placeholder rows
if not path_nodes and msg_path_len > 0:
for i in range(msg_path_len):
rows.append({
'hop': str(i + 1),
'name': '-',
'hash': '-',
'type': '-',
'location': '-',
'role': '📡 Repeater',
'name': '-', 'hash': '-', 'type': '-',
'location': '-', 'role': '📡 Repeater',
})
# Own position
self_has_loc = data['adv_lat'] != 0 or data['adv_lon'] != 0
self_node: RouteNode = route['self_node']
rows.append({
'hop': 'End',
'name': data['name'] or 'Me',
'name': self_node.name,
'hash': '-',
'type': 'Companion',
'location': f"{data['adv_lat']:.4f}, {data['adv_lon']:.4f}" if self_has_loc else '-',
'role': '📱 Receiver' if msg['direction'] == 'in' else '📱 Sender',
'location': f"{self_node.lat:.4f}, {self_node.lon:.4f}" if self_node.has_location else '-',
'role': '📱 Receiver' if msg.direction == 'in' else '📱 Sender',
})
ui.table(
@@ -280,8 +251,8 @@ class RoutePage:
rows=rows,
).props('dense flat bordered').classes('w-full')
# Footnote based on path_source
if msg_path_len == 0 and msg['direction'] == 'in':
# Footnotes
if msg_path_len == 0 and msg.direction == 'in':
ui.label(
'️ Direct message — no intermediate hops.'
).classes('text-xs text-gray-400 italic mt-2')
@@ -297,32 +268,25 @@ class RoutePage:
).classes('text-xs text-gray-400 italic mt-2')
elif msg_path_len > 0 and resolved_hops == 0:
ui.label(
'️ Repeater identities could not be resolved. '
'RX_LOG correlation may have missed the raw packet, '
'and sender has no stored out_path.'
'️ Repeater identities could not be resolved.'
).classes('text-xs text-gray-400 italic mt-2')
elif msg['direction'] == 'out':
elif msg.direction == 'out':
ui.label(
'️ Hop information is only available for received messages.'
).classes('text-xs text-gray-400 italic mt-2')
def _render_send_panel(
self, msg: Dict, route: Dict, data: Dict,
self, msg: Message, route: Dict, data: Dict,
) -> None:
"""Send widget pre-filled with route acknowledgement message."""
sender = msg.get('sender', 'Unknown')
path_len = route['msg_path_len']
path_hashes = msg.get('path_hashes', [])
path_hashes = msg.path_hashes
# Build pre-filled message:
# @SenderName Received in Zwolle path(3); B8>7B>F5
parts = [f"@[{sender}] Received in Zwolle path({path_len})"]
parts = [f"@[{msg.sender or 'Unknown'}] Received in Zwolle path({msg.path_len})"]
if path_hashes:
path_str = '>'.join(h.upper() for h in path_hashes)
parts.append(f"; {path_str}")
prefilled = ''.join(parts)
# Channel options
ch_options = {
ch['idx']: f"[{ch['idx']}] {ch['name']}"
for ch in data['channels']
@@ -332,13 +296,8 @@ class RoutePage:
with ui.card().classes('w-full'):
ui.label('📤 Reply').classes('font-bold text-gray-600')
with ui.row().classes('w-full items-center gap-2'):
msg_input = ui.input(
value=prefilled,
).classes('flex-grow')
ch_select = ui.select(
options=ch_options, value=default_ch,
).classes('w-32')
msg_input = ui.input(value=prefilled).classes('flex-grow')
ch_select = ui.select(options=ch_options, value=default_ch).classes('w-32')
def send(inp=msg_input, sel=ch_select):
text = inp.value
@@ -350,6 +309,4 @@ class RoutePage:
})
inp.value = ''
ui.button(
'Send', on_click=send,
).classes('bg-blue-500 text-white')
ui.button('Send', on_click=send).classes('bg-blue-500 text-white')
+3
View File
@@ -0,0 +1,3 @@
"""
Business logic services — bot, deduplication and route building.
"""
+195
View File
@@ -0,0 +1,195 @@
"""
Keyword-triggered auto-reply bot for MeshCore GUI.
Extracted from BLEWorker to satisfy the Single Responsibility Principle.
The bot listens on a configured channel and replies to messages that
contain recognised keywords.
Open/Closed
~~~~~~~~~~~
New keywords are added via ``BotConfig.keywords`` (data) without
modifying the ``MeshBot`` class (code). Custom matching strategies
can be implemented by subclassing and overriding ``_match_keyword``.
"""
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional
from meshcore_gui.config import debug_print
# ==============================================================================
# Bot defaults (previously in config.py)
# ==============================================================================
# Channel indices the bot listens on (must match CHANNELS_CONFIG).
BOT_CHANNELS: frozenset = frozenset({1, 4}) # #test, #bot
# Display name prepended to every bot reply.
BOT_NAME: str = "Zwolle Bot"
# Minimum seconds between two bot replies (prevents reply-storms).
BOT_COOLDOWN_SECONDS: float = 5.0
# Keyword → reply template mapping.
# Available variables: {bot}, {sender}, {snr}, {path}
# The bot checks whether the incoming message text *contains* the keyword
# (case-insensitive). First match wins.
BOT_KEYWORDS: Dict[str, str] = {
'test': '{bot}: {sender}, rcvd | SNR {snr} | {path}',
'ping': '{bot}: Pong!',
'help': '{bot}: test, ping, help',
}
@dataclass
class BotConfig:
"""Configuration for :class:`MeshBot`.
Attributes:
channels: Channel indices to listen on.
name: Display name prepended to replies.
cooldown_seconds: Minimum seconds between replies.
keywords: Keyword → reply template mapping.
"""
channels: frozenset = field(default_factory=lambda: frozenset(BOT_CHANNELS))
name: str = BOT_NAME
cooldown_seconds: float = BOT_COOLDOWN_SECONDS
keywords: Dict[str, str] = field(default_factory=lambda: dict(BOT_KEYWORDS))
class MeshBot:
"""Keyword-triggered auto-reply bot.
The bot checks incoming messages against a set of keyword → template
pairs. When a keyword is found (case-insensitive substring match,
first match wins), the template is expanded and queued as a channel
message via *command_sink*.
Args:
config: Bot configuration.
command_sink: Callable that enqueues a command dict for the
BLE worker (typically ``shared.put_command``).
enabled_check: Callable that returns ``True`` when the bot is
enabled (typically ``shared.is_bot_enabled``).
"""
def __init__(
self,
config: BotConfig,
command_sink: Callable[[Dict], None],
enabled_check: Callable[[], bool],
) -> None:
self._config = config
self._sink = command_sink
self._enabled = enabled_check
self._last_reply: float = 0.0
def check_and_reply(
self,
sender: str,
text: str,
channel_idx: Optional[int],
snr: Optional[float],
path_len: int,
path_hashes: Optional[List[str]] = None,
) -> None:
"""Evaluate an incoming message and queue a reply if appropriate.
Guards (in order):
1. Bot is enabled (checkbox in GUI).
2. Message is on the configured channel.
3. Sender is not the bot itself.
4. Sender name does not end with ``'Bot'`` (prevent loops).
5. Cooldown period has elapsed.
6. Message text contains a recognised keyword.
"""
# Guard 1: enabled?
if not self._enabled():
return
# Guard 2: correct channel?
if channel_idx not in self._config.channels:
return
# Guard 3: own messages?
if sender == "Me" or (text and text.startswith(self._config.name)):
return
# Guard 4: other bots?
if sender and sender.rstrip().lower().endswith("bot"):
debug_print(f"BOT: skipping message from other bot '{sender}'")
return
# Guard 5: cooldown?
now = time.time()
if now - self._last_reply < self._config.cooldown_seconds:
debug_print("BOT: cooldown active, skipping")
return
# Guard 6: keyword match
template = self._match_keyword(text)
if template is None:
return
# Build reply
path_str = self._format_path(path_len, path_hashes)
snr_str = f"{snr:.1f}" if snr is not None else "?"
reply = template.format(
bot=self._config.name,
sender=sender or "?",
snr=snr_str,
path=path_str,
)
self._last_reply = now
self._sink({
"action": "send_message",
"channel": channel_idx,
"text": reply,
"_bot": True,
})
debug_print(f"BOT: queued reply to '{sender}': {reply}")
# ------------------------------------------------------------------
# Extension point (OCP)
# ------------------------------------------------------------------
def _match_keyword(self, text: str) -> Optional[str]:
"""Return the reply template for the first matching keyword.
Override this method for custom matching strategies (regex,
exact match, priority ordering, etc.).
Returns:
Template string, or ``None`` if no keyword matched.
"""
text_lower = (text or "").lower()
for keyword, template in self._config.keywords.items():
if keyword in text_lower:
return template
return None
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
@staticmethod
def _format_path(
path_len: int,
path_hashes: Optional[List[str]],
) -> str:
"""Format path info as ``path(N); 8D>A8`` or ``path(0)``."""
if not path_len:
return "path(0)"
if not path_hashes:
return f"path({path_len})"
hop_names = [h.upper() for h in path_hashes if h and len(h) >= 2]
if hop_names:
return f"path({path_len}); {'>'.join(hop_names)}"
return f"path({path_len})"
+108
View File
@@ -0,0 +1,108 @@
"""
Message deduplication for MeshCore GUI.
Extracted from BLEWorker to satisfy the Single Responsibility Principle.
Provides bounded-size deduplication via message hash and content keys.
Two strategies are used because the two event sources carry different
identifiers:
1. **Hash-based** — ``RX_LOG_DATA`` events produce a deterministic
``message_hash``. When ``CHANNEL_MSG_RECV`` arrives for the same
packet, it is suppressed.
2. **Content-based** — ``CHANNEL_MSG_RECV`` events do *not* include
``message_hash``, so a composite key of ``channel:sender:text`` is
used as a fallback.
Both stores are bounded to prevent unbounded memory growth.
"""
from collections import OrderedDict
class MessageDeduplicator:
"""Bounded-size message deduplication store.
Uses an :class:`OrderedDict` as an LRU-style bounded set.
Oldest entries are evicted when the store exceeds ``max_size``.
Args:
max_size: Maximum number of keys to retain. 200 is generous
for the typical message rate of a mesh network.
"""
def __init__(self, max_size: int = 200) -> None:
self._max = max_size
self._seen: OrderedDict[str, None] = OrderedDict()
def is_seen(self, key: str) -> bool:
"""Check if a key has already been recorded."""
return key in self._seen
def mark(self, key: str) -> None:
"""Record a key. Evicts the oldest entry if at capacity."""
if key in self._seen:
# Move to end (most recent)
self._seen.move_to_end(key)
return
self._seen[key] = None
while len(self._seen) > self._max:
self._seen.popitem(last=False)
def clear(self) -> None:
"""Remove all recorded keys."""
self._seen.clear()
def __len__(self) -> int:
return len(self._seen)
class DualDeduplicator:
"""Combined hash-based and content-based deduplication.
Wraps two :class:`MessageDeduplicator` instances — one for
message hashes and one for content keys — behind a single
interface.
Args:
max_size: Maximum entries per store.
"""
def __init__(self, max_size: int = 200) -> None:
self._by_hash = MessageDeduplicator(max_size)
self._by_content = MessageDeduplicator(max_size)
# -- Hash-based --
def mark_hash(self, message_hash: str) -> None:
"""Record a message hash as processed."""
if message_hash:
self._by_hash.mark(message_hash)
def is_hash_seen(self, message_hash: str) -> bool:
"""Check if a message hash has already been processed."""
return bool(message_hash) and self._by_hash.is_seen(message_hash)
# -- Content-based --
def mark_content(self, sender: str, channel, text: str) -> None:
"""Record a content key as processed."""
key = self._content_key(sender, channel, text)
self._by_content.mark(key)
def is_content_seen(self, sender: str, channel, text: str) -> bool:
"""Check if a content key has already been processed."""
key = self._content_key(sender, channel, text)
return self._by_content.is_seen(key)
# -- Bulk --
def clear(self) -> None:
"""Clear both stores."""
self._by_hash.clear()
self._by_content.clear()
@staticmethod
def _content_key(sender: str, channel, text: str) -> str:
return f"{channel}:{sender}:{text}"
@@ -5,28 +5,18 @@ Pure data logic — no UI code. Given a message and a data snapshot, this
module constructs a route dictionary that describes the path the message
has taken through the mesh network (sender repeaters receiver).
Path data sources (in priority order):
1. **path_hashes** (from the message) decoded from the raw LoRa
packet by ``meshcoredecoder`` via ``RX_LOG_DATA``. Each entry is a
2-char hex string representing the first byte of a repeater's public
key. Always available when the packet was successfully decrypted
(single-source architecture).
2. **out_path** (from the sender's contact record) — hex string where
each byte (2 hex chars) is the first byte of a repeater's public
key. Only available for known contacts with a stored route. This
is the *last known* route to/from that contact, not necessarily the
route of *this* message.
3. **path_len only** hop count from the message frame. Always
available for received messages but contains no repeater identities.
v4.1 changes
~~~~~~~~~~~~~
- ``build()`` now accepts a :class:`~meshcore_gui.models.Message`
dataclass instead of a plain dict.
- Route nodes returned as :class:`~meshcore_gui.models.RouteNode`.
"""
from typing import Dict, List, Optional
from meshcore_gui.config import debug_print
from meshcore_gui.protocols import ContactLookup
from meshcore_gui.core.models import Message, RouteNode
from meshcore_gui.core.protocols import ContactLookup
class RouteBuilder:
@@ -42,20 +32,19 @@ class RouteBuilder:
def __init__(self, shared: ContactLookup) -> None:
self._shared = shared
def build(self, msg: Dict, data: Dict) -> Dict:
def build(self, msg: Message, data: Dict) -> Dict:
"""
Build route data for a single message.
Args:
msg: Message dict (must contain 'sender_pubkey', may contain
'path_len', 'snr' and 'path_hashes')
data: Snapshot dictionary from SharedData.get_snapshot()
msg: Message dataclass instance.
data: Snapshot dictionary from SharedData.get_snapshot().
Returns:
Dictionary with keys:
sender: {name, lat, lon, type, pubkey} or None
self_node: {name, lat, lon}
path_nodes: [{name, lat, lon, type, pubkey}, ]
sender: RouteNode or None
self_node: RouteNode
path_nodes: List[RouteNode]
snr: float or None
msg_path_len: int hop count from the message itself
has_locations: bool True if any node has GPS coords
@@ -63,20 +52,20 @@ class RouteBuilder:
"""
result: Dict = {
'sender': None,
'self_node': {
'name': data['name'] or 'Me',
'lat': data['adv_lat'],
'lon': data['adv_lon'],
},
'self_node': RouteNode(
name=data['name'] or 'Me',
lat=data['adv_lat'],
lon=data['adv_lon'],
),
'path_nodes': [],
'snr': msg.get('snr'),
'msg_path_len': msg.get('path_len', 0),
'snr': msg.snr,
'msg_path_len': msg.path_len,
'has_locations': False,
'path_source': 'none',
}
# Look up sender in contacts
pubkey = msg.get('sender_pubkey', '')
pubkey = msg.sender_pubkey
contact: Optional[Dict] = None
debug_print(
@@ -91,50 +80,37 @@ class RouteBuilder:
f"{'FOUND ' + contact.get('adv_name', '?') if contact else 'NOT FOUND'}"
)
if contact:
result['sender'] = {
'name': contact.get('adv_name', pubkey[:8]),
'lat': contact.get('adv_lat', 0),
'lon': contact.get('adv_lon', 0),
'type': contact.get('type', 0),
'pubkey': pubkey,
}
debug_print(
f"Route build: sender hash will be "
f"{pubkey[:2].upper()!r}"
result['sender'] = RouteNode(
name=contact.get('adv_name', pubkey[:8]),
lat=contact.get('adv_lat', 0),
lon=contact.get('adv_lon', 0),
type=contact.get('type', 0),
pubkey=pubkey,
)
else:
# Deferred sender lookup: try fuzzy name match
# Use sender_full (untruncated) if available, fall back to sender
sender_name = msg.get('sender_full') or msg.get('sender', '')
sender_name = msg.sender
if sender_name:
match = self._shared.get_contact_by_name(sender_name)
if match:
pubkey, contact_data = match
contact = contact_data
result['sender'] = {
'name': contact_data.get('adv_name', pubkey[:8]),
'lat': contact_data.get('adv_lat', 0),
'lon': contact_data.get('adv_lon', 0),
'type': contact_data.get('type', 0),
'pubkey': pubkey,
}
result['sender'] = RouteNode(
name=contact_data.get('adv_name', pubkey[:8]),
lat=contact_data.get('adv_lat', 0),
lon=contact_data.get('adv_lon', 0),
type=contact_data.get('type', 0),
pubkey=pubkey,
)
debug_print(
f"Route build: deferred name lookup "
f"'{sender_name}' → pubkey={pubkey[:16]!r}, "
f"hash={pubkey[:2].upper()!r}"
f"'{sender_name}' → pubkey={pubkey[:16]!r}"
)
else:
debug_print(
f"Route build: deferred name lookup "
f"'{sender_name}' → NOT FOUND"
)
else:
debug_print("Route build: sender_pubkey is EMPTY, no name → hash will be '-'")
# --- Resolve path nodes (priority order) ---
# Priority 1: path_hashes from RX_LOG decode (single-source)
rx_hashes = msg.get('path_hashes', [])
# Priority 1: path_hashes from RX_LOG decode
rx_hashes = msg.path_hashes
if rx_hashes:
result['path_nodes'] = self._resolve_hashes(
@@ -165,15 +141,12 @@ class RouteBuilder:
result['path_source'] = 'contact_out_path'
# Determine if any node has GPS coordinates
all_points = [result['self_node']]
all_nodes: List[RouteNode] = [result['self_node']]
if result['sender']:
all_points.append(result['sender'])
all_points.extend(result['path_nodes'])
all_nodes.append(result['sender'])
all_nodes.extend(result['path_nodes'])
result['has_locations'] = any(
p.get('lat', 0) != 0 or p.get('lon', 0) != 0
for p in all_points
)
result['has_locations'] = any(n.has_location for n in all_nodes)
return result
@@ -185,18 +158,9 @@ class RouteBuilder:
def _resolve_hashes(
hashes: List[str],
contacts: Dict,
) -> List[Dict]:
"""
Resolve a list of 1-byte path hashes into hop node dicts.
Args:
hashes: List of 2-char hex strings (e.g. ["8d", "a8"])
contacts: Contacts dictionary from snapshot
Returns:
List of hop node dicts.
"""
nodes: List[Dict] = []
) -> List[RouteNode]:
"""Resolve a list of 1-byte path hashes into RouteNode objects."""
nodes: List[RouteNode] = []
for hop_hash in hashes:
if not hop_hash or len(hop_hash) < 2:
@@ -207,21 +171,18 @@ class RouteBuilder:
)
if hop_contact:
nodes.append({
'name': hop_contact.get('adv_name', f'0x{hop_hash}'),
'lat': hop_contact.get('adv_lat', 0),
'lon': hop_contact.get('adv_lon', 0),
'type': hop_contact.get('type', 0),
'pubkey': hop_hash,
})
nodes.append(RouteNode(
name=hop_contact.get('adv_name', f'0x{hop_hash}'),
lat=hop_contact.get('adv_lat', 0),
lon=hop_contact.get('adv_lon', 0),
type=hop_contact.get('type', 0),
pubkey=hop_hash,
))
else:
nodes.append({
'name': '-',
'lat': 0,
'lon': 0,
'type': 0,
'pubkey': hop_hash,
})
nodes.append(RouteNode(
name='-',
pubkey=hop_hash,
))
return nodes
@@ -230,18 +191,10 @@ class RouteBuilder:
out_path: str,
out_path_len: int,
contacts: Dict,
) -> List[Dict]:
"""
Parse out_path hex string into a list of hop nodes.
Each byte (2 hex chars) in out_path is the first byte of a
repeater's public key.
Returns:
List of hop node dicts.
"""
) -> List[RouteNode]:
"""Parse out_path hex string into a list of RouteNode objects."""
hashes: List[str] = []
hop_hex_len = 2 # 1 byte = 2 hex chars
hop_hex_len = 2
for i in range(0, min(len(out_path), out_path_len * 2), hop_hex_len):
hop_hash = out_path[i:i + hop_hex_len]
@@ -254,12 +207,6 @@ class RouteBuilder:
def _find_contact_by_pubkey_hash(
hash_hex: str, contacts: Dict,
) -> Optional[Dict]:
"""
Find a contact whose pubkey starts with the given 1-byte hash.
Note: with only 256 possible values, collisions are possible
when there are many contacts. Returns the first match.
"""
hash_hex = hash_hex.lower()
for pubkey, contact in contacts.items():
if pubkey.lower().startswith(hash_hex):