mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
606 lines
18 KiB
Python
606 lines
18 KiB
Python
"""
|
||
MeshCore packet decoder for historical packet decryption.
|
||
Based on https://github.com/michaelhart/meshcore-decoder
|
||
"""
|
||
|
||
import hashlib
|
||
import hmac
|
||
import logging
|
||
from dataclasses import dataclass
|
||
from enum import IntEnum
|
||
|
||
import nacl.bindings
|
||
from Crypto.Cipher import AES
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class PayloadType(IntEnum):
|
||
REQUEST = 0x00
|
||
RESPONSE = 0x01
|
||
TEXT_MESSAGE = 0x02
|
||
ACK = 0x03
|
||
ADVERT = 0x04
|
||
GROUP_TEXT = 0x05
|
||
GROUP_DATA = 0x06
|
||
ANON_REQUEST = 0x07
|
||
PATH = 0x08
|
||
TRACE = 0x09
|
||
MULTIPART = 0x0A
|
||
CONTROL = 0x0B
|
||
RAW_CUSTOM = 0x0F
|
||
|
||
|
||
class RouteType(IntEnum):
|
||
TRANSPORT_FLOOD = 0x00
|
||
FLOOD = 0x01
|
||
DIRECT = 0x02
|
||
TRANSPORT_DIRECT = 0x03
|
||
|
||
|
||
@dataclass
|
||
class DecryptedGroupText:
|
||
"""Result of decrypting a GroupText (channel) message."""
|
||
|
||
timestamp: int
|
||
flags: int
|
||
sender: str | None
|
||
message: str
|
||
channel_hash: str
|
||
|
||
|
||
@dataclass
|
||
class DecryptedDirectMessage:
|
||
"""Result of decrypting a TEXT_MESSAGE (direct message)."""
|
||
|
||
timestamp: int
|
||
flags: int
|
||
message: str
|
||
dest_hash: str # First byte of destination pubkey as hex
|
||
src_hash: str # First byte of sender pubkey as hex
|
||
|
||
|
||
@dataclass
|
||
class ParsedAdvertisement:
|
||
"""Result of parsing an advertisement packet."""
|
||
|
||
public_key: str # 64-char hex
|
||
timestamp: int # Unix timestamp from the advertisement
|
||
name: str | None
|
||
lat: float | None
|
||
lon: float | None
|
||
device_role: int # 1=Chat, 2=Repeater, 3=Room, 4=Sensor
|
||
|
||
|
||
@dataclass
|
||
class PacketInfo:
|
||
"""Basic packet header info."""
|
||
|
||
route_type: RouteType
|
||
payload_type: PayloadType
|
||
payload_version: int
|
||
path_length: int
|
||
path: bytes # The routing path (empty if path_length is 0)
|
||
payload: bytes
|
||
|
||
|
||
def calculate_channel_hash(channel_key: bytes) -> str:
|
||
"""
|
||
Calculate the channel hash from a 16-byte channel key.
|
||
Returns the first byte of SHA256(key) as hex.
|
||
"""
|
||
hash_bytes = hashlib.sha256(channel_key).digest()
|
||
return format(hash_bytes[0], "02x")
|
||
|
||
|
||
def extract_payload(raw_packet: bytes) -> bytes | None:
|
||
"""
|
||
Extract just the payload from a raw packet, skipping header and path.
|
||
|
||
Packet structure:
|
||
- Byte 0: header (route_type, payload_type, version)
|
||
- For TRANSPORT routes: bytes 1-4 are transport codes
|
||
- Next byte: path_length
|
||
- Next path_length bytes: path data
|
||
- Remaining: payload
|
||
|
||
Returns the payload bytes, or None if packet is malformed.
|
||
"""
|
||
if len(raw_packet) < 2:
|
||
return None
|
||
|
||
try:
|
||
header = raw_packet[0]
|
||
route_type = header & 0x03
|
||
offset = 1
|
||
|
||
# Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3)
|
||
if route_type in (0x00, 0x03):
|
||
if len(raw_packet) < offset + 4:
|
||
return None
|
||
offset += 4
|
||
|
||
# Get path length
|
||
if len(raw_packet) < offset + 1:
|
||
return None
|
||
path_length = raw_packet[offset]
|
||
offset += 1
|
||
|
||
# Skip path data
|
||
if len(raw_packet) < offset + path_length:
|
||
return None
|
||
offset += path_length
|
||
|
||
# Rest is payload
|
||
return raw_packet[offset:]
|
||
except (ValueError, IndexError):
|
||
return None
|
||
|
||
|
||
def parse_packet(raw_packet: bytes) -> PacketInfo | None:
|
||
"""Parse a raw packet and extract basic info."""
|
||
if len(raw_packet) < 2:
|
||
return None
|
||
|
||
try:
|
||
header = raw_packet[0]
|
||
route_type = RouteType(header & 0x03)
|
||
payload_type = PayloadType((header >> 2) & 0x0F)
|
||
payload_version = (header >> 6) & 0x03
|
||
|
||
offset = 1
|
||
|
||
# Skip transport codes if present
|
||
if route_type in (RouteType.TRANSPORT_FLOOD, RouteType.TRANSPORT_DIRECT):
|
||
if len(raw_packet) < offset + 4:
|
||
return None
|
||
offset += 4
|
||
|
||
# Get path length
|
||
if len(raw_packet) < offset + 1:
|
||
return None
|
||
path_length = raw_packet[offset]
|
||
offset += 1
|
||
|
||
# Extract path data
|
||
if len(raw_packet) < offset + path_length:
|
||
return None
|
||
path = raw_packet[offset : offset + path_length]
|
||
offset += path_length
|
||
|
||
# Rest is payload
|
||
payload = raw_packet[offset:]
|
||
|
||
return PacketInfo(
|
||
route_type=route_type,
|
||
payload_type=payload_type,
|
||
payload_version=payload_version,
|
||
path_length=path_length,
|
||
path=path,
|
||
payload=payload,
|
||
)
|
||
except (ValueError, IndexError):
|
||
return None
|
||
|
||
|
||
def decrypt_group_text(payload: bytes, channel_key: bytes) -> DecryptedGroupText | None:
|
||
"""
|
||
Decrypt a GroupText payload using the channel key.
|
||
|
||
GroupText structure:
|
||
- channel_hash (1 byte): First byte of SHA256 of channel key
|
||
- cipher_mac (2 bytes): First 2 bytes of HMAC-SHA256
|
||
- ciphertext (rest): AES-128 ECB encrypted content
|
||
|
||
Decrypted content structure:
|
||
- timestamp (4 bytes, little-endian)
|
||
- flags (1 byte)
|
||
- message text (null-terminated string, format: "sender: message")
|
||
"""
|
||
if len(payload) < 3:
|
||
return None
|
||
|
||
channel_hash = format(payload[0], "02x")
|
||
cipher_mac = payload[1:3]
|
||
ciphertext = payload[3:]
|
||
|
||
if len(ciphertext) == 0 or len(ciphertext) % 16 != 0:
|
||
# AES requires 16-byte blocks
|
||
return None
|
||
|
||
# Create the 32-byte channel secret (key + 16 zero bytes)
|
||
channel_secret = channel_key + bytes(16)
|
||
|
||
# Verify MAC: HMAC-SHA256 of ciphertext using full 32-byte secret
|
||
calculated_mac = hmac.new(channel_secret, ciphertext, hashlib.sha256).digest()
|
||
if calculated_mac[:2] != cipher_mac:
|
||
return None
|
||
|
||
# Decrypt using AES-128 ECB with the 16-byte key
|
||
try:
|
||
cipher = AES.new(channel_key, AES.MODE_ECB)
|
||
decrypted = cipher.decrypt(ciphertext)
|
||
except Exception as e:
|
||
logger.debug("AES decryption failed: %s", e)
|
||
return None
|
||
|
||
if len(decrypted) < 5:
|
||
return None
|
||
|
||
# Parse decrypted content
|
||
timestamp = int.from_bytes(decrypted[0:4], "little")
|
||
flags = decrypted[4]
|
||
|
||
# Extract message text (UTF-8, null-terminated)
|
||
message_bytes = decrypted[5:]
|
||
try:
|
||
message_text = message_bytes.decode("utf-8")
|
||
# Remove null terminator and any padding
|
||
null_idx = message_text.find("\x00")
|
||
if null_idx >= 0:
|
||
message_text = message_text[:null_idx]
|
||
except UnicodeDecodeError:
|
||
return None
|
||
|
||
# Parse "sender: message" format
|
||
sender = None
|
||
content = message_text
|
||
colon_idx = message_text.find(": ")
|
||
if 0 < colon_idx < 50:
|
||
potential_sender = message_text[:colon_idx]
|
||
# Check for invalid characters in sender name
|
||
if not any(c in potential_sender for c in ":[]\x00"):
|
||
sender = potential_sender
|
||
content = message_text[colon_idx + 2 :]
|
||
|
||
return DecryptedGroupText(
|
||
timestamp=timestamp,
|
||
flags=flags,
|
||
sender=sender,
|
||
message=content,
|
||
channel_hash=channel_hash,
|
||
)
|
||
|
||
|
||
def try_decrypt_packet_with_channel_key(
|
||
raw_packet: bytes, channel_key: bytes
|
||
) -> DecryptedGroupText | None:
|
||
"""
|
||
Try to decrypt a raw packet using a channel key.
|
||
Returns decrypted content if successful, None otherwise.
|
||
"""
|
||
packet_info = parse_packet(raw_packet)
|
||
if packet_info is None:
|
||
return None
|
||
|
||
# Only GroupText packets can be decrypted with channel keys
|
||
if packet_info.payload_type != PayloadType.GROUP_TEXT:
|
||
return None
|
||
|
||
# Check if channel hash matches
|
||
if len(packet_info.payload) < 1:
|
||
return None
|
||
|
||
packet_channel_hash = format(packet_info.payload[0], "02x")
|
||
expected_hash = calculate_channel_hash(channel_key)
|
||
|
||
if packet_channel_hash != expected_hash:
|
||
return None
|
||
|
||
return decrypt_group_text(packet_info.payload, channel_key)
|
||
|
||
|
||
def get_packet_payload_type(raw_packet: bytes) -> PayloadType | None:
|
||
"""Get the payload type of a raw packet without full parsing."""
|
||
if len(raw_packet) < 1:
|
||
return None
|
||
header = raw_packet[0]
|
||
try:
|
||
return PayloadType((header >> 2) & 0x0F)
|
||
except ValueError:
|
||
return None
|
||
|
||
|
||
def parse_advertisement(payload: bytes) -> ParsedAdvertisement | None:
|
||
"""
|
||
Parse an advertisement payload.
|
||
|
||
Advertisement payload structure (101+ bytes):
|
||
- Bytes 0-31 (32 bytes): Public Key (Ed25519)
|
||
- Bytes 32-35 (4 bytes): Timestamp (Unix timestamp, little-endian)
|
||
- Bytes 36-99 (64 bytes): Signature (Ed25519)
|
||
- Byte 100 (1 byte): App Flags
|
||
- Bits 0-3: Device Role (1=Chat, 2=Repeater, 3=Room, 4=Sensor)
|
||
- Bit 4 (0x10): HasLocation
|
||
- Bit 5 (0x20): HasFeature1
|
||
- Bit 6 (0x40): HasFeature2
|
||
- Bit 7 (0x80): HasName
|
||
- If HasLocation: 8 bytes (4 lat + 4 lon as signed int32 LE / 1e6)
|
||
- If HasFeature1: 2 bytes (skipped)
|
||
- If HasFeature2: 2 bytes (skipped)
|
||
- If HasName: remaining bytes = name (UTF-8)
|
||
"""
|
||
# Minimum: pubkey(32) + timestamp(4) + sig(64) + flags(1) = 101 bytes
|
||
if len(payload) < 101:
|
||
return None
|
||
|
||
# Parse fixed-position fields
|
||
public_key = payload[0:32].hex()
|
||
timestamp = int.from_bytes(payload[32:36], byteorder="little")
|
||
flags = payload[100]
|
||
|
||
# Parse flags
|
||
device_role = flags & 0x0F
|
||
has_location = bool(flags & 0x10)
|
||
has_feature1 = bool(flags & 0x20)
|
||
has_feature2 = bool(flags & 0x40)
|
||
has_name = bool(flags & 0x80)
|
||
|
||
# Start parsing variable-length app data after flags
|
||
offset = 101
|
||
lat = None
|
||
lon = None
|
||
name = None
|
||
|
||
# Parse location if present (8 bytes: 4 lat + 4 lon)
|
||
if has_location:
|
||
if len(payload) < offset + 8:
|
||
return ParsedAdvertisement(
|
||
public_key=public_key,
|
||
timestamp=timestamp,
|
||
name=None,
|
||
lat=None,
|
||
lon=None,
|
||
device_role=device_role,
|
||
)
|
||
lat_raw = int.from_bytes(payload[offset : offset + 4], byteorder="little", signed=True)
|
||
lon_raw = int.from_bytes(payload[offset + 4 : offset + 8], byteorder="little", signed=True)
|
||
lat = lat_raw / 1_000_000
|
||
lon = lon_raw / 1_000_000
|
||
offset += 8
|
||
|
||
# Skip feature fields if present
|
||
if has_feature1:
|
||
offset += 2
|
||
if has_feature2:
|
||
offset += 2
|
||
|
||
# Parse name if present (remaining bytes)
|
||
if has_name and len(payload) > offset:
|
||
name_bytes = payload[offset:]
|
||
try:
|
||
# Decode name, strip null bytes and control characters
|
||
name = name_bytes.decode("utf-8", errors="ignore")
|
||
# Remove null terminator and anything after
|
||
null_idx = name.find("\x00")
|
||
if null_idx >= 0:
|
||
name = name[:null_idx]
|
||
# Strip control characters and whitespace
|
||
name = "".join(c for c in name if c >= " " or c in "\t").strip()
|
||
if not name or not any(c.isalnum() for c in name):
|
||
name = None
|
||
except Exception:
|
||
name = None
|
||
|
||
return ParsedAdvertisement(
|
||
public_key=public_key,
|
||
timestamp=timestamp,
|
||
name=name,
|
||
lat=lat,
|
||
lon=lon,
|
||
device_role=device_role,
|
||
)
|
||
|
||
|
||
# =============================================================================
|
||
# Direct Message (TEXT_MESSAGE) Decryption
|
||
# =============================================================================
|
||
|
||
|
||
def _clamp_scalar(k: bytes) -> bytes:
|
||
"""
|
||
Clamp a 32-byte scalar for X25519.
|
||
|
||
This applies the standard X25519 clamping to ensure the scalar
|
||
is in the correct form for elliptic curve operations.
|
||
|
||
Note: MeshCore private keys are already clamped (they store the post-SHA-512
|
||
scalar directly rather than a seed). Clamping is idempotent, so this is safe.
|
||
"""
|
||
clamped = bytearray(k[:32])
|
||
clamped[0] &= 248
|
||
clamped[31] &= 63
|
||
clamped[31] |= 64
|
||
return bytes(clamped)
|
||
|
||
|
||
def derive_public_key(private_key: bytes) -> bytes:
|
||
"""
|
||
Derive the Ed25519 public key from a MeshCore private key.
|
||
|
||
**MeshCore Key Format:**
|
||
MeshCore stores a non-standard Ed25519 private key format:
|
||
- First 32 bytes: The scalar (already post-SHA-512 and clamped)
|
||
- Last 32 bytes: The signing prefix (used during signature generation)
|
||
|
||
Standard Ed25519 libraries expect a 32-byte seed and derive the scalar via
|
||
SHA-512. Using `SigningKey(private_bytes)` will produce the WRONG public key.
|
||
|
||
To derive the correct public key, we use direct scalar × basepoint multiplication
|
||
with the noclamp variant (since the scalar is already clamped).
|
||
|
||
Args:
|
||
private_key: 64-byte MeshCore private key (or just the first 32 bytes)
|
||
|
||
Returns:
|
||
32-byte Ed25519 public key
|
||
"""
|
||
scalar = private_key[:32]
|
||
# Use noclamp because MeshCore stores already-clamped scalars
|
||
return nacl.bindings.crypto_scalarmult_ed25519_base_noclamp(scalar)
|
||
|
||
|
||
def derive_shared_secret(our_private_key: bytes, their_public_key: bytes) -> bytes:
|
||
"""
|
||
Derive ECDH shared secret from Ed25519 keys.
|
||
|
||
MeshCore uses Ed25519 keys, but ECDH requires X25519. This function:
|
||
1. Clamps our private key scalar for X25519 (idempotent since already clamped)
|
||
2. Converts their Ed25519 public key to X25519
|
||
3. Performs X25519 scalar multiplication to get the shared secret
|
||
|
||
**MeshCore Key Format:**
|
||
MeshCore private keys store the scalar directly (not a seed), so the first
|
||
32 bytes are already the post-SHA-512 clamped scalar. See `derive_public_key`
|
||
for details.
|
||
|
||
Args:
|
||
our_private_key: 64-byte MeshCore private key (only first 32 bytes used)
|
||
their_public_key: Their 32-byte Ed25519 public key
|
||
|
||
Returns:
|
||
32-byte shared secret
|
||
"""
|
||
# Clamp the first 32 bytes of our private key (idempotent for MeshCore keys)
|
||
clamped = _clamp_scalar(our_private_key[:32])
|
||
|
||
# Convert their Ed25519 public key to X25519
|
||
x25519_pub = nacl.bindings.crypto_sign_ed25519_pk_to_curve25519(their_public_key)
|
||
|
||
# Perform X25519 ECDH
|
||
return nacl.bindings.crypto_scalarmult(clamped, x25519_pub)
|
||
|
||
|
||
def decrypt_direct_message(payload: bytes, shared_secret: bytes) -> DecryptedDirectMessage | None:
|
||
"""
|
||
Decrypt a TEXT_MESSAGE payload using the ECDH shared secret.
|
||
|
||
TEXT_MESSAGE payload structure:
|
||
- dest_hash (1 byte): First byte of destination public key
|
||
- src_hash (1 byte): First byte of sender public key
|
||
- mac (2 bytes): First 2 bytes of HMAC-SHA256(shared_secret, ciphertext)
|
||
- ciphertext (rest): AES-128-ECB encrypted content
|
||
|
||
Decrypted content structure:
|
||
- timestamp (4 bytes, little-endian)
|
||
- flags (1 byte)
|
||
- message text (null-padded)
|
||
|
||
Args:
|
||
payload: The TEXT_MESSAGE payload bytes
|
||
shared_secret: 32-byte ECDH shared secret
|
||
|
||
Returns:
|
||
DecryptedDirectMessage if successful, None otherwise
|
||
"""
|
||
if len(payload) < 4:
|
||
return None
|
||
|
||
dest_hash = format(payload[0], "02x")
|
||
src_hash = format(payload[1], "02x")
|
||
mac = payload[2:4]
|
||
ciphertext = payload[4:]
|
||
|
||
if len(ciphertext) == 0 or len(ciphertext) % 16 != 0:
|
||
# AES requires 16-byte blocks
|
||
return None
|
||
|
||
# Verify MAC: HMAC-SHA256(shared_secret, ciphertext)[:2]
|
||
calculated_mac = hmac.new(shared_secret, ciphertext, hashlib.sha256).digest()[:2]
|
||
if calculated_mac != mac:
|
||
return None
|
||
|
||
# Decrypt using AES-128-ECB with shared_secret[:16]
|
||
try:
|
||
cipher = AES.new(shared_secret[:16], AES.MODE_ECB)
|
||
decrypted = cipher.decrypt(ciphertext)
|
||
except Exception as e:
|
||
logger.debug("AES decryption failed for DM: %s", e)
|
||
return None
|
||
|
||
if len(decrypted) < 5:
|
||
return None
|
||
|
||
# Parse decrypted content
|
||
timestamp = int.from_bytes(decrypted[0:4], "little")
|
||
flags = decrypted[4]
|
||
|
||
# Extract message text (UTF-8, null-padded)
|
||
message_bytes = decrypted[5:]
|
||
try:
|
||
message_text = message_bytes.decode("utf-8")
|
||
# Remove null terminator and any padding
|
||
message_text = message_text.rstrip("\x00")
|
||
except UnicodeDecodeError:
|
||
return None
|
||
|
||
return DecryptedDirectMessage(
|
||
timestamp=timestamp,
|
||
flags=flags,
|
||
message=message_text,
|
||
dest_hash=dest_hash,
|
||
src_hash=src_hash,
|
||
)
|
||
|
||
|
||
def try_decrypt_dm(
|
||
raw_packet: bytes,
|
||
our_private_key: bytes,
|
||
their_public_key: bytes,
|
||
our_public_key: bytes | None = None,
|
||
) -> DecryptedDirectMessage | None:
|
||
"""
|
||
Try to decrypt a raw packet as a direct message.
|
||
|
||
This performs several checks before attempting expensive ECDH:
|
||
1. Packet must be TEXT_MESSAGE type
|
||
2. dest_hash must match first byte of our public key (or their key for outbound)
|
||
3. src_hash must match first byte of their public key (or our key for outbound)
|
||
|
||
Args:
|
||
raw_packet: The complete raw packet bytes
|
||
our_private_key: Our 64-byte Ed25519 private key
|
||
their_public_key: Their 32-byte Ed25519 public key
|
||
our_public_key: Our 32-byte Ed25519 public key (optional, for bidirectional check)
|
||
|
||
Returns:
|
||
DecryptedDirectMessage if successful, None otherwise
|
||
"""
|
||
packet_info = parse_packet(raw_packet)
|
||
if packet_info is None:
|
||
return None
|
||
|
||
# Only TEXT_MESSAGE packets can be decrypted as DMs
|
||
if packet_info.payload_type != PayloadType.TEXT_MESSAGE:
|
||
return None
|
||
|
||
if len(packet_info.payload) < 4:
|
||
return None
|
||
|
||
# Extract dest/src hashes from payload
|
||
dest_hash = packet_info.payload[0]
|
||
src_hash = packet_info.payload[1]
|
||
|
||
# Check if this packet is for us (inbound: them -> us)
|
||
their_first_byte = their_public_key[0]
|
||
is_inbound = src_hash == their_first_byte
|
||
|
||
# Check if this packet is from us (outbound: us -> them)
|
||
is_outbound = False
|
||
if our_public_key is not None:
|
||
our_first_byte = our_public_key[0]
|
||
is_outbound = src_hash == our_first_byte and dest_hash == their_first_byte
|
||
|
||
if not is_inbound and not is_outbound:
|
||
# Packet doesn't match this contact conversation
|
||
return None
|
||
|
||
# Derive shared secret and attempt decryption
|
||
try:
|
||
shared_secret = derive_shared_secret(our_private_key, their_public_key)
|
||
except Exception as e:
|
||
logger.debug("Failed to derive shared secret: %s", e)
|
||
return None
|
||
|
||
return decrypt_direct_message(packet_info.payload, shared_secret)
|