Pass 1 on PATH integration

This commit is contained in:
Jack Kingsman
2026-03-18 19:15:02 -07:00
parent b832239e22
commit 69e09378f5
48 changed files with 1353 additions and 412 deletions
+177
View File
@@ -16,12 +16,14 @@ from app.decoder import (
_clamp_scalar,
decrypt_direct_message,
decrypt_group_text,
decrypt_path_payload,
derive_public_key,
derive_shared_secret,
extract_payload,
parse_packet,
try_decrypt_dm,
try_decrypt_packet_with_channel_key,
try_decrypt_path,
)
@@ -298,6 +300,181 @@ class TestGroupTextDecryption:
assert result is None
class TestPathDecryption:
"""Test PATH payload decryption against the firmware wire format."""
WORKED_PATH_PACKET = bytes.fromhex("22007EDE577469F4134F9B00EDD57EB4353A1B5999B7")
WORKED_PATH_SENDER_PRIV = bytes.fromhex(
"489E11DCC0A5E037E65C90D2327AA11A42EAFE0C9F68DEBE82B0F71C88C0874B"
"CC291D9B2B98A54F5C1426B7AB8156B0D684EAA4EBA755AC614A9FD32B74C308"
)
WORKED_PATH_DEST_PUB = bytes.fromhex(
"7e23132922070404863fe855248ce414b64012c891342c1fc7ee5bd3d51ea405"
)
@staticmethod
def _create_encrypted_path_payload(
*,
shared_secret: bytes,
dest_hash: int,
src_hash: int,
packed_path_len: int,
path_bytes: bytes,
extra_type: int,
extra: bytes,
) -> bytes:
plaintext = bytes([packed_path_len]) + path_bytes + bytes([extra_type]) + extra
pad_len = (16 - len(plaintext) % 16) % 16
if pad_len == 0:
pad_len = 16
plaintext += bytes(pad_len)
cipher = AES.new(shared_secret[:16], AES.MODE_ECB)
ciphertext = cipher.encrypt(plaintext)
mac = hmac.new(shared_secret, ciphertext, hashlib.sha256).digest()[:2]
return bytes([dest_hash, src_hash]) + mac + ciphertext
def test_decrypt_path_payload_matches_firmware_layout(self):
"""PATH packets are dest/src hashes plus MAC+ciphertext; decrypted data is path+extra."""
shared_secret = bytes(range(32))
payload = self._create_encrypted_path_payload(
shared_secret=shared_secret,
dest_hash=0xAE,
src_hash=0x11,
packed_path_len=0x42, # mode 1 (2-byte hops), 2 hops
path_bytes=bytes.fromhex("aabbccdd"),
extra_type=PayloadType.ACK,
extra=bytes.fromhex("01020304"),
)
result = decrypt_path_payload(payload, shared_secret)
assert result is not None
assert result.dest_hash == "ae"
assert result.src_hash == "11"
assert result.returned_path == bytes.fromhex("aabbccdd")
assert result.returned_path_len == 2
assert result.returned_path_hash_mode == 1
assert result.extra_type == PayloadType.ACK
assert result.extra[:4] == bytes.fromhex("01020304")
def test_decrypt_path_payload_rejects_corrupted_mac(self):
"""PATH payloads with a bad MAC must be rejected."""
shared_secret = bytes(range(32))
payload = self._create_encrypted_path_payload(
shared_secret=shared_secret,
dest_hash=0xAE,
src_hash=0x11,
packed_path_len=0x00,
path_bytes=b"",
extra_type=PayloadType.RESPONSE,
extra=b"\x99\x88",
)
corrupted = payload[:2] + bytes([payload[2] ^ 0xFF, payload[3]]) + payload[4:]
result = decrypt_path_payload(corrupted, shared_secret)
assert result is None
def test_decrypt_worked_path_packet_fixture(self):
"""Worked PATH sample from the design doc decrypts as a direct route."""
packet = parse_packet(self.WORKED_PATH_PACKET)
assert packet is not None
assert packet.payload_type == PayloadType.PATH
shared_secret = derive_shared_secret(
self.WORKED_PATH_SENDER_PRIV, self.WORKED_PATH_DEST_PUB
)
result = decrypt_path_payload(packet.payload, shared_secret)
assert result is not None
assert result.dest_hash == "7e"
assert result.src_hash == "de"
assert result.returned_path == b""
assert result.returned_path_len == 0
assert result.returned_path_hash_mode == 0
assert result.extra_type == 0x0F
class TestTryDecryptPath:
"""Test the full PATH decryption wrapper."""
OUR_PRIV = bytes.fromhex(
"58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B"
"77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1"
)
THEIR_PUB = bytes.fromhex("a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7")
@classmethod
def _make_path_packet(
cls,
*,
packed_path_len: int,
path_bytes: bytes,
extra_type: int,
extra: bytes,
) -> bytes:
shared_secret = derive_shared_secret(cls.OUR_PRIV, cls.THEIR_PUB)
plaintext = bytes([packed_path_len]) + path_bytes + bytes([extra_type]) + extra
pad_len = (16 - len(plaintext) % 16) % 16
if pad_len == 0:
pad_len = 16
plaintext += bytes(pad_len)
cipher = AES.new(shared_secret[:16], AES.MODE_ECB)
ciphertext = cipher.encrypt(plaintext)
mac = hmac.new(shared_secret, ciphertext, hashlib.sha256).digest()[:2]
our_public = derive_public_key(cls.OUR_PRIV)
return (
bytes([(PayloadType.PATH << 2) | RouteType.DIRECT, 0x00])
+ bytes([our_public[0], cls.THEIR_PUB[0]])
+ mac
+ ciphertext
)
def test_try_decrypt_path_decrypts_full_packet(self):
"""try_decrypt_path validates hashes, derives ECDH, and returns the route."""
raw_packet = self._make_path_packet(
packed_path_len=0x42,
path_bytes=bytes.fromhex("aabbccdd"),
extra_type=PayloadType.ACK,
extra=bytes.fromhex("01020304"),
)
result = try_decrypt_path(
raw_packet=raw_packet,
our_private_key=self.OUR_PRIV,
their_public_key=self.THEIR_PUB,
our_public_key=derive_public_key(self.OUR_PRIV),
)
assert result is not None
assert result.returned_path == bytes.fromhex("aabbccdd")
assert result.returned_path_len == 2
assert result.returned_path_hash_mode == 1
assert result.extra_type == PayloadType.ACK
assert result.extra[:4] == bytes.fromhex("01020304")
def test_try_decrypt_path_rejects_hash_mismatch(self):
"""Packets addressed to another destination are rejected before decryption."""
raw_packet = self._make_path_packet(
packed_path_len=0x00,
path_bytes=b"",
extra_type=PayloadType.RESPONSE,
extra=b"\xaa",
)
wrong_our_public = bytes.fromhex("ff") + derive_public_key(self.OUR_PRIV)[1:]
result = try_decrypt_path(
raw_packet=raw_packet,
our_private_key=self.OUR_PRIV,
their_public_key=self.THEIR_PUB,
our_public_key=wrong_our_public,
)
assert result is None
class TestTryDecryptPacket:
"""Test the full packet decryption pipeline."""