""" MeshCore packet decoder for historical packet decryption. Based on https://github.com/michaelhart/meshcore-decoder """ import hmac import hashlib import logging from dataclasses import dataclass from enum import IntEnum from Crypto.Cipher import AES logger = logging.getLogger(__name__) class PayloadType(IntEnum): REQUEST = 0x00 RESPONSE = 0x01 TEXT_MESSAGE = 0x02 ACK = 0x03 ADVERT = 0x04 GROUP_TEXT = 0x05 GROUP_DATA = 0x06 ANON_REQUEST = 0x07 PATH = 0x08 TRACE = 0x09 MULTIPART = 0x0A CONTROL = 0x0B RAW_CUSTOM = 0x0F class RouteType(IntEnum): TRANSPORT_FLOOD = 0x00 FLOOD = 0x01 DIRECT = 0x02 TRANSPORT_DIRECT = 0x03 @dataclass class DecryptedGroupText: """Result of decrypting a GroupText (channel) message.""" timestamp: int flags: int sender: str | None message: str channel_hash: str @dataclass class ParsedAdvertisement: """Result of parsing an advertisement packet.""" public_key: str # 64-char hex timestamp: int # Unix timestamp from the advertisement name: str | None lat: float | None lon: float | None device_role: int # 1=Chat, 2=Repeater, 3=Room, 4=Sensor @dataclass class PacketInfo: """Basic packet header info.""" route_type: RouteType payload_type: PayloadType payload_version: int path_length: int payload: bytes def calculate_channel_hash(channel_key: bytes) -> str: """ Calculate the channel hash from a 16-byte channel key. Returns the first byte of SHA256(key) as hex. """ hash_bytes = hashlib.sha256(channel_key).digest() return format(hash_bytes[0], "02x") def extract_payload(raw_packet: bytes) -> bytes | None: """ Extract just the payload from a raw packet, skipping header and path. Packet structure: - Byte 0: header (route_type, payload_type, version) - For TRANSPORT routes: bytes 1-4 are transport codes - Next byte: path_length - Next path_length bytes: path data - Remaining: payload Returns the payload bytes, or None if packet is malformed. """ if len(raw_packet) < 2: return None try: header = raw_packet[0] route_type = header & 0x03 offset = 1 # Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3) if route_type in (0x00, 0x03): if len(raw_packet) < offset + 4: return None offset += 4 # Get path length if len(raw_packet) < offset + 1: return None path_length = raw_packet[offset] offset += 1 # Skip path data if len(raw_packet) < offset + path_length: return None offset += path_length # Rest is payload return raw_packet[offset:] except (ValueError, IndexError): return None def parse_packet(raw_packet: bytes) -> PacketInfo | None: """Parse a raw packet and extract basic info.""" if len(raw_packet) < 2: return None try: header = raw_packet[0] route_type = RouteType(header & 0x03) payload_type = PayloadType((header >> 2) & 0x0F) payload_version = (header >> 6) & 0x03 offset = 1 # Skip transport codes if present if route_type in (RouteType.TRANSPORT_FLOOD, RouteType.TRANSPORT_DIRECT): if len(raw_packet) < offset + 4: return None offset += 4 # Get path length if len(raw_packet) < offset + 1: return None path_length = raw_packet[offset] offset += 1 # Skip path data if len(raw_packet) < offset + path_length: return None offset += path_length # Rest is payload payload = raw_packet[offset:] return PacketInfo( route_type=route_type, payload_type=payload_type, payload_version=payload_version, path_length=path_length, payload=payload, ) except (ValueError, IndexError): return None def decrypt_group_text( payload: bytes, channel_key: bytes ) -> DecryptedGroupText | None: """ Decrypt a GroupText payload using the channel key. GroupText structure: - channel_hash (1 byte): First byte of SHA256 of channel key - cipher_mac (2 bytes): First 2 bytes of HMAC-SHA256 - ciphertext (rest): AES-128 ECB encrypted content Decrypted content structure: - timestamp (4 bytes, little-endian) - flags (1 byte) - message text (null-terminated string, format: "sender: message") """ if len(payload) < 3: return None channel_hash = format(payload[0], "02x") cipher_mac = payload[1:3] ciphertext = payload[3:] if len(ciphertext) == 0 or len(ciphertext) % 16 != 0: # AES requires 16-byte blocks return None # Create the 32-byte channel secret (key + 16 zero bytes) channel_secret = channel_key + bytes(16) # Verify MAC: HMAC-SHA256 of ciphertext using full 32-byte secret calculated_mac = hmac.new(channel_secret, ciphertext, hashlib.sha256).digest() if calculated_mac[:2] != cipher_mac: return None # Decrypt using AES-128 ECB with the 16-byte key try: cipher = AES.new(channel_key, AES.MODE_ECB) decrypted = cipher.decrypt(ciphertext) except Exception as e: logger.debug("AES decryption failed: %s", e) return None if len(decrypted) < 5: return None # Parse decrypted content timestamp = int.from_bytes(decrypted[0:4], "little") flags = decrypted[4] # Extract message text (UTF-8, null-terminated) message_bytes = decrypted[5:] try: message_text = message_bytes.decode("utf-8") # Remove null terminator and any padding null_idx = message_text.find("\x00") if null_idx >= 0: message_text = message_text[:null_idx] except UnicodeDecodeError: return None # Parse "sender: message" format sender = None content = message_text colon_idx = message_text.find(": ") if 0 < colon_idx < 50: potential_sender = message_text[:colon_idx] # Check for invalid characters in sender name if not any(c in potential_sender for c in ":[]\x00"): sender = potential_sender content = message_text[colon_idx + 2 :] return DecryptedGroupText( timestamp=timestamp, flags=flags, sender=sender, message=content, channel_hash=channel_hash, ) def try_decrypt_packet_with_channel_key( raw_packet: bytes, channel_key: bytes ) -> DecryptedGroupText | None: """ Try to decrypt a raw packet using a channel key. Returns decrypted content if successful, None otherwise. """ packet_info = parse_packet(raw_packet) if packet_info is None: return None # Only GroupText packets can be decrypted with channel keys if packet_info.payload_type != PayloadType.GROUP_TEXT: return None # Check if channel hash matches if len(packet_info.payload) < 1: return None packet_channel_hash = format(packet_info.payload[0], "02x") expected_hash = calculate_channel_hash(channel_key) if packet_channel_hash != expected_hash: return None return decrypt_group_text(packet_info.payload, channel_key) def get_packet_payload_type(raw_packet: bytes) -> PayloadType | None: """Get the payload type of a raw packet without full parsing.""" if len(raw_packet) < 1: return None header = raw_packet[0] try: return PayloadType((header >> 2) & 0x0F) except ValueError: return None def parse_advertisement(payload: bytes) -> ParsedAdvertisement | None: """ Parse an advertisement payload. Advertisement payload structure (101+ bytes): - Bytes 0-31 (32 bytes): Public Key (Ed25519) - Bytes 32-35 (4 bytes): Timestamp (Unix timestamp, little-endian) - Bytes 36-99 (64 bytes): Signature (Ed25519) - Byte 100 (1 byte): App Flags - Bits 0-3: Device Role (1=Chat, 2=Repeater, 3=Room, 4=Sensor) - Bit 4 (0x10): HasLocation - Bit 5 (0x20): HasFeature1 - Bit 6 (0x40): HasFeature2 - Bit 7 (0x80): HasName - If HasLocation: 8 bytes (4 lat + 4 lon as signed int32 LE / 1e6) - If HasFeature1: 2 bytes (skipped) - If HasFeature2: 2 bytes (skipped) - If HasName: remaining bytes = name (UTF-8) """ # Minimum: pubkey(32) + timestamp(4) + sig(64) + flags(1) = 101 bytes if len(payload) < 101: return None # Parse fixed-position fields public_key = payload[0:32].hex() timestamp = int.from_bytes(payload[32:36], byteorder="little") # signature = payload[36:100] # Not currently verified flags = payload[100] # Parse flags device_role = flags & 0x0F has_location = bool(flags & 0x10) has_feature1 = bool(flags & 0x20) has_feature2 = bool(flags & 0x40) has_name = bool(flags & 0x80) # Start parsing variable-length app data after flags offset = 101 lat = None lon = None name = None # Parse location if present (8 bytes: 4 lat + 4 lon) if has_location: if len(payload) < offset + 8: return ParsedAdvertisement( public_key=public_key, timestamp=timestamp, name=None, lat=None, lon=None, device_role=device_role, ) lat_raw = int.from_bytes(payload[offset:offset + 4], byteorder="little", signed=True) lon_raw = int.from_bytes(payload[offset + 4:offset + 8], byteorder="little", signed=True) lat = lat_raw / 1_000_000 lon = lon_raw / 1_000_000 offset += 8 # Skip feature fields if present if has_feature1: offset += 2 if has_feature2: offset += 2 # Parse name if present (remaining bytes) if has_name and len(payload) > offset: name_bytes = payload[offset:] try: # Decode name, strip null bytes and control characters name = name_bytes.decode("utf-8", errors="ignore") # Remove null terminator and anything after null_idx = name.find("\x00") if null_idx >= 0: name = name[:null_idx] # Strip control characters and whitespace name = "".join(c for c in name if c >= " " or c in "\t").strip() if not name or not any(c.isalnum() for c in name): name = None except Exception: name = None return ParsedAdvertisement( public_key=public_key, timestamp=timestamp, name=name, lat=lat, lon=lon, device_role=device_role, ) def try_parse_advertisement(raw_packet: bytes) -> ParsedAdvertisement | None: """ Try to parse a raw packet as an advertisement. Returns parsed advertisement if successful, None otherwise. """ packet_info = parse_packet(raw_packet) if packet_info is None: return None if packet_info.payload_type != PayloadType.ADVERT: return None return parse_advertisement(packet_info.payload)