Route page added

This commit is contained in:
pe1hvh
2026-02-05 07:17:27 +01:00
parent dcb3037db7
commit cfd2d39b25
12 changed files with 1309 additions and 116 deletions
+1 -1
View File
@@ -11,7 +11,7 @@ Usage:
python meshcore_gui.py <BLE_ADDRESS> --debug-on
Author: PE1HVH
Version: 3.2
Version: 4.0
SPDX-License-Identifier: MIT
Copyright: (c) 2026 PE1HVH
"""
+108
View File
@@ -0,0 +1,108 @@
# MeshCore GUI v4.0 — Changelog
## Probleem dat is opgelost
Bij het opstarten van de GUI bufferde het systeem RX_LOG-pakketten die tijdens de
initialisatiefase binnenkwamen (send_appstart, send_device_query, get_contacts).
Deze pakketten werden later foutief gekoppeld aan het eerste echte channel message,
wat resulteerde in onmogelijke route-informatie (bijv. "22 of 7 repeaters identified").
Daarnaast gebruikte de oude code SNR-matching om RX_LOG-entries te koppelen aan
channel messages. Dit werkte niet betrouwbaar omdat het companion protocol en
RX_LOG verschillende SNR-waarden rapporteren voor hetzelfde fysieke pakket
(bijv. companion=13.5, RX_LOG=14.0).
## Drie defensieve lagen
### Laag 1: Startup buffer clearing (`ble_worker.py:126-135`)
Na `start_auto_message_fetching()` worden de RX path buffer én het SharedData
archive geleegd. Alle pakketten die tijdens de init-fase zijn binnengekomen
worden weggegooid voordat het eerste echte channel message kan arriveren.
```python
await self.mc.start_auto_message_fetching()
self._rx_path_buffer.clear()
self.shared.clear_rx_archive()
```
### Laag 2: Path_len sanity checks (alle matching paden)
In elke matching-methode wordt gecontroleerd of het aantal hashes in een RX_LOG
entry niet wild afwijkt van het door het companion protocol gerapporteerde hop
count. De margin is configureerbaar via `PATH_LEN_SANITY_MARGIN = 5`.
Aanwezig in:
- Forward matching (`ble_worker.py:366`)
- Retroactive matching (`ble_worker.py:304`)
- Archive matching (`shared_data.py:354`)
Voorbeeld: een 7-hop bericht accepteert maximaal 12 hashes (7 + 5).
Een entry met 22 hashes wordt geweigerd.
### Laag 3: Display-time guard (`route_page.py:89`)
Laatste vangnet bij het renderen van de route-pagina. Als het aantal resolved
hops meer is dan 2× de path_len, wordt de match als false positive beschouwd
en verworpen.
```python
if msg_path_len > 0 and resolved_hops > 2 * msg_path_len:
resolved_hops = 0
route['path_nodes'] = []
```
## Andere wijzigingen
- **Temporal matching i.p.v. SNR** — Alle RX_LOG correlatie gebruikt nu
tijdproximiteit (binnen 3 seconden) in plaats van SNR-vergelijking.
SNR wordt nog steeds weergegeven in de UI maar niet meer gebruikt voor matching.
- **`PATH_LEN_SANITY_MARGIN`** als configureerbare constante in `config.py`
## Bestanden gewijzigd
| Bestand | Wijziging |
|---------|-----------|
| `meshcore_gui.py` | Versie → 4.0 |
| `meshcore_gui/__init__.py` | `__version__` → "4.0" |
| `meshcore_gui/config.py` | + `PATH_LEN_SANITY_MARGIN = 5` |
| `meshcore_gui/ble_worker.py` | Startup clear, temporal matching, path_len checks |
| `meshcore_gui/shared_data.py` | `clear_rx_archive()`, `find_rx_path()` met path_len check |
| `meshcore_gui/route_page.py` | Display-time sanity guard |
| `meshcore_gui/route_builder.py` | Geeft `msg_path_len` door aan archive lookup |
| `meshcore_gui/protocols.py` | + `clear_rx_archive()` in Writer protocol |
## Installatie
Vervang je huidige bestanden:
```bash
# Backup
cp -r meshcore_gui meshcore_gui.bak
cp meshcore_gui.py meshcore_gui.py.bak
# Vervang
cp -r meshcore-gui-v4.0/meshcore_gui ./
cp meshcore-gui-v4.0/meshcore_gui.py ./
```
## Verwacht gedrag na update
Met `--debug-on` zie je nu bij opstarten:
```
DEBUG: Startup buffer+archive cleared — only post-init packets will be matched
BLE: Ready!
```
En bij een 7-hop bericht met correcte match:
```
DEBUG: Forward match: dt=0.42s, hashes=7
```
In plaats van het oude gedrag:
```
DEBUG: No RX_LOG match: msg_snr=13.5, buffer_snrs=[14.0, 14.0, ...]
DEBUG: RX archive match: snr=14.0, hashes=[...22 total], time_diff=0.81s
```
+1 -1
View File
@@ -5,4 +5,4 @@ A graphical user interface for MeshCore mesh network devices,
communicating via Bluetooth Low Energy (BLE).
"""
__version__ = "3.1"
__version__ = "4.0"
+287 -21
View File
@@ -4,19 +4,50 @@ BLE communication worker for MeshCore GUI.
Runs in a separate thread with its own asyncio event loop. Connects to
the MeshCore device, subscribes to events, and processes commands sent
from the GUI via the SharedData command queue.
Single-source architecture
~~~~~~~~~~~~~~~~~~~~~~~~~~
When a LoRa packet arrives the companion firmware pushes two events:
1. ``RX_LOG_DATA`` — the *raw* LoRa frame with header, path hashes
and encrypted payload.
2. ``CHANNEL_MSG_RECV`` — the *decrypted* message text but **no** path
hashes (only the hop count ``path_len``).
This module uses ``meshcoredecoder`` to fully decode the raw packet
from (1): message_hash, path_hashes, sender name, message text and
channel index are all extracted from that **single frame**.
The ``CHANNEL_MSG_RECV`` event (2) serves only as a fallback for
packets that could not be decrypted from the raw frame (e.g. missing
channel key).
Deduplication is done via ``message_hash``: if the same hash has
already been processed from RX_LOG_DATA, the CHANNEL_MSG_RECV event
is silently dropped.
There is **no temporal correlation**, no ring buffer, no archive, and
no sanity-margin heuristics.
"""
import asyncio
import threading
from datetime import datetime
from typing import Dict, Optional
from typing import Dict, List, Optional, Set
from meshcore import MeshCore, EventType
from meshcore_gui.config import CHANNELS_CONFIG, debug_print
from meshcore_gui.packet_parser import PacketDecoder, PayloadType
from meshcore_gui.protocols import SharedDataWriter
# Maximum number of message_hashes kept for deduplication.
# Oldest entries are evicted first. 200 is generous for the
# typical message rate of a mesh network.
_SEEN_HASHES_MAX = 200
class BLEWorker:
"""
BLE communication worker that runs in a separate thread.
@@ -34,6 +65,22 @@ class BLEWorker:
self.mc: Optional[MeshCore] = None
self.running = True
# Packet decoder (channel keys loaded at startup)
self._decoder = PacketDecoder()
# Deduplication: message_hash values already processed via
# RX_LOG_DATA decode. When CHANNEL_MSG_RECV arrives for the
# same packet, it is silently dropped.
#
# Two dedup strategies:
# 1. message_hash (from decoded packet)
# 2. content key (sender:channel:text) — because CHANNEL_MSG_RECV
# does NOT include message_hash in its payload
self._seen_hashes: Set[str] = set()
self._seen_hashes_order: List[str] = []
self._seen_content: Set[str] = set()
self._seen_content_order: List[str] = []
# ------------------------------------------------------------------
# Thread lifecycle
# ------------------------------------------------------------------
@@ -77,6 +124,7 @@ class BLEWorker:
self.mc.subscribe(EventType.RX_LOG_DATA, self._on_rx_log)
await self._load_data()
await self._load_channel_keys()
await self.mc.start_auto_message_fetching()
self.shared.set_connected(True)
@@ -127,6 +175,95 @@ class BLEWorker:
self.shared.set_contacts(r.payload)
print(f"BLE: Contacts loaded: {len(r.payload)} contacts")
async def _load_channel_keys(self) -> None:
"""
Load channel decryption keys for packet decoding.
Strategy per channel:
1. Try ``get_channel(idx)`` from the device (returns the
authoritative 16-byte ``channel_secret``).
2. If that fails, derive the key from the channel name via
``SHA-256(name)[:16]``. This is correct for channels whose
name starts with ``#`` (like ``#test``). For other channels
the derived key may be wrong, but decryption will simply fail
gracefully.
"""
self.shared.set_status("🔄 Channel keys...")
for ch in CHANNELS_CONFIG:
idx = ch['idx']
name = ch['name']
loaded = False
# Strategy 1: get_channel from device (3 retries)
for attempt in range(3):
try:
r = await self.mc.commands.get_channel(idx)
if r.type != EventType.ERROR:
secret = r.payload.get('channel_secret')
if secret and isinstance(secret, bytes) and len(secret) >= 16:
self._decoder.add_channel_key(idx, secret[:16])
print(
f"BLE: Channel key [{idx}] '{name}' "
f"loaded from device"
)
loaded = True
break
except Exception as exc:
debug_print(
f"get_channel({idx}) attempt {attempt + 1} "
f"error: {exc}"
)
await asyncio.sleep(0.3)
# Strategy 2: derive from name
if not loaded:
self._decoder.add_channel_key_from_name(idx, name)
print(
f"BLE: Channel key [{idx}] '{name}' "
f"derived from name (fallback)"
)
print(
f"BLE: PacketDecoder ready — "
f"has_keys={self._decoder.has_keys}"
)
# ------------------------------------------------------------------
# Deduplication
# ------------------------------------------------------------------
def _mark_seen(self, message_hash: str) -> None:
"""Record a message_hash as processed. Evicts old entries."""
if message_hash in self._seen_hashes:
return
self._seen_hashes.add(message_hash)
self._seen_hashes_order.append(message_hash)
while len(self._seen_hashes_order) > _SEEN_HASHES_MAX:
oldest = self._seen_hashes_order.pop(0)
self._seen_hashes.discard(oldest)
def _mark_content_seen(self, sender: str, channel, text: str) -> None:
"""Record a content key as processed. Evicts old entries."""
key = f"{channel}:{sender}:{text}"
if key in self._seen_content:
return
self._seen_content.add(key)
self._seen_content_order.append(key)
while len(self._seen_content_order) > _SEEN_HASHES_MAX:
oldest = self._seen_content_order.pop(0)
self._seen_content.discard(oldest)
def _is_seen(self, message_hash: str) -> bool:
"""Check if a message_hash has already been processed."""
return message_hash in self._seen_hashes
def _is_content_seen(self, sender: str, channel, text: str) -> bool:
"""Check if a content key has already been processed."""
key = f"{channel}:{sender}:{text}"
return key in self._seen_content
# ------------------------------------------------------------------
# Command handling
# ------------------------------------------------------------------
@@ -159,6 +296,7 @@ class BLEWorker:
'channel': channel,
'direction': 'out',
'sender_pubkey': '',
'path_hashes': [],
})
debug_print(f"Sent message to channel {channel}: {text[:30]}")
@@ -181,6 +319,7 @@ class BLEWorker:
'channel': None,
'direction': 'out',
'sender_pubkey': pubkey,
'path_hashes': [],
})
debug_print(f"Sent DM to {contact_name}: {text[:30]}")
@@ -193,23 +332,157 @@ class BLEWorker:
# Event callbacks
# ------------------------------------------------------------------
def _on_channel_msg(self, event) -> None:
"""Callback for received channel messages."""
def _on_rx_log(self, event) -> None:
"""Callback for RX log data — the single source of truth.
Decodes the raw LoRa frame via ``meshcoredecoder``. For
GroupText packets this yields message_hash, path_hashes,
sender, text and channel_idx — all from **one** frame.
The decoded message is added to SharedData directly. The
message_hash is recorded so that the duplicate
``CHANNEL_MSG_RECV`` event is suppressed.
"""
payload = event.payload
# Always add to the RX log display
self.shared.add_rx_log({
'time': datetime.now().strftime('%H:%M:%S'),
'snr': payload.get('snr', 0),
'rssi': payload.get('rssi', 0),
'payload_type': payload.get('payload_type', '?'),
'hops': payload.get('path_len', 0),
})
# Decode the raw packet
payload_hex = payload.get('payload', '')
if not payload_hex:
return
decoded = self._decoder.decode(payload_hex)
if decoded is None:
return
# Only process decrypted GroupText packets as messages
if (decoded.payload_type == PayloadType.GroupText
and decoded.is_decrypted):
# Mark as seen so CHANNEL_MSG_RECV is suppressed
self._mark_seen(decoded.message_hash)
self._mark_content_seen(
decoded.sender, decoded.channel_idx, decoded.text,
)
# Look up sender pubkey from contact name
sender_pubkey = ''
if decoded.sender:
match = self.shared.get_contact_by_name(decoded.sender)
if match:
sender_pubkey, _contact = match
# Extract SNR from the RX_LOG event
snr = payload.get('snr')
if snr is not None:
try:
snr = float(snr)
except (ValueError, TypeError):
snr = None
self.shared.add_message({
'time': datetime.now().strftime('%H:%M:%S'),
'sender': decoded.sender,
'text': decoded.text,
'channel': decoded.channel_idx,
'direction': 'in',
'snr': snr,
'path_len': decoded.path_length,
'sender_pubkey': sender_pubkey,
'path_hashes': decoded.path_hashes,
'message_hash': decoded.message_hash,
})
debug_print(
f"RX_LOG → message: hash={decoded.message_hash}, "
f"sender={decoded.sender!r}, "
f"ch={decoded.channel_idx}, "
f"path={decoded.path_hashes}"
)
def _on_channel_msg(self, event) -> None:
"""Callback for channel messages — fallback only.
If the same packet was already decoded from ``RX_LOG_DATA``
(checked via ``message_hash``), this event is suppressed.
Otherwise — e.g. when the channel key is missing or decryption
failed — this adds the message without path data.
"""
payload = event.payload
sender = payload.get('sender_name') or payload.get('sender') or ''
debug_print(f"Channel msg payload keys: {list(payload.keys())}")
debug_print(f"Channel msg payload: {payload}")
# --- Check for duplicate via message_hash ---
msg_hash = payload.get('message_hash', '')
if msg_hash and self._is_seen(msg_hash):
debug_print(
f"Channel msg suppressed (hash match): "
f"hash={msg_hash}"
)
return
# --- Extract sender name from text field ---
# Channel text format: "SenderName: message body"
raw_text = payload.get('text', '')
sender = ''
msg_text = raw_text
if ': ' in raw_text:
name_part, body_part = raw_text.split(': ', 1)
sender = name_part.strip()
msg_text = body_part
elif raw_text:
msg_text = raw_text
# --- Check for duplicate via content ---
ch_idx = payload.get('channel_idx')
if self._is_content_seen(sender, ch_idx, msg_text):
debug_print(
f"Channel msg suppressed (content match): "
f"sender={sender!r}, ch={ch_idx}, text={msg_text[:30]!r}"
)
return
debug_print(
f"Channel msg (fallback): sender={sender!r}, "
f"text={msg_text[:40]!r}"
)
# --- Look up sender contact by name to obtain pubkey ---
sender_pubkey = ''
if sender:
match = self.shared.get_contact_by_name(sender)
if match:
sender_pubkey, _contact = match
# Extract SNR
msg_snr = payload.get('SNR') or payload.get('snr')
if msg_snr is not None:
try:
msg_snr = float(msg_snr)
except (ValueError, TypeError):
msg_snr = None
self.shared.add_message({
'time': datetime.now().strftime('%H:%M:%S'),
'sender': sender[:15] if sender else '',
'text': payload.get('text', ''),
'sender': sender,
'text': msg_text,
'channel': payload.get('channel_idx'),
'direction': 'in',
'snr': payload.get('SNR') or payload.get('snr'),
'snr': msg_snr,
'path_len': payload.get('path_len', 0),
'sender_pubkey': payload.get('sender', ''),
'sender_pubkey': sender_pubkey,
'path_hashes': [], # No path data from companion event
'message_hash': msg_hash,
})
def _on_contact_msg(self, event) -> None:
@@ -229,24 +502,17 @@ class BLEWorker:
self.shared.add_message({
'time': datetime.now().strftime('%H:%M:%S'),
'sender': sender[:15] if sender else '',
'sender': sender,
'text': payload.get('text', ''),
'channel': None,
'direction': 'in',
'snr': payload.get('SNR') or payload.get('snr'),
'path_len': payload.get('path_len', 0),
'sender_pubkey': pubkey,
'path_hashes': [], # DMs use out_path from contact record
})
debug_print(f"DM received from {sender}: {payload.get('text', '')[:30]}")
def _on_rx_log(self, event) -> None:
"""Callback for RX log data."""
payload = event.payload
self.shared.add_rx_log({
'time': datetime.now().strftime('%H:%M:%S'),
'snr': payload.get('snr', 0),
'rssi': payload.get('rssi', 0),
'payload_type': payload.get('payload_type', '?'),
'hops': payload.get('path_len', 0),
})
debug_print(
f"DM received from {sender}: "
f"{payload.get('text', '')[:30]}"
)
+9 -6
View File
@@ -314,7 +314,7 @@ class DashboardPage:
channel_names = {ch['idx']: ch['name'] for ch in self._last_channels}
filtered = []
for msg in data['messages']:
for orig_idx, msg in enumerate(data['messages']):
ch = msg['channel']
if ch is None:
if self._channel_filters.get('DM') and not self._channel_filters['DM'].value:
@@ -322,12 +322,12 @@ class DashboardPage:
else:
if ch in self._channel_filters and not self._channel_filters[ch].value:
continue
filtered.append(msg)
filtered.append((orig_idx, msg))
self._messages_container.clear()
with self._messages_container:
for msg in reversed(filtered[-50:]):
for orig_idx, msg in reversed(filtered[-50:]):
direction = '' if msg['direction'] == 'out' else ''
ch = msg['channel']
@@ -339,18 +339,21 @@ class DashboardPage:
sender = msg.get('sender', '')
path_len = msg.get('path_len', 0)
hop_tag = f' [{path_len}h]' if msg['direction'] == 'in' and path_len > 0 else ''
has_path = bool(msg.get('path_hashes'))
if msg['direction'] == 'in' and path_len > 0:
hop_tag = f' [{path_len}h{"" if has_path else ""}]'
else:
hop_tag = ''
if sender:
line = f"{msg['time']} {direction} {ch_label}{hop_tag} {sender}: {msg['text']}"
else:
line = f"{msg['time']} {direction} {ch_label}{hop_tag} {msg['text']}"
msg_idx = len(filtered) - 1 - filtered[::-1].index(msg)
ui.label(line).classes(
'text-xs leading-tight cursor-pointer '
'hover:bg-blue-50 rounded px-1'
).on('click', lambda e, i=msg_idx: ui.navigate.to(
).on('click', lambda e, i=orig_idx: ui.navigate.to(
f'/route/{i}', new_tab=True
))
+113
View File
@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""
MeshCore GUI - Threaded BLE Edition
====================================
Entry point. Parses arguments, wires up the components, registers
NiceGUI pages and starts the server.
Usage:
python meshcore_gui.py <BLE_ADDRESS>
python meshcore_gui.py <BLE_ADDRESS> --debug-on
Author: PE1HVH
Version: 4.0
SPDX-License-Identifier: MIT
Copyright: (c) 2026 PE1HVH
"""
import sys
from nicegui import ui
# Allow overriding DEBUG and CHANNELS_CONFIG before anything imports them
import meshcore_gui.config as config
try:
from meshcore import MeshCore, EventType # noqa: F401 — availability check
except ImportError:
print("ERROR: meshcore library not found")
print("Install with: pip install meshcore")
sys.exit(1)
from meshcore_gui.ble_worker import BLEWorker
from meshcore_gui.main_page import DashboardPage
from meshcore_gui.route_page import RoutePage
from meshcore_gui.shared_data import SharedData
# Global instances (needed by NiceGUI page decorators)
_shared = None
_dashboard = None
_route_page = None
@ui.page('/')
def _page_dashboard():
"""NiceGUI page handler — main dashboard."""
if _dashboard:
_dashboard.render()
@ui.page('/route/{msg_index}')
def _page_route(msg_index: int):
"""NiceGUI page handler — route visualization (new tab)."""
if _route_page:
_route_page.render(msg_index)
def main():
"""
Main entry point.
Parses CLI arguments, initialises all components and starts the
NiceGUI server.
"""
global _shared, _dashboard, _route_page
# Parse arguments
args = [a for a in sys.argv[1:] if not a.startswith('--')]
flags = [a for a in sys.argv[1:] if a.startswith('--')]
if not args:
print("MeshCore GUI - Threaded BLE Edition")
print("=" * 40)
print("Usage: python meshcore_gui.py <BLE_ADDRESS> [--debug-on]")
print("Example: python meshcore_gui.py literal:AA:BB:CC:DD:EE:FF")
print(" python meshcore_gui.py literal:AA:BB:CC:DD:EE:FF --debug-on")
print()
print("Options:")
print(" --debug-on Enable verbose debug logging")
print()
print("Tip: Use 'bluetoothctl scan on' to find devices")
sys.exit(1)
ble_address = args[0]
# Apply --debug-on flag
if '--debug-on' in flags:
config.DEBUG = True
# Startup banner
print("=" * 50)
print("MeshCore GUI - Threaded BLE Edition")
print("=" * 50)
print(f"Device: {ble_address}")
print(f"Debug mode: {'ON' if config.DEBUG else 'OFF'}")
print("=" * 50)
# Assemble components
_shared = SharedData()
_dashboard = DashboardPage(_shared)
_route_page = RoutePage(_shared)
# Start BLE worker in background thread
worker = BLEWorker(ble_address, _shared)
worker.start()
# Start NiceGUI server (blocks)
ui.run(title='MeshCore', port=8080, reload=False)
if __name__ == "__main__":
main()
+206
View File
@@ -0,0 +1,206 @@
"""
Packet decoder for MeshCore GUI — single-source approach.
Wraps ``meshcoredecoder`` to decode raw LoRa packets from RX_LOG_DATA
events. A single raw packet contains **everything**: message_hash,
path hashes, hop count, and (with channel keys) the decrypted text
and sender name.
No correlation with CHANNEL_MSG_RECV events is needed.
Channel decryption keys are loaded at startup (fetched from the device
via ``get_channel()`` or derived from the channel name as fallback).
"""
from dataclasses import dataclass, field
from hashlib import sha256
from typing import Dict, List, Optional
from meshcoredecoder import MeshCoreDecoder
from meshcoredecoder.crypto.channel_crypto import ChannelCrypto
from meshcoredecoder.crypto.key_manager import MeshCoreKeyStore
from meshcoredecoder.types.crypto import DecryptionOptions
from meshcoredecoder.types.enums import PayloadType
from meshcore_gui.config import debug_print
# Re-export so other modules don't need to import meshcoredecoder
__all__ = ["PacketDecoder", "DecodedPacket", "PayloadType"]
# ---------------------------------------------------------------------------
# Decoded result
# ---------------------------------------------------------------------------
@dataclass
class DecodedPacket:
"""All data extracted from a single raw LoRa packet.
Attributes:
message_hash: Deterministic packet identifier (hex string).
payload_type: Enum (GroupText, Advert, Ack, …).
path_length: Number of repeater hashes in the path.
path_hashes: 2-char hex strings, one per repeater.
sender: Sender name (GroupText only, after decryption).
text: Message body (GroupText only, after decryption).
channel_idx: Channel index (GroupText only, via hash→idx map).
timestamp: Message timestamp (GroupText only).
is_decrypted: True if payload was successfully decrypted.
"""
message_hash: str
payload_type: PayloadType
path_length: int
path_hashes: List[str] = field(default_factory=list)
# GroupText-specific (populated after successful decryption)
sender: str = ""
text: str = ""
channel_idx: Optional[int] = None
timestamp: int = 0
is_decrypted: bool = False
# ---------------------------------------------------------------------------
# Decoder
# ---------------------------------------------------------------------------
class PacketDecoder:
"""Decode raw LoRa packets with channel-key decryption.
Usage::
decoder = PacketDecoder()
decoder.add_channel_key(0, secret_bytes) # from device
decoder.add_channel_key_from_name(1, "#test") # fallback
result = decoder.decode(payload_hex)
if result and result.is_decrypted:
print(result.sender, result.text, result.path_hashes)
"""
def __init__(self) -> None:
self._key_store = MeshCoreKeyStore()
self._options: Optional[DecryptionOptions] = None
# channel_hash (2-char lower hex) → channel_idx
self._hash_to_idx: Dict[str, int] = {}
# ------------------------------------------------------------------
# Key management
# ------------------------------------------------------------------
def add_channel_key(self, channel_idx: int, secret_bytes: bytes) -> None:
"""Register a channel decryption key (16 raw bytes from device).
Args:
channel_idx: Channel index (0-based).
secret_bytes: 16-byte channel secret from ``get_channel()``.
"""
secret_hex = secret_bytes.hex()
self._key_store.add_channel_secrets([secret_hex])
self._rebuild_options()
ch_hash = ChannelCrypto.calculate_channel_hash(secret_hex).lower()
self._hash_to_idx[ch_hash] = channel_idx
debug_print(
f"PacketDecoder: key for ch{channel_idx} "
f"(hash={ch_hash}, from device)"
)
def add_channel_key_from_name(
self, channel_idx: int, channel_name: str,
) -> None:
"""Derive a channel key from the channel name (fallback).
MeshCore derives channel secrets as
``SHA-256(name.encode('utf-8'))[:16]``.
Args:
channel_idx: Channel index (0-based).
channel_name: Channel name string (e.g. ``"#test"``).
"""
secret_bytes = sha256(channel_name.encode("utf-8")).digest()[:16]
self.add_channel_key(channel_idx, secret_bytes)
debug_print(
f"PacketDecoder: key for ch{channel_idx} "
f"(derived from '{channel_name}')"
)
@property
def has_keys(self) -> bool:
"""True if at least one channel key has been registered."""
return self._options is not None
# ------------------------------------------------------------------
# Decode
# ------------------------------------------------------------------
def decode(self, payload_hex: str) -> Optional[DecodedPacket]:
"""Decode a raw LoRa packet hex string.
Args:
payload_hex: Hex string from the RX_LOG_DATA event's
``payload`` field.
Returns:
:class:`DecodedPacket` on success, ``None`` if the data
is invalid or too short.
"""
if not payload_hex:
return None
try:
packet = MeshCoreDecoder.decode(payload_hex, self._options)
except Exception as exc:
debug_print(f"PacketDecoder: decode error: {exc}")
return None
if not packet.is_valid:
debug_print(f"PacketDecoder: invalid: {packet.errors}")
return None
result = DecodedPacket(
message_hash=packet.message_hash,
payload_type=packet.payload_type,
path_length=packet.path_length,
path_hashes=list(packet.path) if packet.path else [],
)
# --- GroupText decryption ---
if packet.payload_type == PayloadType.GroupText:
decoded_payload = packet.payload.get("decoded")
if decoded_payload and decoded_payload.decrypted:
d = decoded_payload.decrypted
result.sender = d.get("sender", "") or ""
result.text = d.get("message", "") or ""
result.timestamp = d.get("timestamp", 0)
result.is_decrypted = True
# Resolve channel_hash → channel_idx
ch_hash = decoded_payload.channel_hash.lower()
result.channel_idx = self._hash_to_idx.get(ch_hash)
debug_print(
f"PacketDecoder: GroupText OK — "
f"hash={result.message_hash}, "
f"sender={result.sender!r}, "
f"ch={result.channel_idx}, "
f"path={result.path_hashes}, "
f"text={result.text[:40]!r}"
)
else:
debug_print(
f"PacketDecoder: GroupText NOT decrypted "
f"(hash={result.message_hash})"
)
return result
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _rebuild_options(self) -> None:
"""Recreate DecryptionOptions after a key change."""
self._options = DecryptionOptions(key_store=self._key_store)
+6 -2
View File
@@ -37,6 +37,7 @@ class SharedDataWriter(Protocol):
def add_rx_log(self, entry: Dict) -> None: ...
def get_next_command(self) -> Optional[Dict]: ...
def get_contact_name_by_prefix(self, pubkey_prefix: str) -> str: ...
def get_contact_by_name(self, name: str) -> Optional[tuple]: ...
# ----------------------------------------------------------------------
@@ -65,11 +66,14 @@ class SharedDataReader(Protocol):
class ContactLookup(Protocol):
"""Contact lookup interface used by RouteBuilder.
RouteBuilder only needs to resolve public key prefixes to
contact records.
RouteBuilder needs to resolve public key prefixes and names
to contact records. Path hashes are always available in the
message dict (decoded from the raw packet), so no archive
lookup is needed.
"""
def get_contact_by_prefix(self, pubkey_prefix: str) -> Optional[Dict]: ...
def get_contact_by_name(self, name: str) -> Optional[tuple]: ...
# ----------------------------------------------------------------------
+137 -44
View File
@@ -5,15 +5,22 @@ Pure data logic — no UI code. Given a message and a data snapshot, this
module constructs a route dictionary that describes the path the message
has taken through the mesh network (sender → repeaters → receiver).
The route information comes from two sources:
Path data sources (in priority order):
1. **path_len** (from the message itself) — number of hops the message
traveled. Always available for received messages.
1. **path_hashes** (from the message) — decoded from the raw LoRa
packet by ``meshcoredecoder`` via ``RX_LOG_DATA``. Each entry is a
2-char hex string representing the first byte of a repeater's public
key. Always available when the packet was successfully decrypted
(single-source architecture).
2. **out_path** (from the sender's contact record) — hex string where
each byte (2 hex chars) is the first byte of a repeater's public
key. Only available when the sender is a known contact with a stored
route.
key. Only available for known contacts with a stored route. This
is the *last known* route to/from that contact, not necessarily the
route of *this* message.
3. **path_len only** — hop count from the message frame. Always
available for received messages but contains no repeater identities.
"""
from typing import Dict, List, Optional
@@ -41,7 +48,7 @@ class RouteBuilder:
Args:
msg: Message dict (must contain 'sender_pubkey', may contain
'path_len' and 'snr')
'path_len', 'snr' and 'path_hashes')
data: Snapshot dictionary from SharedData.get_snapshot()
Returns:
@@ -52,6 +59,7 @@ class RouteBuilder:
snr: float or None
msg_path_len: int — hop count from the message itself
has_locations: bool — True if any node has GPS coords
path_source: str — 'rx_log', 'contact_out_path' or 'none'
"""
result: Dict = {
'sender': None,
@@ -64,12 +72,24 @@ class RouteBuilder:
'snr': msg.get('snr'),
'msg_path_len': msg.get('path_len', 0),
'has_locations': False,
'path_source': 'none',
}
# Look up sender in contacts
pubkey = msg.get('sender_pubkey', '')
contact: Optional[Dict] = None
debug_print(
f"Route build: sender_pubkey={pubkey!r} "
f"(len={len(pubkey)}, first2={pubkey[:2]!r})"
)
if pubkey:
contact = self._shared.get_contact_by_prefix(pubkey)
debug_print(
f"Route build: contact lookup "
f"{'FOUND ' + contact.get('adv_name', '?') if contact else 'NOT FOUND'}"
)
if contact:
result['sender'] = {
'name': contact.get('adv_name', pubkey[:8]),
@@ -78,21 +98,71 @@ class RouteBuilder:
'type': contact.get('type', 0),
'pubkey': pubkey,
}
# Parse out_path for intermediate hops
out_path = contact.get('out_path', '')
out_path_len = contact.get('out_path_len', 0)
debug_print(
f"Route: sender={contact.get('adv_name')}, "
f"out_path={out_path!r}, out_path_len={out_path_len}, "
f"msg_path_len={result['msg_path_len']}"
f"Route build: sender hash will be "
f"{pubkey[:2].upper()!r}"
)
if out_path and out_path_len and out_path_len > 0:
result['path_nodes'] = self._parse_out_path(
out_path, out_path_len, data['contacts']
else:
# Deferred sender lookup: try fuzzy name match
# Use sender_full (untruncated) if available, fall back to sender
sender_name = msg.get('sender_full') or msg.get('sender', '')
if sender_name:
match = self._shared.get_contact_by_name(sender_name)
if match:
pubkey, contact_data = match
contact = contact_data
result['sender'] = {
'name': contact_data.get('adv_name', pubkey[:8]),
'lat': contact_data.get('adv_lat', 0),
'lon': contact_data.get('adv_lon', 0),
'type': contact_data.get('type', 0),
'pubkey': pubkey,
}
debug_print(
f"Route build: deferred name lookup "
f"'{sender_name}' → pubkey={pubkey[:16]!r}, "
f"hash={pubkey[:2].upper()!r}"
)
else:
debug_print(
f"Route build: deferred name lookup "
f"'{sender_name}' → NOT FOUND"
)
else:
debug_print("Route build: sender_pubkey is EMPTY, no name → hash will be '-'")
# --- Resolve path nodes (priority order) ---
# Priority 1: path_hashes from RX_LOG decode (single-source)
rx_hashes = msg.get('path_hashes', [])
if rx_hashes:
result['path_nodes'] = self._resolve_hashes(
rx_hashes, data['contacts'],
)
result['path_source'] = 'rx_log'
debug_print(
f"Route from RX_LOG: {len(rx_hashes)} hashes → "
f"{len(result['path_nodes'])} nodes"
)
# Priority 2: out_path from sender's contact record
elif contact:
out_path = contact.get('out_path', '')
out_path_len = contact.get('out_path_len', 0)
debug_print(
f"Route: sender={contact.get('adv_name')}, "
f"out_path={out_path!r}, out_path_len={out_path_len}, "
f"msg_path_len={result['msg_path_len']}"
)
if out_path and out_path_len and out_path_len > 0:
result['path_nodes'] = self._parse_out_path(
out_path, out_path_len, data['contacts'],
)
result['path_source'] = 'contact_out_path'
# Determine if any node has GPS coordinates
all_points = [result['self_node']]
@@ -111,6 +181,50 @@ class RouteBuilder:
# Helpers
# ------------------------------------------------------------------
@staticmethod
def _resolve_hashes(
hashes: List[str],
contacts: Dict,
) -> List[Dict]:
"""
Resolve a list of 1-byte path hashes into hop node dicts.
Args:
hashes: List of 2-char hex strings (e.g. ["8d", "a8"])
contacts: Contacts dictionary from snapshot
Returns:
List of hop node dicts.
"""
nodes: List[Dict] = []
for hop_hash in hashes:
if not hop_hash or len(hop_hash) < 2:
continue
hop_contact = RouteBuilder._find_contact_by_pubkey_hash(
hop_hash, contacts,
)
if hop_contact:
nodes.append({
'name': hop_contact.get('adv_name', f'0x{hop_hash}'),
'lat': hop_contact.get('adv_lat', 0),
'lon': hop_contact.get('adv_lon', 0),
'type': hop_contact.get('type', 0),
'pubkey': hop_hash,
})
else:
nodes.append({
'name': '-',
'lat': 0,
'lon': 0,
'type': 0,
'pubkey': hop_hash,
})
return nodes
@staticmethod
def _parse_out_path(
out_path: str,
@@ -126,40 +240,19 @@ class RouteBuilder:
Returns:
List of hop node dicts.
"""
nodes: List[Dict] = []
hashes: List[str] = []
hop_hex_len = 2 # 1 byte = 2 hex chars
for i in range(0, min(len(out_path), out_path_len * 2), hop_hex_len):
hop_hash = out_path[i:i + hop_hex_len]
if not hop_hash or len(hop_hash) < 2:
continue
if hop_hash and len(hop_hash) == 2:
hashes.append(hop_hash)
hop_contact = RouteBuilder._find_contact_by_pubkey_hash(
hop_hash, contacts
)
if hop_contact:
nodes.append({
'name': hop_contact.get('adv_name', f'0x{hop_hash}'),
'lat': hop_contact.get('adv_lat', 0),
'lon': hop_contact.get('adv_lon', 0),
'type': hop_contact.get('type', 0),
'pubkey': hop_hash,
})
else:
nodes.append({
'name': f'Unknown (0x{hop_hash})',
'lat': 0,
'lon': 0,
'type': 0,
'pubkey': hop_hash,
})
return nodes
return RouteBuilder._resolve_hashes(hashes, contacts)
@staticmethod
def _find_contact_by_pubkey_hash(
hash_hex: str, contacts: Dict
hash_hex: str, contacts: Dict,
) -> Optional[Dict]:
"""
Find a contact whose pubkey starts with the given 1-byte hash.
+86 -38
View File
@@ -10,7 +10,7 @@ from typing import Dict
from nicegui import ui
from meshcore_gui.config import TYPE_LABELS
from meshcore_gui.config import TYPE_LABELS, debug_print
from meshcore_gui.route_builder import RouteBuilder
from meshcore_gui.protocols import SharedDataReadAndLookup
@@ -48,6 +48,8 @@ class RoutePage:
msg = data['messages'][msg_index]
route = self._builder.build(msg, data)
sender = msg.get('sender', 'Unknown')
ui.page_title(f'Route — {sender}')
ui.dark_mode(False)
# Header
@@ -66,19 +68,22 @@ class RoutePage:
@staticmethod
def _render_message_info(msg: Dict) -> None:
"""Message header with direction and text."""
"""Message header with sender name and text."""
sender = msg.get('sender', 'Unknown')
direction = '→ Sent' if msg['direction'] == 'out' else '← Received'
ui.label(f'Message Route — {direction}').classes('font-bold text-lg')
ui.label(f'Message Route — {sender} ({direction})').classes('font-bold text-lg')
ui.label(
f"{msg['time']} {msg.get('sender', '')}: "
f"{msg['time']} {sender}: "
f"{msg['text'][:120]}"
).classes('text-sm text-gray-600')
@staticmethod
def _render_hop_summary(msg: Dict, route: Dict) -> None:
"""Hop count banner with SNR."""
"""Hop count banner with SNR and path source."""
msg_path_len = route['msg_path_len']
resolved_hops = len(route['path_nodes'])
path_source = route.get('path_source', 'none')
expected_repeaters = max(msg_path_len - 1, 0)
with ui.card().classes('w-full'):
with ui.row().classes('items-center gap-4'):
@@ -103,22 +108,33 @@ class RoutePage:
).classes('text-sm text-gray-600')
# Resolution status
if msg_path_len > 0 and resolved_hops > 0:
if expected_repeaters > 0 and resolved_hops > 0:
source_label = (
'from received packet'
if path_source == 'rx_log'
else 'from stored contact route'
)
rpt = 'repeater' if expected_repeaters == 1 else 'repeaters'
ui.label(
f'{resolved_hops} of {msg_path_len} '
f'repeater{"s" if msg_path_len != 1 else ""} identified'
f'{resolved_hops} of {expected_repeaters} '
f'{rpt} identified '
f'({source_label})'
).classes('text-xs text-gray-500 mt-1')
elif msg_path_len > 0 and resolved_hops == 0:
ui.label(
f'{msg_path_len} '
f'hop{"s" if msg_path_len != 1 else ""}'
f'repeater identities not resolved '
f'(not in out_path or not in contacts)'
f'repeater identities not resolved'
).classes('text-xs text-gray-500 mt-1')
@staticmethod
def _render_map(data: Dict, route: Dict) -> None:
"""Leaflet map with route markers and polyline."""
"""Leaflet map with route markers and polylines.
Lines are only drawn between nodes that are **adjacent** in the
route and both have GPS coordinates. A node without coordinates
breaks the line so that no false connections are shown.
"""
with ui.card().classes('w-full'):
if not route['has_locations']:
ui.label(
@@ -133,35 +149,50 @@ class RoutePage:
center=(center_lat, center_lon), zoom=10
).classes('w-full h-96')
path_points = []
# --- Build ordered list of positions (or None) ---
ordered = []
# Sender
if route['sender'] and (route['sender']['lat'] or route['sender']['lon']):
lat, lon = route['sender']['lat'], route['sender']['lon']
route_map.marker(latlng=(lat, lon))
path_points.append((lat, lon))
if route['sender']:
s = route['sender']
if s['lat'] or s['lon']:
ordered.append((s['lat'], s['lon']))
else:
ordered.append(None)
else:
ordered.append(None)
# Repeaters
for node in route['path_nodes']:
if node['lat'] or node['lon']:
lat, lon = node['lat'], node['lon']
route_map.marker(latlng=(lat, lon))
path_points.append((lat, lon))
ordered.append((node['lat'], node['lon']))
else:
ordered.append(None)
# Own position
# Own position (receiver)
if data['adv_lat'] or data['adv_lon']:
route_map.marker(latlng=(data['adv_lat'], data['adv_lon']))
path_points.append((data['adv_lat'], data['adv_lon']))
ordered.append((data['adv_lat'], data['adv_lon']))
else:
ordered.append(None)
# Polyline
if len(path_points) >= 2:
# --- Place markers for all nodes with coordinates ---
all_points = [p for p in ordered if p is not None]
for lat, lon in all_points:
route_map.marker(latlng=(lat, lon))
# --- Draw line between all located nodes (skip unknowns) ---
# Nodes without coordinates are simply skipped so the line
# connects sender → known repeaters → receiver without gaps.
if len(all_points) >= 2:
route_map.generic_layer(
name='polyline',
args=[path_points],
options={'color': '#2563eb', 'weight': 3},
args=[all_points, {'color': '#2563eb', 'weight': 3}],
)
lats = [p[0] for p in path_points]
lons = [p[1] for p in path_points]
# Center map on all located nodes
if all_points:
lats = [p[0] for p in all_points]
lons = [p[1] for p in all_points]
route_map.set_center(
(sum(lats) / len(lats), sum(lons) / len(lons))
)
@@ -171,6 +202,7 @@ class RoutePage:
"""Route details table with sender, hops and receiver."""
msg_path_len = route['msg_path_len']
resolved_hops = len(route['path_nodes'])
path_source = route.get('path_source', 'none')
with ui.card().classes('w-full'):
ui.label('📋 Route Details').classes('font-bold text-gray-600')
@@ -184,36 +216,41 @@ class RoutePage:
rows.append({
'hop': 'Start',
'name': s['name'],
'hash': s.get('pubkey', '')[:2].upper() if s.get('pubkey') else '-',
'type': TYPE_LABELS.get(s['type'], '-'),
'location': f"{s['lat']:.4f}, {s['lon']:.4f}" if has_loc else '-',
'role': '📱 Sender',
})
else:
sender_pubkey = msg.get('sender_pubkey', '')
rows.append({
'hop': 'Start',
'name': msg.get('sender', 'Unknown'),
'hash': sender_pubkey[:2].upper() if sender_pubkey else '-',
'type': '-',
'location': '-',
'role': '📱 Sender',
})
# Resolved repeaters
# Resolved repeaters (from RX_LOG or out_path)
for i, node in enumerate(route['path_nodes']):
has_loc = node['lat'] != 0 or node['lon'] != 0
rows.append({
'hop': str(i + 1),
'name': node['name'],
'hash': node.get('pubkey', '')[:2].upper() if node.get('pubkey') else '-',
'type': TYPE_LABELS.get(node['type'], '-'),
'location': f"{node['lat']:.4f}, {node['lon']:.4f}" if has_loc else '-',
'role': '📡 Repeater',
})
# Placeholder rows for unresolved hops
if msg_path_len > resolved_hops:
for i in range(resolved_hops, msg_path_len):
# Placeholder rows when no path data was resolved
if not route['path_nodes'] and msg_path_len > 0:
for i in range(msg_path_len):
rows.append({
'hop': str(i + 1),
'name': '(unknown repeater)',
'name': '-',
'hash': '-',
'type': '-',
'location': '-',
'role': '📡 Repeater',
@@ -224,6 +261,7 @@ class RoutePage:
rows.append({
'hop': 'End',
'name': data['name'] or 'Me',
'hash': '-',
'type': 'Companion',
'location': f"{data['adv_lat']:.4f}, {data['adv_lon']:.4f}" if self_has_loc else '-',
'role': '📱 Receiver' if msg['direction'] == 'in' else '📱 Sender',
@@ -234,23 +272,33 @@ class RoutePage:
{'name': 'hop', 'label': 'Hop', 'field': 'hop', 'align': 'center'},
{'name': 'role', 'label': 'Role', 'field': 'role'},
{'name': 'name', 'label': 'Name', 'field': 'name'},
{'name': 'hash', 'label': 'ID', 'field': 'hash', 'align': 'center'},
{'name': 'type', 'label': 'Type', 'field': 'type'},
{'name': 'location', 'label': 'Location', 'field': 'location'},
],
rows=rows,
).props('dense flat bordered').classes('w-full')
# Footnote
# Footnote based on path_source
if msg_path_len == 0 and msg['direction'] == 'in':
ui.label(
'️ Direct message — no intermediate hops.'
).classes('text-xs text-gray-400 italic mt-2')
elif path_source == 'rx_log':
ui.label(
'️ Path extracted from received LoRa packet (RX_LOG). '
'Each ID is the first byte of a node\'s public key.'
).classes('text-xs text-gray-400 italic mt-2')
elif path_source == 'contact_out_path':
ui.label(
'️ Path from sender\'s stored contact route (out_path). '
'Last known route, not necessarily this message\'s path.'
).classes('text-xs text-gray-400 italic mt-2')
elif msg_path_len > 0 and resolved_hops == 0:
ui.label(
"️ The repeater identities could not be resolved. "
"This happens when the sender's out_path is empty "
"(e.g. channel messages) or the repeaters are not in "
"your contacts list."
'️ Repeater identities could not be resolved. '
'RX_LOG correlation may have missed the raw packet, '
'and sender has no stored out_path.'
).classes('text-xs text-gray-400 italic mt-2')
elif msg['direction'] == 'out':
ui.label(
+306
View File
@@ -0,0 +1,306 @@
"""
Route visualization page for MeshCore GUI.
Standalone NiceGUI page that opens in a new browser tab when a user
clicks on a message. Shows a Leaflet map with the message route,
a hop count summary, and a details table.
"""
from typing import Dict
from nicegui import ui
from meshcore_gui.config import TYPE_LABELS, debug_print
from meshcore_gui.route_builder import RouteBuilder
from meshcore_gui.protocols import SharedDataReadAndLookup
class RoutePage:
"""
Route visualization page rendered at ``/route/{msg_index}``.
Args:
shared: SharedDataReadAndLookup for data access and contact lookups
"""
def __init__(self, shared: SharedDataReadAndLookup) -> None:
self._shared = shared
self._builder = RouteBuilder(shared)
# ------------------------------------------------------------------
# Public
# ------------------------------------------------------------------
def render(self, msg_index: int) -> None:
"""
Render the route page for a specific message.
Args:
msg_index: Index into SharedData.messages list
"""
data = self._shared.get_snapshot()
# Validate
if msg_index < 0 or msg_index >= len(data['messages']):
ui.label('❌ Message not found').classes('text-xl p-8')
return
msg = data['messages'][msg_index]
route = self._builder.build(msg, data)
sender = msg.get('sender', 'Unknown')
ui.page_title(f'Route — {sender}')
ui.dark_mode(False)
# Header
with ui.header().classes('bg-blue-600 text-white'):
ui.label('🗺️ MeshCore Route').classes('text-xl font-bold')
with ui.column().classes('w-full max-w-4xl mx-auto p-4 gap-4'):
self._render_message_info(msg)
self._render_hop_summary(msg, route)
self._render_map(data, route)
self._render_route_table(msg, data, route)
# ------------------------------------------------------------------
# Private — sub-sections
# ------------------------------------------------------------------
@staticmethod
def _render_message_info(msg: Dict) -> None:
"""Message header with sender name and text."""
sender = msg.get('sender', 'Unknown')
direction = '→ Sent' if msg['direction'] == 'out' else '← Received'
ui.label(f'Message Route — {sender} ({direction})').classes('font-bold text-lg')
ui.label(
f"{msg['time']} {sender}: "
f"{msg['text'][:120]}"
).classes('text-sm text-gray-600')
@staticmethod
def _render_hop_summary(msg: Dict, route: Dict) -> None:
"""Hop count banner with SNR and path source."""
msg_path_len = route['msg_path_len']
resolved_hops = len(route['path_nodes'])
path_source = route.get('path_source', 'none')
expected_repeaters = max(msg_path_len - 1, 0)
with ui.card().classes('w-full'):
with ui.row().classes('items-center gap-4'):
if msg['direction'] == 'in':
if msg_path_len == 0:
ui.label('📡 Direct (0 hops)').classes(
'text-lg font-bold text-green-600'
)
else:
hop_text = '1 hop' if msg_path_len == 1 else f'{msg_path_len} hops'
ui.label(f'📡 {hop_text}').classes(
'text-lg font-bold text-blue-600'
)
else:
ui.label('📡 Outgoing message').classes(
'text-lg font-bold text-gray-600'
)
if route['snr'] is not None:
ui.label(
f'📶 SNR: {route["snr"]:.1f} dB'
).classes('text-sm text-gray-600')
# Resolution status
if expected_repeaters > 0 and resolved_hops > 0:
source_label = (
'from received packet'
if path_source == 'rx_log'
else 'from stored contact route'
)
rpt = 'repeater' if expected_repeaters == 1 else 'repeaters'
ui.label(
f'{resolved_hops} of {expected_repeaters} '
f'{rpt} identified '
f'({source_label})'
).classes('text-xs text-gray-500 mt-1')
elif msg_path_len > 0 and resolved_hops == 0:
ui.label(
f'{msg_path_len} '
f'hop{"s" if msg_path_len != 1 else ""}'
f'repeater identities not resolved'
).classes('text-xs text-gray-500 mt-1')
@staticmethod
def _render_map(data: Dict, route: Dict) -> None:
"""Leaflet map with route markers and polylines.
Lines are only drawn between nodes that are **adjacent** in the
route and both have GPS coordinates. A node without coordinates
breaks the line so that no false connections are shown.
"""
with ui.card().classes('w-full'):
if not route['has_locations']:
ui.label(
'📍 No location data available for map display'
).classes('text-gray-500 italic p-4')
return
center_lat = data['adv_lat'] or 52.5
center_lon = data['adv_lon'] or 6.0
route_map = ui.leaflet(
center=(center_lat, center_lon), zoom=10
).classes('w-full h-96')
# --- Build ordered list of positions (or None) ---
ordered = []
# Sender
if route['sender']:
s = route['sender']
if s['lat'] or s['lon']:
ordered.append((s['lat'], s['lon']))
else:
ordered.append(None)
else:
ordered.append(None)
# Repeaters
for node in route['path_nodes']:
if node['lat'] or node['lon']:
ordered.append((node['lat'], node['lon']))
else:
ordered.append(None)
# Own position (receiver)
if data['adv_lat'] or data['adv_lon']:
ordered.append((data['adv_lat'], data['adv_lon']))
else:
ordered.append(None)
# --- Place markers for all nodes with coordinates ---
all_points = [p for p in ordered if p is not None]
for lat, lon in all_points:
route_map.marker(latlng=(lat, lon))
# --- Draw line between all located nodes (skip unknowns) ---
# Nodes without coordinates are simply skipped so the line
# connects sender → known repeaters → receiver without gaps.
if len(all_points) >= 2:
route_map.generic_layer(
name='polyline',
args=[all_points, {'color': '#2563eb', 'weight': 3}],
)
# Center map on all located nodes
if all_points:
lats = [p[0] for p in all_points]
lons = [p[1] for p in all_points]
route_map.set_center(
(sum(lats) / len(lats), sum(lons) / len(lons))
)
@staticmethod
def _render_route_table(msg: Dict, data: Dict, route: Dict) -> None:
"""Route details table with sender, hops and receiver."""
msg_path_len = route['msg_path_len']
resolved_hops = len(route['path_nodes'])
path_source = route.get('path_source', 'none')
with ui.card().classes('w-full'):
ui.label('📋 Route Details').classes('font-bold text-gray-600')
rows = []
# Sender
if route['sender']:
s = route['sender']
has_loc = s['lat'] != 0 or s['lon'] != 0
rows.append({
'hop': 'Start',
'name': s['name'],
'hash': s.get('pubkey', '')[:2].upper() if s.get('pubkey') else '-',
'type': TYPE_LABELS.get(s['type'], '-'),
'location': f"{s['lat']:.4f}, {s['lon']:.4f}" if has_loc else '-',
'role': '📱 Sender',
})
else:
sender_pubkey = msg.get('sender_pubkey', '')
rows.append({
'hop': 'Start',
'name': msg.get('sender', 'Unknown'),
'hash': sender_pubkey[:2].upper() if sender_pubkey else '-',
'type': '-',
'location': '-',
'role': '📱 Sender',
})
# Resolved repeaters (from RX_LOG or out_path)
for i, node in enumerate(route['path_nodes']):
has_loc = node['lat'] != 0 or node['lon'] != 0
rows.append({
'hop': str(i + 1),
'name': node['name'],
'hash': node.get('pubkey', '')[:2].upper() if node.get('pubkey') else '-',
'type': TYPE_LABELS.get(node['type'], '-'),
'location': f"{node['lat']:.4f}, {node['lon']:.4f}" if has_loc else '-',
'role': '📡 Repeater',
})
# Placeholder rows when no path data was resolved
if not route['path_nodes'] and msg_path_len > 0:
for i in range(msg_path_len):
rows.append({
'hop': str(i + 1),
'name': '-',
'hash': '-',
'type': '-',
'location': '-',
'role': '📡 Repeater',
})
# Own position
self_has_loc = data['adv_lat'] != 0 or data['adv_lon'] != 0
rows.append({
'hop': 'End',
'name': data['name'] or 'Me',
'hash': '-',
'type': 'Companion',
'location': f"{data['adv_lat']:.4f}, {data['adv_lon']:.4f}" if self_has_loc else '-',
'role': '📱 Receiver' if msg['direction'] == 'in' else '📱 Sender',
})
ui.table(
columns=[
{'name': 'hop', 'label': 'Hop', 'field': 'hop', 'align': 'center'},
{'name': 'role', 'label': 'Role', 'field': 'role'},
{'name': 'name', 'label': 'Name', 'field': 'name'},
{'name': 'hash', 'label': 'ID', 'field': 'hash', 'align': 'center'},
{'name': 'type', 'label': 'Type', 'field': 'type'},
{'name': 'location', 'label': 'Location', 'field': 'location'},
],
rows=rows,
).props('dense flat bordered').classes('w-full')
# Footnote based on path_source
if msg_path_len == 0 and msg['direction'] == 'in':
ui.label(
'️ Direct message — no intermediate hops.'
).classes('text-xs text-gray-400 italic mt-2')
elif path_source == 'rx_log':
ui.label(
'️ Path extracted from received LoRa packet (RX_LOG). '
'Each ID is the first byte of a node\'s public key.'
).classes('text-xs text-gray-400 italic mt-2')
elif path_source == 'contact_out_path':
ui.label(
'️ Path from sender\'s stored contact route (out_path). '
'Last known route, not necessarily this message\'s path.'
).classes('text-xs text-gray-400 italic mt-2')
elif msg_path_len > 0 and resolved_hops == 0:
ui.label(
'️ Repeater identities could not be resolved. '
'RX_LOG correlation may have missed the raw packet, '
'and sender has no stored out_path.'
).classes('text-xs text-gray-400 italic mt-2')
elif msg['direction'] == 'out':
ui.label(
'️ Hop information is only available for received messages.'
).classes('text-xs text-gray-400 italic mt-2')
+49 -3
View File
@@ -4,11 +4,17 @@ Thread-safe shared data container for MeshCore GUI.
SharedData is the central data store shared between the BLE worker thread
and the GUI main thread. All access goes through methods that acquire a
threading.Lock so both threads can safely read and write.
Single-source architecture
~~~~~~~~~~~~~~~~~~~~~~~~~~
Path data (repeater hashes) is embedded in each message dict at creation
time — decoded from the raw LoRa packet via ``meshcoredecoder``. There
is no temporal archive or deferred matching.
"""
import queue
import threading
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Tuple
from meshcore_gui.config import debug_print
@@ -155,8 +161,9 @@ class SharedData:
Add a message to the messages list (max 100).
Args:
msg: Message dict with time, sender, text, channel,
direction, path_len, snr, sender_pubkey
msg: Message dict with keys: time, sender, text, channel,
direction, path_len, path_hashes, message_hash,
sender_pubkey, snr
"""
with self.lock:
self.messages.append(msg)
@@ -261,3 +268,42 @@ class SharedData:
return name
return pubkey_prefix[:8]
# ------------------------------------------------------------------
# Contact lookup by name
# ------------------------------------------------------------------
def get_contact_by_name(self, name: str) -> Optional[Tuple[str, Dict]]:
"""
Look up a contact by advertised name.
Tries in order: exact match → case-insensitive → startswith
(either direction, to handle truncated names).
Returns:
``(pubkey, contact_dict)`` tuple, or ``None`` if no match.
"""
if not name:
return None
with self.lock:
# Strategy 1: exact match
for key, contact in self.contacts.items():
if contact.get('adv_name', '') == name:
return (key, contact.copy())
# Strategy 2: case-insensitive
name_lower = name.lower()
for key, contact in self.contacts.items():
if contact.get('adv_name', '').lower() == name_lower:
return (key, contact.copy())
# Strategy 3: one name starts with the other
for key, contact in self.contacts.items():
adv = contact.get('adv_name', '')
if not adv:
continue
if name.startswith(adv) or adv.startswith(name):
return (key, contact.copy())
return None