mirror of
https://github.com/pe1hvh/meshcore-gui.git
synced 2026-05-05 13:02:27 +02:00
fix(packet_decoder): brute-force channel resolution when hash lookup fails
ChannelCrypto.calculate_channel_hash() and the MeshCore firmware compute
different channel identifiers for the same secret, causing _hash_to_idx to
return None for all hashtag-channel messages. Fallback: try each registered
key individually via a single-key keystore. First valid decryption wins.
Result cached in _hash_to_idx for O(1) resolution on subsequent packets.
fix(events): channel-agnostic sentinel prevents cross-channel duplicates
on_rx_log now marks '*' in DualDeduplicator after storing a message.
on_channel_msg checks this sentinel and suppresses storage regardless of
channel_idx or message_hash differences between the two library systems.
Fixes #mc-radar messages appearing in #weather and vice versa.
fix(events): secondary path-cache keyed by content for hash-mismatch
_path_cache keyed by meshcoredecoder hash; CHANNEL_MSG_RECV carries meshcore
hash (different value). Added _path_cache_by_content ("sender:text[:100]")
as fallback so path_hashes are recovered when the two hashes disagree.
fix(commands,cache): remove stale channel key after del_channel reindex
Cache entry for old_idx not removed after slot move, causing same channel
to appear twice. New DeviceCache.remove_channel_key(idx) called after each
move. asyncio.sleep(0.5) added before re-discovery to let device settle.
feat(channel_panel,commands,dashboard): channel edit — move/reindex support
First channel-edit capability in the GUI. New '↕️ Move / Reindex' mode in
Channel Manager dialog. Source channel selected from dropdown, target index
in number field. ↕ button inline with 🗑 in Messages and Archive submenus.
_cmd_move_channel reads secret from cache or device, writes new slot, clears
old slot, updates cache atomically, triggers re-discovery with settle delay.
243 lines
9.4 KiB
Python
243 lines
9.4 KiB
Python
"""
Packet decoder for MeshCore GUI — single-source approach.

Wraps ``meshcoredecoder`` to decode raw LoRa packets from RX_LOG_DATA
events.  A single raw packet contains **everything**: message_hash,
path hashes, hop count, and (with channel keys) the decrypted text
and sender name.

No correlation with CHANNEL_MSG_RECV events is needed.

Channel attribution
~~~~~~~~~~~~~~~~~~~
The channel a message belongs to is determined by **which registered key
successfully decrypts the payload** — not by any channel index or the
``channel_hash`` embedded in the packet header.  The firmware-embedded
``channel_hash`` is ignored for attribution because ``ChannelCrypto``
and the firmware may compute it differently, making hash-based lookup
unreliable.

Decryption is therefore attempted per key (one ``MeshCoreDecoder.decode``
call per registered channel).  For a typical deployment with fewer than
ten channels this cost is negligible, and the correct channel is always
identified deterministically.

Channel decryption keys are loaded at startup (fetched from the device
via ``get_channel()`` or derived from the channel name as fallback).
"""
|
|
|
|
from dataclasses import dataclass, field
|
|
from hashlib import sha256
|
|
from typing import Dict, List, Optional
|
|
|
|
from meshcoredecoder import MeshCoreDecoder
|
|
from meshcoredecoder.crypto.key_manager import MeshCoreKeyStore
|
|
from meshcoredecoder.types.crypto import DecryptionOptions
|
|
from meshcoredecoder.types.enums import PayloadType
|
|
from meshcoredecoder.utils.enum_names import get_payload_type_name
|
|
|
|
from meshcore_gui.config import debug_print
|
|
|
|
|
|
# Re-export so other modules don't need to import meshcoredecoder
|
|
__all__ = ["PacketDecoder", "DecodedPacket", "PayloadType"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Decoded result
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
class DecodedPacket:
    """Everything extracted from one raw LoRa packet.

    The first four fields are filled for every valid packet.  The
    remaining fields keep their defaults unless the packet is a GroupText
    payload that one of the registered channel keys decrypted.
    """

    # --- Always present ---------------------------------------------------
    # Deterministic packet identifier (hex string).
    message_hash: str
    # Payload kind as an enum (GroupText, Advert, Ack, …).
    payload_type: PayloadType
    # Number of repeater hashes in the path.
    path_length: int
    # 2-char hex strings, one per repeater.
    path_hashes: List[str] = field(default_factory=list)

    # --- GroupText only (populated after successful decryption) -----------
    # Sender name.
    sender: str = ""
    # Message body.
    text: str = ""
    # Channel index, resolved by which key decrypted the payload.
    channel_idx: Optional[int] = None
    # Message timestamp.
    timestamp: int = 0
    # True once the payload has been successfully decrypted.
    is_decrypted: bool = False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Decoder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class PacketDecoder:
    """Decode raw LoRa packets with per-key channel attribution.

    Channel attribution is done by key matching: the registered secret that
    successfully decrypts a GroupText packet identifies its channel.  This
    avoids relying on the ``channel_hash`` mechanism, which requires the
    MeshCore firmware and ``meshcoredecoder`` to compute identical hashes —
    a dependency that cannot always be guaranteed.

    Each channel index maps to at most one secret: registering a new secret
    for an index that already has one replaces the old secret.

    Usage::

        decoder = PacketDecoder()
        decoder.add_channel_key(0, secret_bytes)          # from device
        decoder.add_channel_key_from_name(1, "#test")     # fallback

        result = decoder.decode(payload_hex)
        if result and result.is_decrypted:
            print(result.sender, result.text, result.channel_idx)
    """

    def __init__(self) -> None:
        # secret_hex → channel_idx (primary channel attribution map)
        self._secret_to_idx: Dict[str, int] = {}

    # ------------------------------------------------------------------
    # Key management
    # ------------------------------------------------------------------

    def add_channel_key(
        self,
        channel_idx: int,
        secret_bytes: bytes,
        source: str = "device",
    ) -> None:
        """Register a channel decryption key (16 raw bytes from device).

        Any *different* secret previously registered for ``channel_idx`` is
        dropped first.  Without that, a slot that is re-keyed (channel
        replaced, or a cached key superseded by the device's key) would keep
        decrypting with the old secret and attribute those messages to the
        channel that now occupies the slot.

        Args:
            channel_idx:  Channel index (0-based).
            secret_bytes: 16-byte channel secret from ``get_channel()``.
            source:       Label for debug output (e.g. "device", "cache").
        """
        secret_hex = secret_bytes.hex()

        # NOTE(review): assumes one secret per channel slot — confirm that no
        # caller deliberately registers two secrets for the same index.
        stale_secrets = [
            known_hex
            for known_hex, known_idx in self._secret_to_idx.items()
            if known_idx == channel_idx and known_hex != secret_hex
        ]
        for stale_hex in stale_secrets:
            del self._secret_to_idx[stale_hex]
            debug_print(
                f"PacketDecoder: stale key removed for ch{channel_idx} "
                f"(secret={stale_hex[:8]}…)"
            )

        self._secret_to_idx[secret_hex] = channel_idx
        debug_print(
            f"PacketDecoder: key registered for ch{channel_idx} "
            f"(source={source}, secret={secret_hex[:8]}…)"
        )

    def add_channel_key_from_name(
        self, channel_idx: int, channel_name: str,
    ) -> None:
        """Derive a channel key from the channel name (fallback).

        The secret is computed as ``SHA-256(name.encode('utf-8'))[:16]`` and
        then registered through :meth:`add_channel_key`.

        Args:
            channel_idx:  Channel index (0-based).
            channel_name: Channel name string (e.g. ``"#test"``).
        """
        secret_bytes = sha256(channel_name.encode("utf-8")).digest()[:16]
        self.add_channel_key(channel_idx, secret_bytes, source=f"name '{channel_name}'")

    @property
    def has_keys(self) -> bool:
        """True if at least one channel key has been registered."""
        return bool(self._secret_to_idx)

    # ------------------------------------------------------------------
    # Decode
    # ------------------------------------------------------------------

    def decode(self, payload_hex: str) -> Optional[DecodedPacket]:
        """Decode a raw LoRa packet hex string.

        Two-phase approach:

        1. Decode packet **structure** without any key: extracts
           ``message_hash``, ``payload_type``, ``path_length`` and
           ``path_hashes``.

        2. For GroupText packets, attempt decryption with each registered
           key individually.  The first key that produces a valid
           decryption **is** the channel identifier — ``channel_idx`` is
           set directly from the matching key's registration.

        Args:
            payload_hex: Hex string from the RX_LOG_DATA event's
                         ``payload`` field.

        Returns:
            :class:`DecodedPacket` on success (``is_decrypted`` tells whether
            phase 2 found a matching key), ``None`` if the input is empty,
            the structural decode raises, or the packet is reported invalid.
        """
        if not payload_hex:
            return None

        # ── Phase 1: structural decode (no key required) ──────────────
        try:
            packet = MeshCoreDecoder.decode(payload_hex, None)
        except Exception as exc:
            debug_print(f"PacketDecoder: structural decode error: {exc}")
            return None

        if not packet.is_valid:
            debug_print(f"PacketDecoder: invalid packet: {packet.errors}")
            return None

        result = DecodedPacket(
            message_hash=packet.message_hash,
            payload_type=packet.payload_type,
            path_length=packet.path_length,
            path_hashes=list(packet.path) if packet.path else [],
        )

        # Only GroupText carries channel-encrypted content, and without any
        # registered key there is nothing to try.
        if packet.payload_type != PayloadType.GroupText or not self._secret_to_idx:
            return result

        # ── Phase 2: per-key decryption (GroupText only) ──────────────
        for secret_hex, idx in self._secret_to_idx.items():
            fields = self._decrypt_with_key(payload_hex, secret_hex, idx)
            if fields is None:
                continue

            result.sender, result.text, result.timestamp = fields
            result.channel_idx = idx
            result.is_decrypted = True
            debug_print(
                f"PacketDecoder: GroupText OK — "
                f"hash={result.message_hash}, "
                f"sender={result.sender!r}, "
                f"ch={result.channel_idx} (key-matched), "
                f"path={result.path_hashes}, "
                f"text={result.text[:40]!r}"
            )
            break
        else:
            # Loop finished without a break: no registered key matched.
            debug_print(
                f"PacketDecoder: GroupText NOT decrypted "
                f"(hash={result.message_hash}, "
                f"{len(self._secret_to_idx)} keys tried)"
            )

        return result

    @staticmethod
    def _decrypt_with_key(
        payload_hex: str, secret_hex: str, idx: int,
    ) -> Optional[tuple]:
        """Try to decrypt *payload_hex* with exactly one channel secret.

        A fresh single-key keystore is used so that a successful decryption
        can only be credited to ``secret_hex``.

        Args:
            payload_hex: Raw packet as a hex string.
            secret_hex:  Channel secret as a hex string.
            idx:         Channel index of the secret (debug output only).

        Returns:
            ``(sender, text, timestamp)`` when this key decrypts the packet,
            otherwise ``None``.  Decoder errors are logged and reported as
            ``None`` so that one bad key never aborts the whole attempt.
        """
        try:
            key_store = MeshCoreKeyStore()
            key_store.add_channel_secrets([secret_hex])
            decrypted_packet = MeshCoreDecoder.decode(
                payload_hex, DecryptionOptions(key_store=key_store),
            )
            if not decrypted_packet.is_valid:
                return None

            decoded = decrypted_packet.payload.get("decoded")
            if not decoded or not decoded.decrypted:
                return None

            content = decoded.decrypted
            # ``or`` guards against keys that are present but None, so the
            # result fields always keep their declared str/int types.
            return (
                content.get("sender", "") or "",
                content.get("message", "") or "",
                content.get("timestamp", 0) or 0,
            )
        except Exception as exc:
            # Best-effort: the decoder's failure modes are not known here,
            # so any error just means "this key did not work".
            debug_print(
                f"PacketDecoder: key for ch{idx} error: {exc}"
            )
            return None

    def get_payload_type_text(self, payload_type: PayloadType) -> str:
        """Get human-friendly name for a PayloadType enum value."""
        return get_payload_type_name(payload_type)
|