Files
meshcore-gui/meshcore_gui/ble/packet_decoder.py
pe1hvh 4a58a95cf0 fix: channel attribution, dedup, cache bugs + channel edit/move (v1.19.0)
fix(packet_decoder): brute-force channel resolution when hash lookup fails

ChannelCrypto.calculate_channel_hash() and the MeshCore firmware compute
different channel identifiers for the same secret, causing _hash_to_idx to
return None for all hashtag-channel messages. Fallback: try each registered
key individually via a single-key keystore. First valid decryption wins.
Result cached in _hash_to_idx for O(1) resolution on subsequent packets.

fix(events): channel-agnostic sentinel prevents cross-channel duplicates

on_rx_log now marks '*' in DualDeduplicator after storing a message.
on_channel_msg checks this sentinel and suppresses storage regardless of
channel_idx or message_hash differences between the two library systems.
Fixes #mc-radar messages appearing in #weather and vice versa.

fix(events): secondary path-cache keyed by content for hash-mismatch

_path_cache keyed by meshcoredecoder hash; CHANNEL_MSG_RECV carries meshcore
hash (different value). Added _path_cache_by_content ("sender:text[:100]")
as fallback so path_hashes are recovered when the two hashes disagree.

fix(commands,cache): remove stale channel key after del_channel reindex

Cache entry for old_idx not removed after slot move, causing same channel
to appear twice. New DeviceCache.remove_channel_key(idx) called after each
move. asyncio.sleep(0.5) added before re-discovery to let device settle.

feat(channel_panel,commands,dashboard): channel edit — move/reindex support

First channel-edit capability in the GUI. New '↕️ Move / Reindex' mode in
Channel Manager dialog. Source channel selected from dropdown, target index
in number field. ↕ button inline with 🗑 in Messages and Archive submenus.
_cmd_move_channel reads secret from cache or device, writes new slot, clears
old slot, updates cache atomically, triggers re-discovery with settle delay.
2026-04-06 10:23:24 +02:00

243 lines
9.4 KiB
Python

"""
Packet decoder for MeshCore GUI — single-source approach.
Wraps ``meshcoredecoder`` to decode raw LoRa packets from RX_LOG_DATA
events. A single raw packet contains **everything**: message_hash,
path hashes, hop count, and (with channel keys) the decrypted text
and sender name.
No correlation with CHANNEL_MSG_RECV events is needed.
Channel attribution
~~~~~~~~~~~~~~~~~~~
The channel a message belongs to is determined by **which registered key
successfully decrypts the payload** — not by any channel index or the
``channel_hash`` embedded in the packet header. The firmware-embedded
``channel_hash`` is ignored for attribution because ``ChannelCrypto``
and the firmware may compute it differently, making hash-based lookup
unreliable.
Decryption is therefore attempted per-key (one ``MeshCoreDecoder.decode``
call per registered channel). For a typical deployment with fewer than
ten channels this cost is negligible, and the correct channel is always
identified deterministically.
Channel decryption keys are loaded at startup (fetched from the device
via ``get_channel()`` or derived from the channel name as fallback).
"""
from dataclasses import dataclass, field
from hashlib import sha256
from typing import Dict, List, Optional
from meshcoredecoder import MeshCoreDecoder
from meshcoredecoder.crypto.key_manager import MeshCoreKeyStore
from meshcoredecoder.types.crypto import DecryptionOptions
from meshcoredecoder.types.enums import PayloadType
from meshcoredecoder.utils.enum_names import get_payload_type_name
from meshcore_gui.config import debug_print
# Re-export so other modules don't need to import meshcoredecoder
__all__ = ["PacketDecoder", "DecodedPacket", "PayloadType"]
# ---------------------------------------------------------------------------
# Decoded result
# ---------------------------------------------------------------------------
@dataclass
class DecodedPacket:
    """Result record for one decoded raw LoRa packet.

    The first four fields describe the packet itself and are supplied by
    the decoder for every packet it returns.  The remaining fields keep
    their defaults unless a GroupText payload was decrypted.

    Attributes:
        message_hash: Deterministic packet identifier (hex string).
        payload_type: Payload kind as an enum (GroupText, Advert, Ack, …).
        path_length:  Number of repeater hashes in the path.
        path_hashes:  One 2-char hex string per repeater.
        sender:       Sender name (GroupText only, after decryption).
        text:         Message body (GroupText only, after decryption).
        channel_idx:  Channel index (GroupText only, resolved by key match);
                      ``None`` while unresolved.
        timestamp:    Message timestamp (GroupText only).
        is_decrypted: ``True`` once the payload was successfully decrypted.
    """

    # --- packet-level fields -------------------------------------------
    message_hash: str
    payload_type: PayloadType
    path_length: int
    # default_factory gives every instance its own list.
    path_hashes: List[str] = field(default_factory=list)

    # --- GroupText-only fields (filled in after a successful decrypt) ---
    sender: str = ""
    text: str = ""
    channel_idx: Optional[int] = None
    timestamp: int = 0
    is_decrypted: bool = False
# ---------------------------------------------------------------------------
# Decoder
# ---------------------------------------------------------------------------
class PacketDecoder:
    """Decode raw LoRa packets with per-key channel attribution.

    Channel attribution is done by key matching: the registered secret that
    successfully decrypts a GroupText packet identifies its channel.  This
    avoids relying on the ``channel_hash`` mechanism, which requires the
    MeshCore firmware and ``meshcoredecoder`` to compute identical hashes —
    a dependency that cannot always be guaranteed.

    Each channel index holds at most one secret: registering a secret for
    an index replaces any different secret registered for it earlier, so a
    re-keyed or re-used slot never keeps attributing packets via its old
    key.

    Usage::

        decoder = PacketDecoder()
        decoder.add_channel_key(0, secret_bytes)          # from device
        decoder.add_channel_key_from_name(1, "#test")     # fallback
        result = decoder.decode(payload_hex)
        if result and result.is_decrypted:
            print(result.sender, result.text, result.channel_idx)
    """

    def __init__(self) -> None:
        # secret_hex → channel_idx (primary channel attribution map)
        self._secret_to_idx: Dict[str, int] = {}

    # ------------------------------------------------------------------
    # Key management
    # ------------------------------------------------------------------

    def add_channel_key(
        self,
        channel_idx: int,
        secret_bytes: bytes,
        source: str = "device",
    ) -> None:
        """Register a channel decryption key (16 raw bytes from device).

        Any *other* secret currently mapped to ``channel_idx`` is removed
        first.  Registering a secret that is already known simply re-points
        it at ``channel_idx``.

        Args:
            channel_idx:  Channel index (0-based).
            secret_bytes: 16-byte channel secret from ``get_channel()``.
            source:       Label for debug output (e.g. "device", "cache").
        """
        secret_hex = secret_bytes.hex()
        evicted = self._evict_stale_secrets(channel_idx, secret_hex)
        self._secret_to_idx[secret_hex] = channel_idx
        debug_print(
            f"PacketDecoder: key registered for ch{channel_idx} "
            f"(source={source}, secret={secret_hex[:8]}…)"
        )
        if evicted:
            debug_print(
                f"PacketDecoder: dropped {evicted} stale key(s) "
                f"for ch{channel_idx}"
            )

    def add_channel_key_from_name(
        self, channel_idx: int, channel_name: str,
    ) -> None:
        """Derive a channel key from the channel name (fallback).

        MeshCore derives channel secrets as
        ``SHA-256(name.encode('utf-8'))[:16]``.

        Args:
            channel_idx:  Channel index (0-based).
            channel_name: Channel name string (e.g. ``"#test"``).
        """
        secret_bytes = sha256(channel_name.encode("utf-8")).digest()[:16]
        self.add_channel_key(
            channel_idx, secret_bytes, source=f"name '{channel_name}'"
        )

    @property
    def has_keys(self) -> bool:
        """True if at least one channel key has been registered."""
        return bool(self._secret_to_idx)

    def _evict_stale_secrets(self, channel_idx: int, keep_secret_hex: str) -> int:
        """Remove every secret mapped to ``channel_idx`` except the given one.

        Returns:
            The number of secrets that were removed.
        """
        # Collect first: a dict must not change size while it is iterated.
        stale = [
            secret
            for secret, idx in self._secret_to_idx.items()
            if idx == channel_idx and secret != keep_secret_hex
        ]
        for secret in stale:
            del self._secret_to_idx[secret]
        return len(stale)

    # ------------------------------------------------------------------
    # Decode
    # ------------------------------------------------------------------

    def decode(self, payload_hex: str) -> Optional[DecodedPacket]:
        """Decode a raw LoRa packet hex string.

        Two-phase approach:

        1. Decode packet **structure** without any key: extracts
           ``message_hash``, ``payload_type``, ``path_length`` and
           ``path_hashes``.  These fields are in the unencrypted header.
        2. For GroupText packets, attempt decryption with each registered
           key individually.  The key that produces a valid decryption
           **is** the channel identifier — ``channel_idx`` is set directly
           from the matching key's registration.

        Args:
            payload_hex: Hex string from the RX_LOG_DATA event's
                ``payload`` field.

        Returns:
            :class:`DecodedPacket` on success, ``None`` if the input is
            empty, the decoder raises, or the packet is reported invalid.
        """
        if not payload_hex:
            return None

        # ── Phase 1: structural decode (no key required) ──────────────
        try:
            packet = MeshCoreDecoder.decode(payload_hex, None)
        except Exception as exc:
            debug_print(f"PacketDecoder: structural decode error: {exc}")
            return None

        if not packet.is_valid:
            debug_print(f"PacketDecoder: invalid packet: {packet.errors}")
            return None

        result = DecodedPacket(
            message_hash=packet.message_hash,
            payload_type=packet.payload_type,
            path_length=packet.path_length,
            path_hashes=list(packet.path) if packet.path else [],
        )

        # ── Phase 2: per-key decryption (GroupText only) ──────────────
        if packet.payload_type == PayloadType.GroupText and self._secret_to_idx:
            if not self._attribute_group_text(payload_hex, result):
                debug_print(
                    f"PacketDecoder: GroupText NOT decrypted "
                    f"(hash={result.message_hash}, "
                    f"{len(self._secret_to_idx)} keys tried)"
                )

        return result

    def _attribute_group_text(
        self, payload_hex: str, result: DecodedPacket,
    ) -> bool:
        """Try each registered key; fill ``result`` from the first match.

        Returns:
            ``True`` if a key decrypted the packet (``result`` updated),
            ``False`` if no key matched (``result`` untouched).
        """
        for secret_hex, idx in self._secret_to_idx.items():
            try:
                fields = self._decrypt_with_secret(payload_hex, secret_hex)
                if fields is None:
                    continue
                # ``or`` guards against keys present with a None value.
                sender = fields.get("sender", "") or ""
                text = fields.get("message", "") or ""
                timestamp = fields.get("timestamp", 0) or 0
            except Exception as exc:
                # Deliberately broad: one failing key must never stop the
                # remaining keys from being tried.
                debug_print(
                    f"PacketDecoder: key for ch{idx} error: {exc}"
                )
                continue

            # Only mutate the result once every field was read successfully.
            result.sender = sender
            result.text = text
            result.timestamp = timestamp
            result.channel_idx = idx
            result.is_decrypted = True
            debug_print(
                f"PacketDecoder: GroupText OK — "
                f"hash={result.message_hash}, "
                f"sender={result.sender!r}, "
                f"ch={result.channel_idx} (key-matched), "
                f"path={result.path_hashes}, "
                f"text={result.text[:40]!r}"
            )
            return True
        return False

    @staticmethod
    def _decrypt_with_secret(payload_hex: str, secret_hex: str) -> Optional[dict]:
        """Decode ``payload_hex`` with a key store holding only ``secret_hex``.

        Returns:
            The decrypted field mapping, or ``None`` if the packet is
            invalid under this key or nothing was decrypted.
        """
        # A single-key store means a successful decryption can only have
        # come from this secret.
        key_store = MeshCoreKeyStore()
        key_store.add_channel_secrets([secret_hex])
        decoded = MeshCoreDecoder.decode(
            payload_hex, DecryptionOptions(key_store=key_store)
        )
        if not decoded.is_valid:
            return None
        inner = decoded.payload.get("decoded")
        if inner and inner.decrypted:
            return inner.decrypted
        return None

    def get_payload_type_text(self, payload_type: PayloadType) -> str:
        """Get human-friendly name for a PayloadType enum value."""
        return get_payload_type_name(payload_type)