Use more faithful packet frame parsing

This commit is contained in:
Jack Kingsman
2026-03-07 22:35:53 -08:00
parent 48dab293ae
commit 34318e4814
11 changed files with 310 additions and 245 deletions
+14 -71
View File
@@ -107,87 +107,30 @@ def extract_payload(raw_packet: bytes) -> bytes | None:
Returns the payload bytes, or None if packet is malformed.
"""
from app.path_utils import decode_path_byte, path_wire_len
from app.path_utils import parse_packet_envelope
if len(raw_packet) < 2:
return None
try:
header = raw_packet[0]
route_type = header & 0x03
offset = 1
# Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3)
if route_type in (0x00, 0x03):
if len(raw_packet) < offset + 4:
return None
offset += 4
# Decode packed path byte
if len(raw_packet) < offset + 1:
return None
hop_count, hash_size = decode_path_byte(raw_packet[offset])
offset += 1
# Skip path data
path_bytes = path_wire_len(hop_count, hash_size)
if len(raw_packet) < offset + path_bytes:
return None
offset += path_bytes
# Rest is payload
return raw_packet[offset:]
except (ValueError, IndexError):
return None
envelope = parse_packet_envelope(raw_packet)
return envelope.payload if envelope is not None else None
def parse_packet(raw_packet: bytes) -> PacketInfo | None:
"""Parse a raw packet and extract basic info."""
from app.path_utils import decode_path_byte, path_wire_len
from app.path_utils import parse_packet_envelope
if len(raw_packet) < 2:
envelope = parse_packet_envelope(raw_packet)
if envelope is None:
return None
try:
header = raw_packet[0]
route_type = RouteType(header & 0x03)
payload_type = PayloadType((header >> 2) & 0x0F)
payload_version = (header >> 6) & 0x03
offset = 1
# Skip transport codes if present
if route_type in (RouteType.TRANSPORT_FLOOD, RouteType.TRANSPORT_DIRECT):
if len(raw_packet) < offset + 4:
return None
offset += 4
# Decode packed path byte
if len(raw_packet) < offset + 1:
return None
hop_count, hash_size = decode_path_byte(raw_packet[offset])
offset += 1
# Extract path data
path_byte_len = path_wire_len(hop_count, hash_size)
if len(raw_packet) < offset + path_byte_len:
return None
path = raw_packet[offset : offset + path_byte_len]
offset += path_byte_len
# Rest is payload
payload = raw_packet[offset:]
return PacketInfo(
route_type=route_type,
payload_type=payload_type,
payload_version=payload_version,
path_length=hop_count,
path_hash_size=hash_size,
path=path,
payload=payload,
route_type=RouteType(envelope.route_type),
payload_type=PayloadType(envelope.payload_type),
payload_version=envelope.payload_version,
path_length=envelope.hop_count,
path_hash_size=envelope.hash_size,
path=envelope.path,
payload=envelope.payload,
)
except (ValueError, IndexError):
except ValueError:
return None
+13 -64
View File
@@ -24,7 +24,7 @@ import aiomqtt
import nacl.bindings
from app.fanout.mqtt_base import BaseMqttPublisher
from app.path_utils import split_path_hex
from app.path_utils import parse_packet_envelope, split_path_hex
logger = logging.getLogger(__name__)
@@ -143,41 +143,17 @@ def _calculate_packet_hash(raw_bytes: bytes) -> str:
return "0" * 16
try:
header = raw_bytes[0]
payload_type = (header >> 2) & 0x0F
route_type = header & 0x03
# Transport codes present for TRANSPORT_FLOOD (0) and TRANSPORT_DIRECT (3)
has_transport = route_type in (0x00, 0x03)
offset = 1 # Past header
if has_transport:
offset += 4 # Skip 4 bytes of transport codes
# Read path byte (packed as [hash_mode:2][hop_count:6]).
# Invalid/truncated packets map to zero hash.
if offset >= len(raw_bytes):
envelope = parse_packet_envelope(raw_bytes)
if envelope is None:
return "0" * 16
path_byte = raw_bytes[offset]
offset += 1
hash_mode = (path_byte >> 6) & 0x03
hop_count = path_byte & 0x3F
hash_size = (hash_mode + 1) if hash_mode < 3 else 1
path_wire_len = hop_count * hash_size
# Skip past path to get to payload. Invalid/truncated packets map to zero hash.
if len(raw_bytes) < offset + path_wire_len:
return "0" * 16
payload_start = offset + path_wire_len
payload_data = raw_bytes[payload_start:]
# Hash: payload_type(1 byte) [+ path_byte as uint16_t LE for TRACE] + payload_data
# IMPORTANT: TRACE hash uses the raw wire byte (not decoded hop count) to match firmware.
hash_obj = hashlib.sha256()
hash_obj.update(bytes([payload_type]))
if payload_type == 9: # PAYLOAD_TYPE_TRACE
hash_obj.update(path_byte.to_bytes(2, byteorder="little"))
hash_obj.update(payload_data)
hash_obj.update(bytes([envelope.payload_type]))
if envelope.payload_type == 9: # PAYLOAD_TYPE_TRACE
hash_obj.update(envelope.path_byte.to_bytes(2, byteorder="little"))
hash_obj.update(envelope.payload)
return hash_obj.hexdigest()[:16].upper()
except Exception:
@@ -198,42 +174,15 @@ def _decode_packet_fields(raw_bytes: bytes) -> tuple[str, str, str, list[str], i
payload_type: int | None = None
try:
if len(raw_bytes) < 2:
envelope = parse_packet_envelope(raw_bytes)
if envelope is None or envelope.payload_version != 0:
return route, packet_type, payload_len, path_values, payload_type
header = raw_bytes[0]
payload_version = (header >> 6) & 0x03
if payload_version != 0:
return route, packet_type, payload_len, path_values, payload_type
route_type = header & 0x03
has_transport = route_type in (0x00, 0x03)
offset = 1
if has_transport:
offset += 4
if len(raw_bytes) <= offset:
return route, packet_type, payload_len, path_values, payload_type
path_byte = raw_bytes[offset]
offset += 1
hash_mode = (path_byte >> 6) & 0x03
hop_count = path_byte & 0x3F
hash_size = (hash_mode + 1) if hash_mode < 3 else 1
path_wire_len = hop_count * hash_size
if len(raw_bytes) < offset + path_wire_len:
return route, packet_type, payload_len, path_values, payload_type
path_bytes = raw_bytes[offset : offset + path_wire_len]
offset += path_wire_len
payload_type = (header >> 2) & 0x0F
route = _ROUTE_MAP.get(route_type, "U")
payload_type = envelope.payload_type
route = _ROUTE_MAP.get(envelope.route_type, "U")
packet_type = str(payload_type)
payload_len = str(max(0, len(raw_bytes) - offset))
path_values = split_path_hex(path_bytes.hex(), hop_count)
payload_len = str(len(envelope.payload))
path_values = split_path_hex(envelope.path.hex(), envelope.hop_count)
return route, packet_type, payload_len, path_values, payload_type
except Exception:
+8 -65
View File
@@ -452,43 +452,14 @@ async def _migrate_004_add_payload_hash_column(conn: aiosqlite.Connection) -> No
def _extract_payload_for_hash(raw_packet: bytes) -> bytes | None:
"""
Extract payload from a raw packet for hashing (migration-local copy of decoder logic).
Extract payload from a raw packet for hashing using canonical framing validation.
Returns the payload bytes, or None if packet is malformed.
"""
if len(raw_packet) < 2:
return None
from app.path_utils import parse_packet_envelope
try:
header = raw_packet[0]
route_type = header & 0x03
offset = 1
# Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3)
if route_type in (0x00, 0x03):
if len(raw_packet) < offset + 4:
return None
offset += 4
# Get path byte (packed as [hash_mode:2][hop_count:6])
if len(raw_packet) < offset + 1:
return None
path_byte = raw_packet[offset]
offset += 1
hash_mode = (path_byte >> 6) & 0x03
hop_count = path_byte & 0x3F
hash_size = (hash_mode + 1) if hash_mode < 3 else 1
path_wire_len = hop_count * hash_size
# Skip path bytes
if len(raw_packet) < offset + path_wire_len:
return None
offset += path_wire_len
# Rest is payload (may be empty, matching decoder.py behavior)
return raw_packet[offset:]
except (IndexError, ValueError):
return None
envelope = parse_packet_envelope(raw_packet)
return envelope.payload if envelope is not None else None
async def _migrate_005_backfill_payload_hashes(conn: aiosqlite.Connection) -> None:
@@ -638,42 +609,14 @@ async def _migrate_006_replace_path_len_with_path(conn: aiosqlite.Connection) ->
def _extract_path_from_packet(raw_packet: bytes) -> str | None:
"""
Extract path hex string from a raw packet (migration-local copy of decoder logic).
Extract path hex string from a raw packet using canonical framing validation.
Returns the path as a hex string, or None if packet is malformed.
"""
if len(raw_packet) < 2:
return None
from app.path_utils import parse_packet_envelope
try:
header = raw_packet[0]
route_type = header & 0x03
offset = 1
# Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3)
if route_type in (0x00, 0x03):
if len(raw_packet) < offset + 4:
return None
offset += 4
# Get path byte (packed as [hash_mode:2][hop_count:6])
if len(raw_packet) < offset + 1:
return None
path_byte = raw_packet[offset]
offset += 1
hash_mode = (path_byte >> 6) & 0x03
hop_count = path_byte & 0x3F
hash_size = (hash_mode + 1) if hash_mode < 3 else 1
path_wire_len = hop_count * hash_size
# Extract path bytes
if len(raw_packet) < offset + path_wire_len:
return None
path_bytes = raw_packet[offset : offset + path_wire_len]
return path_bytes.hex()
except (IndexError, ValueError):
return None
envelope = parse_packet_envelope(raw_packet)
return envelope.path.hex() if envelope is not None else None
async def _migrate_007_backfill_message_paths(conn: aiosqlite.Connection) -> None:
+94
View File
@@ -9,6 +9,27 @@ The path_len wire byte is packed as [hash_mode:2][hop_count:6]:
Mode 3 (hash_size=4) is reserved and rejected.
"""
from dataclasses import dataclass
MAX_PATH_SIZE = 64
@dataclass(frozen=True)
class ParsedPacketEnvelope:
"""Canonical packet framing parse matching MeshCore Packet::readFrom()."""
header: int
route_type: int
payload_type: int
payload_version: int
path_byte: int
hop_count: int
hash_size: int
path_byte_len: int
path: bytes
payload: bytes
payload_offset: int
def decode_path_byte(path_byte: int) -> tuple[int, int]:
"""Decode a packed path byte into (hop_count, hash_size).
@@ -32,6 +53,79 @@ def path_wire_len(hop_count: int, hash_size: int) -> int:
return hop_count * hash_size
def validate_path_byte(path_byte: int) -> tuple[int, int, int]:
"""Validate a packed path byte using firmware-equivalent rules.
Returns:
(hop_count, hash_size, byte_len)
Raises:
ValueError: If the encoding uses reserved mode 3 or exceeds MAX_PATH_SIZE.
"""
hop_count, hash_size = decode_path_byte(path_byte)
byte_len = path_wire_len(hop_count, hash_size)
if byte_len > MAX_PATH_SIZE:
raise ValueError(
f"Invalid path length {byte_len} bytes exceeds MAX_PATH_SIZE={MAX_PATH_SIZE}"
)
return hop_count, hash_size, byte_len
def parse_packet_envelope(raw_packet: bytes) -> ParsedPacketEnvelope | None:
"""Parse packet framing using firmware Packet::readFrom() semantics.
Validation matches the firmware's path checks:
- reserved mode 3 is invalid
- hop_count * hash_size must not exceed MAX_PATH_SIZE
- at least one payload byte must remain after the path
"""
if len(raw_packet) < 2:
return None
try:
header = raw_packet[0]
route_type = header & 0x03
payload_type = (header >> 2) & 0x0F
payload_version = (header >> 6) & 0x03
offset = 1
if route_type in (0x00, 0x03):
if len(raw_packet) < offset + 4:
return None
offset += 4
if len(raw_packet) < offset + 1:
return None
path_byte = raw_packet[offset]
offset += 1
hop_count, hash_size, path_byte_len = validate_path_byte(path_byte)
if len(raw_packet) < offset + path_byte_len:
return None
path = raw_packet[offset : offset + path_byte_len]
offset += path_byte_len
if offset >= len(raw_packet):
return None
return ParsedPacketEnvelope(
header=header,
route_type=route_type,
payload_type=payload_type,
payload_version=payload_version,
path_byte=path_byte,
hop_count=hop_count,
hash_size=hash_size,
path_byte_len=path_byte_len,
path=path,
payload=raw_packet[offset:],
payload_offset=offset,
)
except (IndexError, ValueError):
return None
def split_path_hex(path_hex: str, hop_count: int) -> list[str]:
"""Split a hex path string into per-hop chunks using the known hop count.
+2 -39
View File
@@ -5,44 +5,7 @@ import type { RawPacket, Channel } from '../types';
import { api } from '../api';
import { toast } from './ui/sonner';
import { cn } from '@/lib/utils';
/**
* Extract the payload from a raw packet hex string, skipping header and path.
* Returns the payload as a hex string, or null if malformed.
*/
function extractPayload(packetHex: string): string | null {
if (packetHex.length < 4) return null; // Need at least 2 bytes
try {
const header = parseInt(packetHex.slice(0, 2), 16);
const routeType = header & 0x03;
let offset = 2; // 1 byte = 2 hex chars
// Skip transport codes if present (TRANSPORT_FLOOD=0, TRANSPORT_DIRECT=3)
if (routeType === 0x00 || routeType === 0x03) {
if (packetHex.length < offset + 8) return null; // Need 4 more bytes
offset += 8; // 4 bytes = 8 hex chars
}
// Get path byte (packed as [hash_mode:2][hop_count:6])
if (packetHex.length < offset + 2) return null;
const pathByte = parseInt(packetHex.slice(offset, offset + 2), 16);
offset += 2;
const hashMode = (pathByte >> 6) & 0x03;
const hopCount = pathByte & 0x3f;
const hashSize = hashMode < 3 ? hashMode + 1 : 1;
const pathHexChars = hopCount * hashSize * 2;
// Skip path data
if (packetHex.length < offset + pathHexChars) return null;
offset += pathHexChars;
// Rest is payload
return packetHex.slice(offset);
} catch {
return null;
}
}
import { extractPacketPayloadHex } from '../utils/pathUtils';
interface CrackedRoom {
roomName: string;
@@ -180,7 +143,7 @@ export function CrackerPanel({
for (const packet of undecryptedGroupText) {
if (!newQueue.has(packet.id)) {
// Extract payload and check for duplicates
const payload = extractPayload(packet.data);
const payload = extractPacketPayloadHex(packet.data);
if (payload && seenPayloadsRef.current.has(payload)) {
// Skip - we already have a packet with this payload queued
newSkipped++;
+23
View File
@@ -1,6 +1,7 @@
import { describe, it, expect } from 'vitest';
import {
parsePathHops,
extractPacketPayloadHex,
findContactsByPrefix,
calculateDistance,
resolvePath,
@@ -107,6 +108,28 @@ describe('parsePathHops', () => {
});
});
describe('extractPacketPayloadHex', () => {
it('extracts payload from legacy 1-byte-hop packet', () => {
expect(extractPacketPayloadHex('0902AABB48656C6C6F')).toBe('48656C6C6F');
});
it('extracts payload from 2-byte-hop packet', () => {
expect(extractPacketPayloadHex('0942AABBCCDD48656C6C6F')).toBe('48656C6C6F');
});
it('rejects reserved mode 3', () => {
expect(extractPacketPayloadHex('09C1AABBCCDDEEFF')).toBeNull();
});
it('rejects oversized path encoding', () => {
expect(extractPacketPayloadHex(`09BF${'AA'.repeat(189)}4869`)).toBeNull();
});
it('rejects packets with no payload after path', () => {
expect(extractPacketPayloadHex('0902AABB')).toBeNull();
});
});
describe('findContactsByPrefix', () => {
const contacts: Contact[] = [
createContact({
+57
View File
@@ -1,6 +1,8 @@
import type { Contact, RadioConfig, MessagePath } from '../types';
import { CONTACT_TYPE_REPEATER } from '../types';
const MAX_PATH_BYTES = 64;
export interface PathHop {
prefix: string; // Hex hop identifier (e.g., "1A" for 1-byte, "1A2B" for 2-byte)
matches: Contact[]; // Matched repeaters (empty=unknown, multiple=ambiguous)
@@ -64,6 +66,61 @@ export function parsePathHops(path: string | null | undefined, hopCount?: number
return hops;
}
/**
* Extract the payload portion from a raw packet hex string using firmware-equivalent
* path-byte validation. Returns null for malformed or payload-less packets.
*/
export function extractPacketPayloadHex(packetHex: string): string | null {
if (packetHex.length < 4) {
return null;
}
try {
const normalized = packetHex.toUpperCase();
const header = parseInt(normalized.slice(0, 2), 16);
const routeType = header & 0x03;
let offset = 2;
if (routeType === 0x00 || routeType === 0x03) {
if (normalized.length < offset + 8) {
return null;
}
offset += 8;
}
if (normalized.length < offset + 2) {
return null;
}
const pathByte = parseInt(normalized.slice(offset, offset + 2), 16);
offset += 2;
const hashMode = (pathByte >> 6) & 0x03;
if (hashMode === 0x03) {
return null;
}
const hopCount = pathByte & 0x3f;
const hashSize = hashMode + 1;
const pathByteLen = hopCount * hashSize;
if (pathByteLen > MAX_PATH_BYTES) {
return null;
}
const pathHexChars = pathByteLen * 2;
if (normalized.length < offset + pathHexChars) {
return null;
}
offset += pathHexChars;
if (offset >= normalized.length) {
return null;
}
return normalized.slice(offset);
} catch {
return null;
}
}
/**
* Find contacts matching first 2 chars of public key (repeaters only for intermediate hops)
*/
+33 -6
View File
@@ -226,7 +226,7 @@ class TestPacketFormatConversion:
def test_packet_type_extraction(self):
# Header 0x14 = type 5, route 0 (TRANSPORT_FLOOD): header + 4 transport + path_len.
data = {"timestamp": 0, "data": "140102030400", "snr": None, "rssi": None}
data = {"timestamp": 0, "data": "140102030400AA", "snr": None, "rssi": None}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["packet_type"] == "5"
assert result["route"] == "F"
@@ -235,10 +235,10 @@ class TestPacketFormatConversion:
# Test all 4 route types (matches meshcore-packet-capture)
# TRANSPORT_FLOOD=0 -> "F", FLOOD=1 -> "F", DIRECT=2 -> "D", TRANSPORT_DIRECT=3 -> "T"
samples = [
("000102030400", "F"), # TRANSPORT_FLOOD: header + transport + path_len
("0100", "F"), # FLOOD: header + path_len
("0200", "D"), # DIRECT: header + path_len
("030102030400", "T"), # TRANSPORT_DIRECT: header + transport + path_len
("000102030400AA", "F"), # TRANSPORT_FLOOD: header + transport + path_len + payload
("0100AA", "F"), # FLOOD: header + path_len + payload
("0200AA", "D"), # DIRECT: header + path_len + payload
("030102030400AA", "T"), # TRANSPORT_DIRECT: header + transport + path_len + payload
]
for raw_hex, expected in samples:
data = {"timestamp": 0, "data": raw_hex, "snr": None, "rssi": None}
@@ -274,7 +274,7 @@ class TestPacketFormatConversion:
assert result["path"] == "aa,bb"
def test_direct_route_includes_empty_path_field(self):
data = {"timestamp": 0, "data": "0200", "snr": 1.0, "rssi": -70}
data = {"timestamp": 0, "data": "0200AA", "snr": 1.0, "rssi": -70}
result = _format_raw_packet(data, "Node", "AA" * 32)
assert result["route"] == "D"
assert "path" in result
@@ -432,6 +432,18 @@ class TestCalculatePacketHash:
raw = bytes([0x09, 0x42, 0xAA, 0xBB])
assert _calculate_packet_hash(raw) == "0" * 16
def test_reserved_mode_returns_zeroes(self):
raw = bytes([0x09, 0xC1, 0xAA, 0xBB, 0xCC])
assert _calculate_packet_hash(raw) == "0" * 16
def test_oversize_path_len_returns_zeroes(self):
raw = bytes([0x09, 0xBF]) + bytes(189) + b"payload"
assert _calculate_packet_hash(raw) == "0" * 16
def test_no_payload_returns_zeroes(self):
raw = bytes([0x09, 0x02, 0xAA, 0xBB])
assert _calculate_packet_hash(raw) == "0" * 16
def test_multibyte_transport_flood_with_2byte_hops(self):
"""TRANSPORT_FLOOD with 2-byte hops correctly skips transport codes + path."""
import hashlib
@@ -527,6 +539,21 @@ class TestDecodePacketFieldsMultibyte:
assert path_values == []
assert plen == "0"
def test_reserved_mode_returns_defaults(self):
raw = bytes([0x09, 0xC1, 0xAA, 0xBB, 0xCC])
route, ptype, plen, path_values, payload_type = _decode_packet_fields(raw)
assert (route, ptype, plen, path_values, payload_type) == ("U", "0", "0", [], None)
def test_oversize_path_len_returns_defaults(self):
raw = bytes([0x09, 0xBF]) + bytes(189) + b"payload"
route, ptype, plen, path_values, payload_type = _decode_packet_fields(raw)
assert (route, ptype, plen, path_values, payload_type) == ("U", "0", "0", [], None)
def test_no_payload_returns_defaults(self):
raw = bytes([0x09, 0x02, 0xAA, 0xBB])
route, ptype, plen, path_values, payload_type = _decode_packet_fields(raw)
assert (route, ptype, plen, path_values, payload_type) == ("U", "0", "0", [], None)
class TestCommunityMqttPublisher:
def test_initial_state(self):
+15
View File
@@ -110,6 +110,11 @@ class TestPacketParsing:
assert parse_packet(header) is None
def test_parse_packet_with_no_payload_returns_none(self):
"""Firmware rejects packets that end exactly after the path."""
packet = bytes([0x15, 0x02, 0xAA, 0xBB])
assert parse_packet(packet) is None
class TestMultiBytePathParsing:
"""Test packet parsing with multi-byte hop path encoding."""
@@ -150,6 +155,11 @@ class TestMultiBytePathParsing:
result = parse_packet(packet)
assert result is None
def test_parse_oversize_path_len_returns_none(self):
"""Oversized-but-well-formed path bytes are invalid per firmware."""
packet = bytes([0x15, 0xBF]) + bytes(189) + b"payload"
assert parse_packet(packet) is None
def test_parse_two_byte_hops_truncated_returns_none(self):
"""Truncated path data for multi-byte hops returns None."""
# path_byte = 0x42 → 2 hops × 2 bytes = 4 bytes needed, only 2 provided
@@ -184,6 +194,11 @@ class TestMultiBytePathParsing:
result = extract_payload(packet)
assert result is None
def test_extract_payload_no_payload_returns_none(self):
"""extract_payload matches firmware and rejects payload-less packets."""
packet = bytes([0x15, 0x02, 0xAA, 0xBB])
assert extract_payload(packet) is None
def test_parse_direct_two_byte_hops_with_transport(self):
"""TRANSPORT_DIRECT with 2-byte hops parses correctly."""
# Header: TRANSPORT_DIRECT = 0x03, GROUP_TEXT = 5 → (5<<2)|3 = 0x17
+22
View File
@@ -1269,3 +1269,25 @@ class TestMigration040:
]
finally:
await conn.close()
class TestMigrationPacketHelpers:
"""Test migration-local packet helpers against canonical path validation."""
def test_extract_payload_for_hash_rejects_oversize_path(self):
from app.migrations import _extract_payload_for_hash
packet = bytes([0x15, 0xBF]) + bytes(189) + b"payload"
assert _extract_payload_for_hash(packet) is None
def test_extract_payload_for_hash_rejects_no_payload_packet(self):
from app.migrations import _extract_payload_for_hash
packet = bytes([0x15, 0x02, 0xAA, 0xBB])
assert _extract_payload_for_hash(packet) is None
def test_extract_path_from_packet_rejects_reserved_mode(self):
from app.migrations import _extract_path_from_packet
packet = bytes([0x15, 0xC1, 0xAA, 0xBB, 0xCC])
assert _extract_path_from_packet(packet) is None
+29
View File
@@ -5,8 +5,10 @@ import pytest
from app.path_utils import (
decode_path_byte,
first_hop_hex,
parse_packet_envelope,
path_wire_len,
split_path_hex,
validate_path_byte,
)
@@ -81,6 +83,33 @@ class TestPathWireLen:
assert path_wire_len(0, 1) == 0
class TestValidatePathByte:
def test_accepts_valid_multibyte_path_len(self):
hop_count, hash_size, byte_len = validate_path_byte(0x42)
assert (hop_count, hash_size, byte_len) == (2, 2, 4)
def test_rejects_oversize_path(self):
with pytest.raises(ValueError, match="MAX_PATH_SIZE"):
validate_path_byte(0xBF)
class TestParsePacketEnvelope:
def test_parses_valid_packet(self):
envelope = parse_packet_envelope(bytes([0x15, 0x42, 0xAA, 0xBB, 0xCC, 0xDD]) + b"hi")
assert envelope is not None
assert envelope.hop_count == 2
assert envelope.hash_size == 2
assert envelope.path == bytes([0xAA, 0xBB, 0xCC, 0xDD])
assert envelope.payload == b"hi"
def test_rejects_packet_with_no_payload(self):
assert parse_packet_envelope(bytes([0x15, 0x02, 0xAA, 0xBB])) is None
def test_rejects_oversize_path_encoding(self):
packet = bytes([0x15, 0xBF]) + bytes(189) + b"x"
assert parse_packet_envelope(packet) is None
class TestSplitPathHex:
def test_one_byte_hops(self):
assert split_path_hex("1a2b3c", 3) == ["1a", "2b", "3c"]