From 1fc041538e0dee04d23add34c11bc3c1906ae1b2 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Sat, 7 Mar 2026 18:34:47 -0800 Subject: [PATCH] Phase 0.5 & 1: Centralize path utils, multi-hop packet decoding, updated PacketInfo shape --- app/decoder.py | 36 ++++++---- app/path_utils.py | 70 +++++++++++++++++++ tests/test_decoder.py | 105 ++++++++++++++++++++++++++++ tests/test_path_utils.py | 145 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 342 insertions(+), 14 deletions(-) create mode 100644 app/path_utils.py create mode 100644 tests/test_path_utils.py diff --git a/app/decoder.py b/app/decoder.py index f6ffeea..e363b97 100644 --- a/app/decoder.py +++ b/app/decoder.py @@ -79,9 +79,10 @@ class PacketInfo: route_type: RouteType payload_type: PayloadType payload_version: int - path_length: int - path: bytes # The routing path (empty if path_length is 0) + path_length: int # Decoded hop count (not the raw wire byte) + path: bytes # The routing path bytes (empty if path_length is 0) payload: bytes + path_hash_size: int = 1 # Bytes per hop: 1, 2, or 3 def calculate_channel_hash(channel_key: bytes) -> str: @@ -100,12 +101,14 @@ def extract_payload(raw_packet: bytes) -> bytes | None: Packet structure: - Byte 0: header (route_type, payload_type, version) - For TRANSPORT routes: bytes 1-4 are transport codes - - Next byte: path_length - - Next path_length bytes: path data + - Next byte: path byte (packed as [hash_mode:2][hop_count:6]) + - Next hop_count * hash_size bytes: path data - Remaining: payload Returns the payload bytes, or None if packet is malformed. """ + from app.path_utils import decode_path_byte, path_wire_len + if len(raw_packet) < 2: return None @@ -120,16 +123,17 @@ def extract_payload(raw_packet: bytes) -> bytes | None: return None offset += 4 - # Get path length + # Decode packed path byte if len(raw_packet) < offset + 1: return None - path_length = raw_packet[offset] + hop_count, hash_size = decode_path_byte(raw_packet[offset]) offset += 1 # Skip path data - if len(raw_packet) < offset + path_length: + path_bytes = path_wire_len(hop_count, hash_size) + if len(raw_packet) < offset + path_bytes: return None - offset += path_length + offset += path_bytes # Rest is payload return raw_packet[offset:] @@ -139,6 +143,8 @@ def extract_payload(raw_packet: bytes) -> bytes | None: def parse_packet(raw_packet: bytes) -> PacketInfo | None: """Parse a raw packet and extract basic info.""" + from app.path_utils import decode_path_byte, path_wire_len + if len(raw_packet) < 2: return None @@ -156,17 +162,18 @@ def parse_packet(raw_packet: bytes) -> PacketInfo | None: return None offset += 4 - # Get path length + # Decode packed path byte if len(raw_packet) < offset + 1: return None - path_length = raw_packet[offset] + hop_count, hash_size = decode_path_byte(raw_packet[offset]) offset += 1 # Extract path data - if len(raw_packet) < offset + path_length: + path_byte_len = path_wire_len(hop_count, hash_size) + if len(raw_packet) < offset + path_byte_len: return None - path = raw_packet[offset : offset + path_length] - offset += path_length + path = raw_packet[offset : offset + path_byte_len] + offset += path_byte_len # Rest is payload payload = raw_packet[offset:] @@ -175,7 +182,8 @@ def parse_packet(raw_packet: bytes) -> PacketInfo | None: route_type=route_type, payload_type=payload_type, payload_version=payload_version, - path_length=path_length, + path_length=hop_count, + path_hash_size=hash_size, path=path, payload=payload, ) diff --git a/app/path_utils.py b/app/path_utils.py new file mode 100644 index 0000000..9e8da67 --- /dev/null +++ b/app/path_utils.py @@ -0,0 +1,70 @@ +""" +Centralized helpers for MeshCore multi-byte path encoding. + +The path_len wire byte is packed as [hash_mode:2][hop_count:6]: + - hash_size = (hash_mode) + 1 → 1, 2, or 3 bytes per hop + - hop_count = lower 6 bits → 0–63 hops + - wire bytes = hop_count × hash_size + +Mode 3 (hash_size=4) is reserved and rejected. +""" + + +def decode_path_byte(path_byte: int) -> tuple[int, int]: + """Decode a packed path byte into (hop_count, hash_size). + + Returns: + (hop_count, hash_size) where hash_size is 1, 2, or 3. + + Raises: + ValueError: If hash_mode is 3 (reserved). + """ + hash_mode = (path_byte >> 6) & 0x03 + if hash_mode == 3: + raise ValueError(f"Reserved path hash mode 3 (path_byte=0x{path_byte:02X})") + hop_count = path_byte & 0x3F + hash_size = hash_mode + 1 + return hop_count, hash_size + + +def path_wire_len(hop_count: int, hash_size: int) -> int: + """Wire byte length of path data.""" + return hop_count * hash_size + + +def split_path_hex(path_hex: str, hop_count: int) -> list[str]: + """Split a hex path string into per-hop chunks using the known hop count. + + If hop_count is 0 or the hex length doesn't divide evenly, falls back + to 2-char (1-byte) chunks for backward compatibility. + """ + if not path_hex or hop_count <= 0: + return [] + chars_per_hop = len(path_hex) // hop_count + if chars_per_hop < 2 or chars_per_hop % 2 != 0 or chars_per_hop * hop_count != len(path_hex): + # Inconsistent — fall back to legacy 2-char split + return [path_hex[i : i + 2] for i in range(0, len(path_hex), 2)] + return [path_hex[i : i + chars_per_hop] for i in range(0, len(path_hex), chars_per_hop)] + + +def first_hop_hex(path_hex: str, hop_count: int) -> str | None: + """Extract the first hop identifier from a path hex string. + + Returns None for empty/direct paths. + """ + hops = split_path_hex(path_hex, hop_count) + return hops[0] if hops else None + + +def infer_hash_size(path_hex: str, hop_count: int) -> int: + """Derive bytes-per-hop from path hex length and hop count. + + Returns 1 as default for ambiguous or legacy cases. + """ + if hop_count <= 0 or not path_hex: + return 1 + hex_per_hop = len(path_hex) // hop_count + byte_per_hop = hex_per_hop // 2 + if byte_per_hop in (1, 2, 3) and hex_per_hop * hop_count == len(path_hex): + return byte_per_hop + return 1 diff --git a/tests/test_decoder.py b/tests/test_decoder.py index 7544db8..9a7dea8 100644 --- a/tests/test_decoder.py +++ b/tests/test_decoder.py @@ -19,6 +19,7 @@ from app.decoder import ( decrypt_group_text, derive_public_key, derive_shared_secret, + extract_payload, parse_packet, try_decrypt_dm, try_decrypt_packet_with_channel_key, @@ -110,6 +111,110 @@ class TestPacketParsing: assert parse_packet(header) is None +class TestMultiBytePathParsing: + """Test packet parsing with multi-byte hop path encoding.""" + + def test_parse_two_byte_hops(self): + """Parse packet with 2-byte hops: path_byte=0x42 → 2 hops × 2 bytes.""" + # Header: FLOOD GROUP_TEXT = 0x15 + # Path byte: 0x42 = mode 1 (2-byte), 2 hops → 4 bytes of path + path_data = bytes([0xAA, 0xBB, 0xCC, 0xDD]) + packet = bytes([0x15, 0x42]) + path_data + b"payload" + + result = parse_packet(packet) + + assert result is not None + assert result.path_length == 2 # hop count + assert result.path_hash_size == 2 + assert result.path == path_data + assert result.payload == b"payload" + + def test_parse_three_byte_hops(self): + """Parse packet with 3-byte hops: path_byte=0x81 → 1 hop × 3 bytes.""" + path_data = bytes([0x11, 0x22, 0x33]) + packet = bytes([0x15, 0x81]) + path_data + b"pay" + + result = parse_packet(packet) + + assert result is not None + assert result.path_length == 1 + assert result.path_hash_size == 3 + assert result.path == path_data + assert result.payload == b"pay" + + def test_parse_reserved_mode3_returns_none(self): + """Reserved mode 3 (upper bits = 0b11) should be rejected.""" + # path_byte = 0xC1 → mode 3, 1 hop + packet = bytes([0x15, 0xC1]) + bytes(10) + + result = parse_packet(packet) + assert result is None + + def test_parse_two_byte_hops_truncated_returns_none(self): + """Truncated path data for multi-byte hops returns None.""" + # path_byte = 0x42 → 2 hops × 2 bytes = 4 bytes needed, only 2 provided + packet = bytes([0x15, 0x42, 0xAA, 0xBB]) + + result = parse_packet(packet) + assert result is None + + def test_backward_compat_one_byte_hops(self): + """Old-style 1-byte hop packets still parse correctly with hash_size=1.""" + packet = bytes([0x0A, 0x03, 0x01, 0x02, 0x03]) + b"msg" + + result = parse_packet(packet) + + assert result is not None + assert result.path_length == 3 + assert result.path_hash_size == 1 + assert result.path == bytes([0x01, 0x02, 0x03]) + + def test_extract_payload_two_byte_hops(self): + """extract_payload correctly skips multi-byte path data.""" + path_data = bytes([0xAA, 0xBB, 0xCC, 0xDD]) + packet = bytes([0x15, 0x42]) + path_data + b"the_payload" + + result = extract_payload(packet) + assert result == b"the_payload" + + def test_extract_payload_reserved_mode_returns_none(self): + """extract_payload rejects reserved mode 3.""" + packet = bytes([0x15, 0xC1]) + bytes(10) + + result = extract_payload(packet) + assert result is None + + def test_parse_direct_two_byte_hops_with_transport(self): + """TRANSPORT_DIRECT with 2-byte hops parses correctly.""" + # Header: TRANSPORT_DIRECT = 0x03, GROUP_TEXT = 5 → (5<<2)|3 = 0x17 + transport_code = bytes([0x11, 0x22, 0x33, 0x44]) + # path_byte = 0x41 → mode 1, 1 hop → 2 bytes of path + path_data = bytes([0xAB, 0xCD]) + packet = bytes([0x17]) + transport_code + bytes([0x41]) + path_data + b"data" + + result = parse_packet(packet) + + assert result is not None + assert result.route_type == RouteType.TRANSPORT_DIRECT + assert result.path_length == 1 + assert result.path_hash_size == 2 + assert result.path == path_data + assert result.payload == b"data" + + def test_zero_hops_mode1(self): + """Zero hops with mode 1 (2-byte) → direct path, no path bytes.""" + # path_byte = 0x40 → mode 1, 0 hops + packet = bytes([0x15, 0x40]) + b"payload" + + result = parse_packet(packet) + + assert result is not None + assert result.path_length == 0 + assert result.path_hash_size == 2 + assert result.path == b"" + assert result.payload == b"payload" + + class TestGroupTextDecryption: """Test GROUP_TEXT (channel message) decryption.""" diff --git a/tests/test_path_utils.py b/tests/test_path_utils.py new file mode 100644 index 0000000..93a0e68 --- /dev/null +++ b/tests/test_path_utils.py @@ -0,0 +1,145 @@ +"""Tests for the centralized path encoding/decoding helpers.""" + +import pytest + +from app.path_utils import ( + decode_path_byte, + first_hop_hex, + infer_hash_size, + path_wire_len, + split_path_hex, +) + + +class TestDecodePathByte: + """Test decoding the packed [hash_mode:2][hop_count:6] byte.""" + + def test_mode0_single_hop(self): + """Mode 0 (1-byte hops), 1 hop → path_byte = 0x01.""" + hop_count, hash_size = decode_path_byte(0x01) + assert hop_count == 1 + assert hash_size == 1 + + def test_mode0_three_hops(self): + """Mode 0, 3 hops → path_byte = 0x03.""" + hop_count, hash_size = decode_path_byte(0x03) + assert hop_count == 3 + assert hash_size == 1 + + def test_mode0_zero_hops(self): + """Mode 0, 0 hops (direct) → path_byte = 0x00.""" + hop_count, hash_size = decode_path_byte(0x00) + assert hop_count == 0 + assert hash_size == 1 + + def test_mode1_two_byte_hops(self): + """Mode 1 (2-byte hops), 2 hops → path_byte = 0x42.""" + hop_count, hash_size = decode_path_byte(0x42) + assert hop_count == 2 + assert hash_size == 2 + + def test_mode1_single_hop(self): + """Mode 1 (2-byte hops), 1 hop → path_byte = 0x41.""" + hop_count, hash_size = decode_path_byte(0x41) + assert hop_count == 1 + assert hash_size == 2 + + def test_mode2_three_byte_hops(self): + """Mode 2 (3-byte hops), 1 hop → path_byte = 0x81.""" + hop_count, hash_size = decode_path_byte(0x81) + assert hop_count == 1 + assert hash_size == 3 + + def test_mode2_max_hops(self): + """Mode 2, 63 hops (maximum) → path_byte = 0xBF.""" + hop_count, hash_size = decode_path_byte(0xBF) + assert hop_count == 63 + assert hash_size == 3 + + def test_mode3_reserved_raises(self): + """Mode 3 is reserved and should raise ValueError.""" + with pytest.raises(ValueError, match="Reserved path hash mode 3"): + decode_path_byte(0xC0) + + def test_mode3_with_hops_raises(self): + """Mode 3 with hop count should also raise.""" + with pytest.raises(ValueError, match="Reserved"): + decode_path_byte(0xC5) + + def test_backward_compat_old_firmware(self): + """Old firmware packets have upper bits = 0, so mode=0 and path_byte = hop count.""" + for n in range(0, 64): + hop_count, hash_size = decode_path_byte(n) + assert hop_count == n + assert hash_size == 1 + + +class TestPathWireLen: + def test_basic(self): + assert path_wire_len(3, 1) == 3 + assert path_wire_len(2, 2) == 4 + assert path_wire_len(1, 3) == 3 + assert path_wire_len(0, 1) == 0 + + +class TestSplitPathHex: + def test_one_byte_hops(self): + assert split_path_hex("1a2b3c", 3) == ["1a", "2b", "3c"] + + def test_two_byte_hops(self): + assert split_path_hex("1a2b3c4d", 2) == ["1a2b", "3c4d"] + + def test_three_byte_hops(self): + assert split_path_hex("1a2b3c4d5e6f", 2) == ["1a2b3c", "4d5e6f"] + + def test_empty_path(self): + assert split_path_hex("", 0) == [] + assert split_path_hex("", 3) == [] + + def test_zero_hop_count(self): + assert split_path_hex("1a2b", 0) == [] + + def test_inconsistent_length_falls_back(self): + """If hex length doesn't divide evenly by hop_count, fall back to 2-char chunks.""" + assert split_path_hex("1a2b3c", 2) == ["1a", "2b", "3c"] + + def test_single_hop_one_byte(self): + assert split_path_hex("ab", 1) == ["ab"] + + def test_single_hop_two_bytes(self): + assert split_path_hex("abcd", 1) == ["abcd"] + + +class TestFirstHopHex: + def test_one_byte_hops(self): + assert first_hop_hex("1a2b3c", 3) == "1a" + + def test_two_byte_hops(self): + assert first_hop_hex("1a2b3c4d", 2) == "1a2b" + + def test_empty(self): + assert first_hop_hex("", 0) is None + assert first_hop_hex("", 1) is None + + def test_direct_path(self): + assert first_hop_hex("", 0) is None + + +class TestInferHashSize: + def test_one_byte(self): + assert infer_hash_size("1a2b3c", 3) == 1 + + def test_two_byte(self): + assert infer_hash_size("1a2b3c4d", 2) == 2 + + def test_three_byte(self): + assert infer_hash_size("1a2b3c4d5e6f", 2) == 3 + + def test_empty_defaults_to_1(self): + assert infer_hash_size("", 0) == 1 + + def test_inconsistent_defaults_to_1(self): + assert infer_hash_size("1a2b3", 2) == 1 + + def test_zero_hop_count_defaults_to_1(self): + assert infer_hash_size("1a2b", 0) == 1