mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
Phase 0.5 & 1: Centralize path utils, multi-hop packet decoding, updated PacketInfo shape
This commit is contained in:
@@ -19,6 +19,7 @@ from app.decoder import (
|
||||
decrypt_group_text,
|
||||
derive_public_key,
|
||||
derive_shared_secret,
|
||||
extract_payload,
|
||||
parse_packet,
|
||||
try_decrypt_dm,
|
||||
try_decrypt_packet_with_channel_key,
|
||||
@@ -110,6 +111,110 @@ class TestPacketParsing:
|
||||
assert parse_packet(header) is None
|
||||
|
||||
|
||||
class TestMultiBytePathParsing:
|
||||
"""Test packet parsing with multi-byte hop path encoding."""
|
||||
|
||||
def test_parse_two_byte_hops(self):
|
||||
"""Parse packet with 2-byte hops: path_byte=0x42 → 2 hops × 2 bytes."""
|
||||
# Header: FLOOD GROUP_TEXT = 0x15
|
||||
# Path byte: 0x42 = mode 1 (2-byte), 2 hops → 4 bytes of path
|
||||
path_data = bytes([0xAA, 0xBB, 0xCC, 0xDD])
|
||||
packet = bytes([0x15, 0x42]) + path_data + b"payload"
|
||||
|
||||
result = parse_packet(packet)
|
||||
|
||||
assert result is not None
|
||||
assert result.path_length == 2 # hop count
|
||||
assert result.path_hash_size == 2
|
||||
assert result.path == path_data
|
||||
assert result.payload == b"payload"
|
||||
|
||||
def test_parse_three_byte_hops(self):
|
||||
"""Parse packet with 3-byte hops: path_byte=0x81 → 1 hop × 3 bytes."""
|
||||
path_data = bytes([0x11, 0x22, 0x33])
|
||||
packet = bytes([0x15, 0x81]) + path_data + b"pay"
|
||||
|
||||
result = parse_packet(packet)
|
||||
|
||||
assert result is not None
|
||||
assert result.path_length == 1
|
||||
assert result.path_hash_size == 3
|
||||
assert result.path == path_data
|
||||
assert result.payload == b"pay"
|
||||
|
||||
def test_parse_reserved_mode3_returns_none(self):
|
||||
"""Reserved mode 3 (upper bits = 0b11) should be rejected."""
|
||||
# path_byte = 0xC1 → mode 3, 1 hop
|
||||
packet = bytes([0x15, 0xC1]) + bytes(10)
|
||||
|
||||
result = parse_packet(packet)
|
||||
assert result is None
|
||||
|
||||
def test_parse_two_byte_hops_truncated_returns_none(self):
|
||||
"""Truncated path data for multi-byte hops returns None."""
|
||||
# path_byte = 0x42 → 2 hops × 2 bytes = 4 bytes needed, only 2 provided
|
||||
packet = bytes([0x15, 0x42, 0xAA, 0xBB])
|
||||
|
||||
result = parse_packet(packet)
|
||||
assert result is None
|
||||
|
||||
def test_backward_compat_one_byte_hops(self):
|
||||
"""Old-style 1-byte hop packets still parse correctly with hash_size=1."""
|
||||
packet = bytes([0x0A, 0x03, 0x01, 0x02, 0x03]) + b"msg"
|
||||
|
||||
result = parse_packet(packet)
|
||||
|
||||
assert result is not None
|
||||
assert result.path_length == 3
|
||||
assert result.path_hash_size == 1
|
||||
assert result.path == bytes([0x01, 0x02, 0x03])
|
||||
|
||||
def test_extract_payload_two_byte_hops(self):
|
||||
"""extract_payload correctly skips multi-byte path data."""
|
||||
path_data = bytes([0xAA, 0xBB, 0xCC, 0xDD])
|
||||
packet = bytes([0x15, 0x42]) + path_data + b"the_payload"
|
||||
|
||||
result = extract_payload(packet)
|
||||
assert result == b"the_payload"
|
||||
|
||||
def test_extract_payload_reserved_mode_returns_none(self):
|
||||
"""extract_payload rejects reserved mode 3."""
|
||||
packet = bytes([0x15, 0xC1]) + bytes(10)
|
||||
|
||||
result = extract_payload(packet)
|
||||
assert result is None
|
||||
|
||||
def test_parse_direct_two_byte_hops_with_transport(self):
|
||||
"""TRANSPORT_DIRECT with 2-byte hops parses correctly."""
|
||||
# Header: TRANSPORT_DIRECT = 0x03, GROUP_TEXT = 5 → (5<<2)|3 = 0x17
|
||||
transport_code = bytes([0x11, 0x22, 0x33, 0x44])
|
||||
# path_byte = 0x41 → mode 1, 1 hop → 2 bytes of path
|
||||
path_data = bytes([0xAB, 0xCD])
|
||||
packet = bytes([0x17]) + transport_code + bytes([0x41]) + path_data + b"data"
|
||||
|
||||
result = parse_packet(packet)
|
||||
|
||||
assert result is not None
|
||||
assert result.route_type == RouteType.TRANSPORT_DIRECT
|
||||
assert result.path_length == 1
|
||||
assert result.path_hash_size == 2
|
||||
assert result.path == path_data
|
||||
assert result.payload == b"data"
|
||||
|
||||
def test_zero_hops_mode1(self):
|
||||
"""Zero hops with mode 1 (2-byte) → direct path, no path bytes."""
|
||||
# path_byte = 0x40 → mode 1, 0 hops
|
||||
packet = bytes([0x15, 0x40]) + b"payload"
|
||||
|
||||
result = parse_packet(packet)
|
||||
|
||||
assert result is not None
|
||||
assert result.path_length == 0
|
||||
assert result.path_hash_size == 2
|
||||
assert result.path == b""
|
||||
assert result.payload == b"payload"
|
||||
|
||||
|
||||
class TestGroupTextDecryption:
|
||||
"""Test GROUP_TEXT (channel message) decryption."""
|
||||
|
||||
|
||||
145
tests/test_path_utils.py
Normal file
145
tests/test_path_utils.py
Normal file
@@ -0,0 +1,145 @@
|
||||
"""Tests for the centralized path encoding/decoding helpers."""
|
||||
|
||||
import pytest
|
||||
|
||||
from app.path_utils import (
|
||||
decode_path_byte,
|
||||
first_hop_hex,
|
||||
infer_hash_size,
|
||||
path_wire_len,
|
||||
split_path_hex,
|
||||
)
|
||||
|
||||
|
||||
class TestDecodePathByte:
|
||||
"""Test decoding the packed [hash_mode:2][hop_count:6] byte."""
|
||||
|
||||
def test_mode0_single_hop(self):
|
||||
"""Mode 0 (1-byte hops), 1 hop → path_byte = 0x01."""
|
||||
hop_count, hash_size = decode_path_byte(0x01)
|
||||
assert hop_count == 1
|
||||
assert hash_size == 1
|
||||
|
||||
def test_mode0_three_hops(self):
|
||||
"""Mode 0, 3 hops → path_byte = 0x03."""
|
||||
hop_count, hash_size = decode_path_byte(0x03)
|
||||
assert hop_count == 3
|
||||
assert hash_size == 1
|
||||
|
||||
def test_mode0_zero_hops(self):
|
||||
"""Mode 0, 0 hops (direct) → path_byte = 0x00."""
|
||||
hop_count, hash_size = decode_path_byte(0x00)
|
||||
assert hop_count == 0
|
||||
assert hash_size == 1
|
||||
|
||||
def test_mode1_two_byte_hops(self):
|
||||
"""Mode 1 (2-byte hops), 2 hops → path_byte = 0x42."""
|
||||
hop_count, hash_size = decode_path_byte(0x42)
|
||||
assert hop_count == 2
|
||||
assert hash_size == 2
|
||||
|
||||
def test_mode1_single_hop(self):
|
||||
"""Mode 1 (2-byte hops), 1 hop → path_byte = 0x41."""
|
||||
hop_count, hash_size = decode_path_byte(0x41)
|
||||
assert hop_count == 1
|
||||
assert hash_size == 2
|
||||
|
||||
def test_mode2_three_byte_hops(self):
|
||||
"""Mode 2 (3-byte hops), 1 hop → path_byte = 0x81."""
|
||||
hop_count, hash_size = decode_path_byte(0x81)
|
||||
assert hop_count == 1
|
||||
assert hash_size == 3
|
||||
|
||||
def test_mode2_max_hops(self):
|
||||
"""Mode 2, 63 hops (maximum) → path_byte = 0xBF."""
|
||||
hop_count, hash_size = decode_path_byte(0xBF)
|
||||
assert hop_count == 63
|
||||
assert hash_size == 3
|
||||
|
||||
def test_mode3_reserved_raises(self):
|
||||
"""Mode 3 is reserved and should raise ValueError."""
|
||||
with pytest.raises(ValueError, match="Reserved path hash mode 3"):
|
||||
decode_path_byte(0xC0)
|
||||
|
||||
def test_mode3_with_hops_raises(self):
|
||||
"""Mode 3 with hop count should also raise."""
|
||||
with pytest.raises(ValueError, match="Reserved"):
|
||||
decode_path_byte(0xC5)
|
||||
|
||||
def test_backward_compat_old_firmware(self):
|
||||
"""Old firmware packets have upper bits = 0, so mode=0 and path_byte = hop count."""
|
||||
for n in range(0, 64):
|
||||
hop_count, hash_size = decode_path_byte(n)
|
||||
assert hop_count == n
|
||||
assert hash_size == 1
|
||||
|
||||
|
||||
class TestPathWireLen:
|
||||
def test_basic(self):
|
||||
assert path_wire_len(3, 1) == 3
|
||||
assert path_wire_len(2, 2) == 4
|
||||
assert path_wire_len(1, 3) == 3
|
||||
assert path_wire_len(0, 1) == 0
|
||||
|
||||
|
||||
class TestSplitPathHex:
|
||||
def test_one_byte_hops(self):
|
||||
assert split_path_hex("1a2b3c", 3) == ["1a", "2b", "3c"]
|
||||
|
||||
def test_two_byte_hops(self):
|
||||
assert split_path_hex("1a2b3c4d", 2) == ["1a2b", "3c4d"]
|
||||
|
||||
def test_three_byte_hops(self):
|
||||
assert split_path_hex("1a2b3c4d5e6f", 2) == ["1a2b3c", "4d5e6f"]
|
||||
|
||||
def test_empty_path(self):
|
||||
assert split_path_hex("", 0) == []
|
||||
assert split_path_hex("", 3) == []
|
||||
|
||||
def test_zero_hop_count(self):
|
||||
assert split_path_hex("1a2b", 0) == []
|
||||
|
||||
def test_inconsistent_length_falls_back(self):
|
||||
"""If hex length doesn't divide evenly by hop_count, fall back to 2-char chunks."""
|
||||
assert split_path_hex("1a2b3c", 2) == ["1a", "2b", "3c"]
|
||||
|
||||
def test_single_hop_one_byte(self):
|
||||
assert split_path_hex("ab", 1) == ["ab"]
|
||||
|
||||
def test_single_hop_two_bytes(self):
|
||||
assert split_path_hex("abcd", 1) == ["abcd"]
|
||||
|
||||
|
||||
class TestFirstHopHex:
|
||||
def test_one_byte_hops(self):
|
||||
assert first_hop_hex("1a2b3c", 3) == "1a"
|
||||
|
||||
def test_two_byte_hops(self):
|
||||
assert first_hop_hex("1a2b3c4d", 2) == "1a2b"
|
||||
|
||||
def test_empty(self):
|
||||
assert first_hop_hex("", 0) is None
|
||||
assert first_hop_hex("", 1) is None
|
||||
|
||||
def test_direct_path(self):
|
||||
assert first_hop_hex("", 0) is None
|
||||
|
||||
|
||||
class TestInferHashSize:
|
||||
def test_one_byte(self):
|
||||
assert infer_hash_size("1a2b3c", 3) == 1
|
||||
|
||||
def test_two_byte(self):
|
||||
assert infer_hash_size("1a2b3c4d", 2) == 2
|
||||
|
||||
def test_three_byte(self):
|
||||
assert infer_hash_size("1a2b3c4d5e6f", 2) == 3
|
||||
|
||||
def test_empty_defaults_to_1(self):
|
||||
assert infer_hash_size("", 0) == 1
|
||||
|
||||
def test_inconsistent_defaults_to_1(self):
|
||||
assert infer_hash_size("1a2b3", 2) == 1
|
||||
|
||||
def test_zero_hop_count_defaults_to_1(self):
|
||||
assert infer_hash_size("1a2b", 0) == 1
|
||||
Reference in New Issue
Block a user