Files
meshcore-gui/meshcore_gui/ble/packet_decoder.py
2026-03-09 17:53:29 +01:00

217 lines
7.6 KiB
Python

"""
Packet decoder for MeshCore GUI — single-source approach.
Wraps ``meshcoredecoder`` to decode raw LoRa packets from RX_LOG_DATA
events. A single raw packet contains **everything**: message_hash,
path hashes, hop count, and (with channel keys) the decrypted text
and sender name.
No correlation with CHANNEL_MSG_RECV events is needed.
Channel decryption keys are loaded at startup (fetched from the device
via ``get_channel()`` or derived from the channel name as fallback).
"""
from dataclasses import dataclass, field
from hashlib import sha256
from typing import Dict, List, Optional
from meshcoredecoder import MeshCoreDecoder
from meshcoredecoder.crypto.channel_crypto import ChannelCrypto
from meshcoredecoder.crypto.key_manager import MeshCoreKeyStore
from meshcoredecoder.types.crypto import DecryptionOptions
from meshcoredecoder.types.enums import PayloadType
from meshcoredecoder.utils.enum_names import get_payload_type_name
from meshcore_gui.config import debug_print
# Re-export so other modules don't need to import meshcoredecoder
__all__ = ["PacketDecoder", "DecodedPacket", "PayloadType"]
# ---------------------------------------------------------------------------
# Decoded result
# ---------------------------------------------------------------------------
@dataclass
class DecodedPacket:
"""All data extracted from a single raw LoRa packet.
Attributes:
message_hash: Deterministic packet identifier (hex string).
payload_type: Enum (GroupText, Advert, Ack, …).
path_length: Number of repeater hashes in the path.
path_hashes: 2-char hex strings, one per repeater.
sender: Sender name (GroupText only, after decryption).
text: Message body (GroupText only, after decryption).
channel_idx: Channel index (GroupText only, via hash→idx map).
timestamp: Message timestamp (GroupText only).
is_decrypted: True if payload was successfully decrypted.
"""
message_hash: str
payload_type: PayloadType
path_length: int
path_hashes: List[str] = field(default_factory=list)
# GroupText-specific (populated after successful decryption)
sender: str = ""
text: str = ""
channel_idx: Optional[int] = None
timestamp: int = 0
is_decrypted: bool = False
# ---------------------------------------------------------------------------
# Decoder
# ---------------------------------------------------------------------------
class PacketDecoder:
"""Decode raw LoRa packets with channel-key decryption.
Usage::
decoder = PacketDecoder()
decoder.add_channel_key(0, secret_bytes) # from device
decoder.add_channel_key_from_name(1, "#test") # fallback
result = decoder.decode(payload_hex)
if result and result.is_decrypted:
print(result.sender, result.text, result.path_hashes)
"""
def __init__(self) -> None:
self._key_store = MeshCoreKeyStore()
self._options: Optional[DecryptionOptions] = None
# channel_hash (2-char lower hex) → channel_idx
self._hash_to_idx: Dict[str, int] = {}
# ------------------------------------------------------------------
# Key management
# ------------------------------------------------------------------
def add_channel_key(
self,
channel_idx: int,
secret_bytes: bytes,
source: str = "device",
) -> None:
"""Register a channel decryption key (16 raw bytes from device).
Args:
channel_idx: Channel index (0-based).
secret_bytes: 16-byte channel secret from ``get_channel()``.
source: Label for debug output (e.g. "device", "cache").
"""
secret_hex = secret_bytes.hex()
self._key_store.add_channel_secrets([secret_hex])
self._rebuild_options()
ch_hash = ChannelCrypto.calculate_channel_hash(secret_hex).lower()
self._hash_to_idx[ch_hash] = channel_idx
debug_print(
f"PacketDecoder: key for ch{channel_idx} "
f"(hash={ch_hash}, from {source})"
)
def add_channel_key_from_name(
self, channel_idx: int, channel_name: str,
) -> None:
"""Derive a channel key from the channel name (fallback).
MeshCore derives channel secrets as
``SHA-256(name.encode('utf-8'))[:16]``.
Args:
channel_idx: Channel index (0-based).
channel_name: Channel name string (e.g. ``"#test"``).
"""
secret_bytes = sha256(channel_name.encode("utf-8")).digest()[:16]
self.add_channel_key(channel_idx, secret_bytes, source=f"name '{channel_name}'")
@property
def has_keys(self) -> bool:
"""True if at least one channel key has been registered."""
return self._options is not None
# ------------------------------------------------------------------
# Decode
# ------------------------------------------------------------------
def decode(self, payload_hex: str) -> Optional[DecodedPacket]:
"""Decode a raw LoRa packet hex string.
Args:
payload_hex: Hex string from the RX_LOG_DATA event's
``payload`` field.
Returns:
:class:`DecodedPacket` on success, ``None`` if the data
is invalid or too short.
"""
if not payload_hex:
return None
try:
packet = MeshCoreDecoder.decode(payload_hex, self._options)
except Exception as exc:
debug_print(f"PacketDecoder: decode error: {exc}")
return None
if not packet.is_valid:
debug_print(f"PacketDecoder: invalid: {packet.errors}")
return None
result = DecodedPacket(
message_hash=packet.message_hash,
payload_type=packet.payload_type,
path_length=packet.path_length,
path_hashes=list(packet.path) if packet.path else [],
)
# --- GroupText decryption ---
if packet.payload_type == PayloadType.GroupText:
decoded_payload = packet.payload.get("decoded")
if decoded_payload and decoded_payload.decrypted:
d = decoded_payload.decrypted
result.sender = d.get("sender", "") or ""
result.text = d.get("message", "") or ""
result.timestamp = d.get("timestamp", 0)
result.is_decrypted = True
# Resolve channel_hash → channel_idx
ch_hash = decoded_payload.channel_hash.lower()
result.channel_idx = self._hash_to_idx.get(ch_hash)
debug_print(
f"PacketDecoder: GroupText OK — "
f"hash={result.message_hash}, "
f"sender={result.sender!r}, "
f"ch={result.channel_idx}, "
f"path={result.path_hashes}, "
f"text={result.text[:40]!r}"
)
else:
debug_print(
f"PacketDecoder: GroupText NOT decrypted "
f"(hash={result.message_hash})"
)
return result
def get_payload_type_text(self, payload_type: PayloadType) -> str:
"""Get human-friendly name for a PayloadType enum value."""
return get_payload_type_name(payload_type)
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _rebuild_options(self) -> None:
"""Recreate DecryptionOptions after a key change."""
self._options = DecryptionOptions(key_store=self._key_store)