From 34318e481442b7a649ba2aa925fd9f9b5c0a73c2 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Sat, 7 Mar 2026 22:35:53 -0800 Subject: [PATCH] Use more faithful packet frame parsing --- app/decoder.py | 85 ++++----------------- app/fanout/community_mqtt.py | 77 ++++--------------- app/migrations.py | 73 ++---------------- app/path_utils.py | 94 ++++++++++++++++++++++++ frontend/src/components/CrackerPanel.tsx | 41 +---------- frontend/src/test/pathUtils.test.ts | 23 ++++++ frontend/src/utils/pathUtils.ts | 57 ++++++++++++++ tests/test_community_mqtt.py | 39 ++++++++-- tests/test_decoder.py | 15 ++++ tests/test_migrations.py | 22 ++++++ tests/test_path_utils.py | 29 ++++++++ 11 files changed, 310 insertions(+), 245 deletions(-) diff --git a/app/decoder.py b/app/decoder.py index e363b97..314f1ed 100644 --- a/app/decoder.py +++ b/app/decoder.py @@ -107,87 +107,30 @@ def extract_payload(raw_packet: bytes) -> bytes | None: Returns the payload bytes, or None if packet is malformed. """ - from app.path_utils import decode_path_byte, path_wire_len + from app.path_utils import parse_packet_envelope - if len(raw_packet) < 2: - return None - - try: - header = raw_packet[0] - route_type = header & 0x03 - offset = 1 - - # Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3) - if route_type in (0x00, 0x03): - if len(raw_packet) < offset + 4: - return None - offset += 4 - - # Decode packed path byte - if len(raw_packet) < offset + 1: - return None - hop_count, hash_size = decode_path_byte(raw_packet[offset]) - offset += 1 - - # Skip path data - path_bytes = path_wire_len(hop_count, hash_size) - if len(raw_packet) < offset + path_bytes: - return None - offset += path_bytes - - # Rest is payload - return raw_packet[offset:] - except (ValueError, IndexError): - return None + envelope = parse_packet_envelope(raw_packet) + return envelope.payload if envelope is not None else None def parse_packet(raw_packet: bytes) -> PacketInfo | None: """Parse a raw packet and extract basic info.""" - from app.path_utils import decode_path_byte, path_wire_len + from app.path_utils import parse_packet_envelope - if len(raw_packet) < 2: + envelope = parse_packet_envelope(raw_packet) + if envelope is None: return None - try: - header = raw_packet[0] - route_type = RouteType(header & 0x03) - payload_type = PayloadType((header >> 2) & 0x0F) - payload_version = (header >> 6) & 0x03 - - offset = 1 - - # Skip transport codes if present - if route_type in (RouteType.TRANSPORT_FLOOD, RouteType.TRANSPORT_DIRECT): - if len(raw_packet) < offset + 4: - return None - offset += 4 - - # Decode packed path byte - if len(raw_packet) < offset + 1: - return None - hop_count, hash_size = decode_path_byte(raw_packet[offset]) - offset += 1 - - # Extract path data - path_byte_len = path_wire_len(hop_count, hash_size) - if len(raw_packet) < offset + path_byte_len: - return None - path = raw_packet[offset : offset + path_byte_len] - offset += path_byte_len - - # Rest is payload - payload = raw_packet[offset:] - return PacketInfo( - route_type=route_type, - payload_type=payload_type, - payload_version=payload_version, - path_length=hop_count, - path_hash_size=hash_size, - path=path, - payload=payload, + route_type=RouteType(envelope.route_type), + payload_type=PayloadType(envelope.payload_type), + payload_version=envelope.payload_version, + path_length=envelope.hop_count, + path_hash_size=envelope.hash_size, + path=envelope.path, + payload=envelope.payload, ) - except (ValueError, IndexError): + except ValueError: return None diff --git a/app/fanout/community_mqtt.py b/app/fanout/community_mqtt.py index 48799f7..67f901f 100644 --- a/app/fanout/community_mqtt.py +++ b/app/fanout/community_mqtt.py @@ -24,7 +24,7 @@ import aiomqtt import nacl.bindings from app.fanout.mqtt_base import BaseMqttPublisher -from app.path_utils import split_path_hex +from app.path_utils import parse_packet_envelope, split_path_hex logger = logging.getLogger(__name__) @@ -143,41 +143,17 @@ def _calculate_packet_hash(raw_bytes: bytes) -> str: return "0" * 16 try: - header = raw_bytes[0] - payload_type = (header >> 2) & 0x0F - route_type = header & 0x03 - - # Transport codes present for TRANSPORT_FLOOD (0) and TRANSPORT_DIRECT (3) - has_transport = route_type in (0x00, 0x03) - - offset = 1 # Past header - if has_transport: - offset += 4 # Skip 4 bytes of transport codes - - # Read path byte (packed as [hash_mode:2][hop_count:6]). - # Invalid/truncated packets map to zero hash. - if offset >= len(raw_bytes): + envelope = parse_packet_envelope(raw_bytes) + if envelope is None: return "0" * 16 - path_byte = raw_bytes[offset] - offset += 1 - hash_mode = (path_byte >> 6) & 0x03 - hop_count = path_byte & 0x3F - hash_size = (hash_mode + 1) if hash_mode < 3 else 1 - path_wire_len = hop_count * hash_size - - # Skip past path to get to payload. Invalid/truncated packets map to zero hash. - if len(raw_bytes) < offset + path_wire_len: - return "0" * 16 - payload_start = offset + path_wire_len - payload_data = raw_bytes[payload_start:] # Hash: payload_type(1 byte) [+ path_byte as uint16_t LE for TRACE] + payload_data # IMPORTANT: TRACE hash uses the raw wire byte (not decoded hop count) to match firmware. hash_obj = hashlib.sha256() - hash_obj.update(bytes([payload_type])) - if payload_type == 9: # PAYLOAD_TYPE_TRACE - hash_obj.update(path_byte.to_bytes(2, byteorder="little")) - hash_obj.update(payload_data) + hash_obj.update(bytes([envelope.payload_type])) + if envelope.payload_type == 9: # PAYLOAD_TYPE_TRACE + hash_obj.update(envelope.path_byte.to_bytes(2, byteorder="little")) + hash_obj.update(envelope.payload) return hash_obj.hexdigest()[:16].upper() except Exception: @@ -198,42 +174,15 @@ def _decode_packet_fields(raw_bytes: bytes) -> tuple[str, str, str, list[str], i payload_type: int | None = None try: - if len(raw_bytes) < 2: + envelope = parse_packet_envelope(raw_bytes) + if envelope is None or envelope.payload_version != 0: return route, packet_type, payload_len, path_values, payload_type - header = raw_bytes[0] - payload_version = (header >> 6) & 0x03 - if payload_version != 0: - return route, packet_type, payload_len, path_values, payload_type - - route_type = header & 0x03 - has_transport = route_type in (0x00, 0x03) - - offset = 1 - if has_transport: - offset += 4 - - if len(raw_bytes) <= offset: - return route, packet_type, payload_len, path_values, payload_type - - path_byte = raw_bytes[offset] - offset += 1 - hash_mode = (path_byte >> 6) & 0x03 - hop_count = path_byte & 0x3F - hash_size = (hash_mode + 1) if hash_mode < 3 else 1 - path_wire_len = hop_count * hash_size - - if len(raw_bytes) < offset + path_wire_len: - return route, packet_type, payload_len, path_values, payload_type - - path_bytes = raw_bytes[offset : offset + path_wire_len] - offset += path_wire_len - - payload_type = (header >> 2) & 0x0F - route = _ROUTE_MAP.get(route_type, "U") + payload_type = envelope.payload_type + route = _ROUTE_MAP.get(envelope.route_type, "U") packet_type = str(payload_type) - payload_len = str(max(0, len(raw_bytes) - offset)) - path_values = split_path_hex(path_bytes.hex(), hop_count) + payload_len = str(len(envelope.payload)) + path_values = split_path_hex(envelope.path.hex(), envelope.hop_count) return route, packet_type, payload_len, path_values, payload_type except Exception: diff --git a/app/migrations.py b/app/migrations.py index bddd80d..6382c8f 100644 --- a/app/migrations.py +++ b/app/migrations.py @@ -452,43 +452,14 @@ async def _migrate_004_add_payload_hash_column(conn: aiosqlite.Connection) -> No def _extract_payload_for_hash(raw_packet: bytes) -> bytes | None: """ - Extract payload from a raw packet for hashing (migration-local copy of decoder logic). + Extract payload from a raw packet for hashing using canonical framing validation. Returns the payload bytes, or None if packet is malformed. """ - if len(raw_packet) < 2: - return None + from app.path_utils import parse_packet_envelope - try: - header = raw_packet[0] - route_type = header & 0x03 - offset = 1 - - # Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3) - if route_type in (0x00, 0x03): - if len(raw_packet) < offset + 4: - return None - offset += 4 - - # Get path byte (packed as [hash_mode:2][hop_count:6]) - if len(raw_packet) < offset + 1: - return None - path_byte = raw_packet[offset] - offset += 1 - hash_mode = (path_byte >> 6) & 0x03 - hop_count = path_byte & 0x3F - hash_size = (hash_mode + 1) if hash_mode < 3 else 1 - path_wire_len = hop_count * hash_size - - # Skip path bytes - if len(raw_packet) < offset + path_wire_len: - return None - offset += path_wire_len - - # Rest is payload (may be empty, matching decoder.py behavior) - return raw_packet[offset:] - except (IndexError, ValueError): - return None + envelope = parse_packet_envelope(raw_packet) + return envelope.payload if envelope is not None else None async def _migrate_005_backfill_payload_hashes(conn: aiosqlite.Connection) -> None: @@ -638,42 +609,14 @@ async def _migrate_006_replace_path_len_with_path(conn: aiosqlite.Connection) -> def _extract_path_from_packet(raw_packet: bytes) -> str | None: """ - Extract path hex string from a raw packet (migration-local copy of decoder logic). + Extract path hex string from a raw packet using canonical framing validation. Returns the path as a hex string, or None if packet is malformed. """ - if len(raw_packet) < 2: - return None + from app.path_utils import parse_packet_envelope - try: - header = raw_packet[0] - route_type = header & 0x03 - offset = 1 - - # Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3) - if route_type in (0x00, 0x03): - if len(raw_packet) < offset + 4: - return None - offset += 4 - - # Get path byte (packed as [hash_mode:2][hop_count:6]) - if len(raw_packet) < offset + 1: - return None - path_byte = raw_packet[offset] - offset += 1 - hash_mode = (path_byte >> 6) & 0x03 - hop_count = path_byte & 0x3F - hash_size = (hash_mode + 1) if hash_mode < 3 else 1 - path_wire_len = hop_count * hash_size - - # Extract path bytes - if len(raw_packet) < offset + path_wire_len: - return None - path_bytes = raw_packet[offset : offset + path_wire_len] - - return path_bytes.hex() - except (IndexError, ValueError): - return None + envelope = parse_packet_envelope(raw_packet) + return envelope.path.hex() if envelope is not None else None async def _migrate_007_backfill_message_paths(conn: aiosqlite.Connection) -> None: diff --git a/app/path_utils.py b/app/path_utils.py index 3e9e95c..0c58744 100644 --- a/app/path_utils.py +++ b/app/path_utils.py @@ -9,6 +9,27 @@ The path_len wire byte is packed as [hash_mode:2][hop_count:6]: Mode 3 (hash_size=4) is reserved and rejected. """ +from dataclasses import dataclass + +MAX_PATH_SIZE = 64 + + +@dataclass(frozen=True) +class ParsedPacketEnvelope: + """Canonical packet framing parse matching MeshCore Packet::readFrom().""" + + header: int + route_type: int + payload_type: int + payload_version: int + path_byte: int + hop_count: int + hash_size: int + path_byte_len: int + path: bytes + payload: bytes + payload_offset: int + def decode_path_byte(path_byte: int) -> tuple[int, int]: """Decode a packed path byte into (hop_count, hash_size). @@ -32,6 +53,79 @@ def path_wire_len(hop_count: int, hash_size: int) -> int: return hop_count * hash_size +def validate_path_byte(path_byte: int) -> tuple[int, int, int]: + """Validate a packed path byte using firmware-equivalent rules. + + Returns: + (hop_count, hash_size, byte_len) + + Raises: + ValueError: If the encoding uses reserved mode 3 or exceeds MAX_PATH_SIZE. + """ + hop_count, hash_size = decode_path_byte(path_byte) + byte_len = path_wire_len(hop_count, hash_size) + if byte_len > MAX_PATH_SIZE: + raise ValueError( + f"Invalid path length {byte_len} bytes exceeds MAX_PATH_SIZE={MAX_PATH_SIZE}" + ) + return hop_count, hash_size, byte_len + + +def parse_packet_envelope(raw_packet: bytes) -> ParsedPacketEnvelope | None: + """Parse packet framing using firmware Packet::readFrom() semantics. + + Validation matches the firmware's path checks: + - reserved mode 3 is invalid + - hop_count * hash_size must not exceed MAX_PATH_SIZE + - at least one payload byte must remain after the path + """ + if len(raw_packet) < 2: + return None + + try: + header = raw_packet[0] + route_type = header & 0x03 + payload_type = (header >> 2) & 0x0F + payload_version = (header >> 6) & 0x03 + + offset = 1 + if route_type in (0x00, 0x03): + if len(raw_packet) < offset + 4: + return None + offset += 4 + + if len(raw_packet) < offset + 1: + return None + path_byte = raw_packet[offset] + offset += 1 + + hop_count, hash_size, path_byte_len = validate_path_byte(path_byte) + if len(raw_packet) < offset + path_byte_len: + return None + + path = raw_packet[offset : offset + path_byte_len] + offset += path_byte_len + + if offset >= len(raw_packet): + return None + + return ParsedPacketEnvelope( + header=header, + route_type=route_type, + payload_type=payload_type, + payload_version=payload_version, + path_byte=path_byte, + hop_count=hop_count, + hash_size=hash_size, + path_byte_len=path_byte_len, + path=path, + payload=raw_packet[offset:], + payload_offset=offset, + ) + except (IndexError, ValueError): + return None + + def split_path_hex(path_hex: str, hop_count: int) -> list[str]: """Split a hex path string into per-hop chunks using the known hop count. diff --git a/frontend/src/components/CrackerPanel.tsx b/frontend/src/components/CrackerPanel.tsx index a7f6793..0e94a26 100644 --- a/frontend/src/components/CrackerPanel.tsx +++ b/frontend/src/components/CrackerPanel.tsx @@ -5,44 +5,7 @@ import type { RawPacket, Channel } from '../types'; import { api } from '../api'; import { toast } from './ui/sonner'; import { cn } from '@/lib/utils'; - -/** - * Extract the payload from a raw packet hex string, skipping header and path. - * Returns the payload as a hex string, or null if malformed. - */ -function extractPayload(packetHex: string): string | null { - if (packetHex.length < 4) return null; // Need at least 2 bytes - - try { - const header = parseInt(packetHex.slice(0, 2), 16); - const routeType = header & 0x03; - let offset = 2; // 1 byte = 2 hex chars - - // Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3) - if (routeType === 0x00 || routeType === 0x03) { - if (packetHex.length < offset + 8) return null; // Need 4 more bytes - offset += 8; // 4 bytes = 8 hex chars - } - - // Get path byte (packed as [hash_mode:2][hop_count:6]) - if (packetHex.length < offset + 2) return null; - const pathByte = parseInt(packetHex.slice(offset, offset + 2), 16); - offset += 2; - const hashMode = (pathByte >> 6) & 0x03; - const hopCount = pathByte & 0x3f; - const hashSize = hashMode < 3 ? hashMode + 1 : 1; - const pathHexChars = hopCount * hashSize * 2; - - // Skip path data - if (packetHex.length < offset + pathHexChars) return null; - offset += pathHexChars; - - // Rest is payload - return packetHex.slice(offset); - } catch { - return null; - } -} +import { extractPacketPayloadHex } from '../utils/pathUtils'; interface CrackedRoom { roomName: string; @@ -180,7 +143,7 @@ export function CrackerPanel({ for (const packet of undecryptedGroupText) { if (!newQueue.has(packet.id)) { // Extract payload and check for duplicates - const payload = extractPayload(packet.data); + const payload = extractPacketPayloadHex(packet.data); if (payload && seenPayloadsRef.current.has(payload)) { // Skip - we already have a packet with this payload queued newSkipped++; diff --git a/frontend/src/test/pathUtils.test.ts b/frontend/src/test/pathUtils.test.ts index 367fd27..5ff5110 100644 --- a/frontend/src/test/pathUtils.test.ts +++ b/frontend/src/test/pathUtils.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect } from 'vitest'; import { parsePathHops, + extractPacketPayloadHex, findContactsByPrefix, calculateDistance, resolvePath, @@ -107,6 +108,28 @@ describe('parsePathHops', () => { }); }); +describe('extractPacketPayloadHex', () => { + it('extracts payload from legacy 1-byte-hop packet', () => { + expect(extractPacketPayloadHex('0902AABB48656C6C6F')).toBe('48656C6C6F'); + }); + + it('extracts payload from 2-byte-hop packet', () => { + expect(extractPacketPayloadHex('0942AABBCCDD48656C6C6F')).toBe('48656C6C6F'); + }); + + it('rejects reserved mode 3', () => { + expect(extractPacketPayloadHex('09C1AABBCCDDEEFF')).toBeNull(); + }); + + it('rejects oversized path encoding', () => { + expect(extractPacketPayloadHex(`09BF${'AA'.repeat(189)}4869`)).toBeNull(); + }); + + it('rejects packets with no payload after path', () => { + expect(extractPacketPayloadHex('0902AABB')).toBeNull(); + }); +}); + describe('findContactsByPrefix', () => { const contacts: Contact[] = [ createContact({ diff --git a/frontend/src/utils/pathUtils.ts b/frontend/src/utils/pathUtils.ts index 2fc5ca1..baf49c2 100644 --- a/frontend/src/utils/pathUtils.ts +++ b/frontend/src/utils/pathUtils.ts @@ -1,6 +1,8 @@ import type { Contact, RadioConfig, MessagePath } from '../types'; import { CONTACT_TYPE_REPEATER } from '../types'; +const MAX_PATH_BYTES = 64; + export interface PathHop { prefix: string; // Hex hop identifier (e.g., "1A" for 1-byte, "1A2B" for 2-byte) matches: Contact[]; // Matched repeaters (empty=unknown, multiple=ambiguous) @@ -64,6 +66,61 @@ export function parsePathHops(path: string | null | undefined, hopCount?: number return hops; } +/** + * Extract the payload portion from a raw packet hex string using firmware-equivalent + * path-byte validation. Returns null for malformed or payload-less packets. + */ +export function extractPacketPayloadHex(packetHex: string): string | null { + if (packetHex.length < 4) { + return null; + } + + try { + const normalized = packetHex.toUpperCase(); + const header = parseInt(normalized.slice(0, 2), 16); + const routeType = header & 0x03; + let offset = 2; + + if (routeType === 0x00 || routeType === 0x03) { + if (normalized.length < offset + 8) { + return null; + } + offset += 8; + } + + if (normalized.length < offset + 2) { + return null; + } + const pathByte = parseInt(normalized.slice(offset, offset + 2), 16); + offset += 2; + + const hashMode = (pathByte >> 6) & 0x03; + if (hashMode === 0x03) { + return null; + } + const hopCount = pathByte & 0x3f; + const hashSize = hashMode + 1; + const pathByteLen = hopCount * hashSize; + if (pathByteLen > MAX_PATH_BYTES) { + return null; + } + + const pathHexChars = pathByteLen * 2; + if (normalized.length < offset + pathHexChars) { + return null; + } + offset += pathHexChars; + + if (offset >= normalized.length) { + return null; + } + + return normalized.slice(offset); + } catch { + return null; + } +} + /** * Find contacts matching first 2 chars of public key (repeaters only for intermediate hops) */ diff --git a/tests/test_community_mqtt.py b/tests/test_community_mqtt.py index b75dcdc..130a209 100644 --- a/tests/test_community_mqtt.py +++ b/tests/test_community_mqtt.py @@ -226,7 +226,7 @@ class TestPacketFormatConversion: def test_packet_type_extraction(self): # Header 0x14 = type 5, route 0 (TRANSPORT_FLOOD): header + 4 transport + path_len. - data = {"timestamp": 0, "data": "140102030400", "snr": None, "rssi": None} + data = {"timestamp": 0, "data": "140102030400AA", "snr": None, "rssi": None} result = _format_raw_packet(data, "Node", "AA" * 32) assert result["packet_type"] == "5" assert result["route"] == "F" @@ -235,10 +235,10 @@ class TestPacketFormatConversion: # Test all 4 route types (matches meshcore-packet-capture) # TRANSPORT_FLOOD=0 -> "F", FLOOD=1 -> "F", DIRECT=2 -> "D", TRANSPORT_DIRECT=3 -> "T" samples = [ - ("000102030400", "F"), # TRANSPORT_FLOOD: header + transport + path_len - ("0100", "F"), # FLOOD: header + path_len - ("0200", "D"), # DIRECT: header + path_len - ("030102030400", "T"), # TRANSPORT_DIRECT: header + transport + path_len + ("000102030400AA", "F"), # TRANSPORT_FLOOD: header + transport + path_len + payload + ("0100AA", "F"), # FLOOD: header + path_len + payload + ("0200AA", "D"), # DIRECT: header + path_len + payload + ("030102030400AA", "T"), # TRANSPORT_DIRECT: header + transport + path_len + payload ] for raw_hex, expected in samples: data = {"timestamp": 0, "data": raw_hex, "snr": None, "rssi": None} @@ -274,7 +274,7 @@ class TestPacketFormatConversion: assert result["path"] == "aa,bb" def test_direct_route_includes_empty_path_field(self): - data = {"timestamp": 0, "data": "0200", "snr": 1.0, "rssi": -70} + data = {"timestamp": 0, "data": "0200AA", "snr": 1.0, "rssi": -70} result = _format_raw_packet(data, "Node", "AA" * 32) assert result["route"] == "D" assert "path" in result @@ -432,6 +432,18 @@ class TestCalculatePacketHash: raw = bytes([0x09, 0x42, 0xAA, 0xBB]) assert _calculate_packet_hash(raw) == "0" * 16 + def test_reserved_mode_returns_zeroes(self): + raw = bytes([0x09, 0xC1, 0xAA, 0xBB, 0xCC]) + assert _calculate_packet_hash(raw) == "0" * 16 + + def test_oversize_path_len_returns_zeroes(self): + raw = bytes([0x09, 0xBF]) + bytes(189) + b"payload" + assert _calculate_packet_hash(raw) == "0" * 16 + + def test_no_payload_returns_zeroes(self): + raw = bytes([0x09, 0x02, 0xAA, 0xBB]) + assert _calculate_packet_hash(raw) == "0" * 16 + def test_multibyte_transport_flood_with_2byte_hops(self): """TRANSPORT_FLOOD with 2-byte hops correctly skips transport codes + path.""" import hashlib @@ -527,6 +539,21 @@ class TestDecodePacketFieldsMultibyte: assert path_values == [] assert plen == "0" + def test_reserved_mode_returns_defaults(self): + raw = bytes([0x09, 0xC1, 0xAA, 0xBB, 0xCC]) + route, ptype, plen, path_values, payload_type = _decode_packet_fields(raw) + assert (route, ptype, plen, path_values, payload_type) == ("U", "0", "0", [], None) + + def test_oversize_path_len_returns_defaults(self): + raw = bytes([0x09, 0xBF]) + bytes(189) + b"payload" + route, ptype, plen, path_values, payload_type = _decode_packet_fields(raw) + assert (route, ptype, plen, path_values, payload_type) == ("U", "0", "0", [], None) + + def test_no_payload_returns_defaults(self): + raw = bytes([0x09, 0x02, 0xAA, 0xBB]) + route, ptype, plen, path_values, payload_type = _decode_packet_fields(raw) + assert (route, ptype, plen, path_values, payload_type) == ("U", "0", "0", [], None) + class TestCommunityMqttPublisher: def test_initial_state(self): diff --git a/tests/test_decoder.py b/tests/test_decoder.py index 9a7dea8..11b2879 100644 --- a/tests/test_decoder.py +++ b/tests/test_decoder.py @@ -110,6 +110,11 @@ class TestPacketParsing: assert parse_packet(header) is None + def test_parse_packet_with_no_payload_returns_none(self): + """Firmware rejects packets that end exactly after the path.""" + packet = bytes([0x15, 0x02, 0xAA, 0xBB]) + assert parse_packet(packet) is None + class TestMultiBytePathParsing: """Test packet parsing with multi-byte hop path encoding.""" @@ -150,6 +155,11 @@ class TestMultiBytePathParsing: result = parse_packet(packet) assert result is None + def test_parse_oversize_path_len_returns_none(self): + """Oversized-but-well-formed path bytes are invalid per firmware.""" + packet = bytes([0x15, 0xBF]) + bytes(189) + b"payload" + assert parse_packet(packet) is None + def test_parse_two_byte_hops_truncated_returns_none(self): """Truncated path data for multi-byte hops returns None.""" # path_byte = 0x42 → 2 hops × 2 bytes = 4 bytes needed, only 2 provided @@ -184,6 +194,11 @@ class TestMultiBytePathParsing: result = extract_payload(packet) assert result is None + def test_extract_payload_no_payload_returns_none(self): + """extract_payload matches firmware and rejects payload-less packets.""" + packet = bytes([0x15, 0x02, 0xAA, 0xBB]) + assert extract_payload(packet) is None + def test_parse_direct_two_byte_hops_with_transport(self): """TRANSPORT_DIRECT with 2-byte hops parses correctly.""" # Header: TRANSPORT_DIRECT = 0x03, GROUP_TEXT = 5 → (5<<2)|3 = 0x17 diff --git a/tests/test_migrations.py b/tests/test_migrations.py index ac032f7..4a33f31 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -1269,3 +1269,25 @@ class TestMigration040: ] finally: await conn.close() + + +class TestMigrationPacketHelpers: + """Test migration-local packet helpers against canonical path validation.""" + + def test_extract_payload_for_hash_rejects_oversize_path(self): + from app.migrations import _extract_payload_for_hash + + packet = bytes([0x15, 0xBF]) + bytes(189) + b"payload" + assert _extract_payload_for_hash(packet) is None + + def test_extract_payload_for_hash_rejects_no_payload_packet(self): + from app.migrations import _extract_payload_for_hash + + packet = bytes([0x15, 0x02, 0xAA, 0xBB]) + assert _extract_payload_for_hash(packet) is None + + def test_extract_path_from_packet_rejects_reserved_mode(self): + from app.migrations import _extract_path_from_packet + + packet = bytes([0x15, 0xC1, 0xAA, 0xBB, 0xCC]) + assert _extract_path_from_packet(packet) is None diff --git a/tests/test_path_utils.py b/tests/test_path_utils.py index e77550f..d6e846c 100644 --- a/tests/test_path_utils.py +++ b/tests/test_path_utils.py @@ -5,8 +5,10 @@ import pytest from app.path_utils import ( decode_path_byte, first_hop_hex, + parse_packet_envelope, path_wire_len, split_path_hex, + validate_path_byte, ) @@ -81,6 +83,33 @@ class TestPathWireLen: assert path_wire_len(0, 1) == 0 +class TestValidatePathByte: + def test_accepts_valid_multibyte_path_len(self): + hop_count, hash_size, byte_len = validate_path_byte(0x42) + assert (hop_count, hash_size, byte_len) == (2, 2, 4) + + def test_rejects_oversize_path(self): + with pytest.raises(ValueError, match="MAX_PATH_SIZE"): + validate_path_byte(0xBF) + + +class TestParsePacketEnvelope: + def test_parses_valid_packet(self): + envelope = parse_packet_envelope(bytes([0x15, 0x42, 0xAA, 0xBB, 0xCC, 0xDD]) + b"hi") + assert envelope is not None + assert envelope.hop_count == 2 + assert envelope.hash_size == 2 + assert envelope.path == bytes([0xAA, 0xBB, 0xCC, 0xDD]) + assert envelope.payload == b"hi" + + def test_rejects_packet_with_no_payload(self): + assert parse_packet_envelope(bytes([0x15, 0x02, 0xAA, 0xBB])) is None + + def test_rejects_oversize_path_encoding(self): + packet = bytes([0x15, 0xBF]) + bytes(189) + b"x" + assert parse_packet_envelope(packet) is None + + class TestSplitPathHex: def test_one_byte_hops(self): assert split_path_hex("1a2b3c", 3) == ["1a", "2b", "3c"]