"""Tests for the packet decoder module. These tests verify the cryptographic operations for MeshCore packet decryption, which is critical for correctly interpreting mesh network messages. """ import hashlib import hmac from Crypto.Cipher import AES from app.decoder import ( DecryptedDirectMessage, PayloadType, RouteType, _clamp_scalar, decrypt_direct_message, decrypt_group_text, decrypt_path_payload, derive_public_key, derive_shared_secret, extract_payload, parse_packet, try_decrypt_dm, try_decrypt_packet_with_channel_key, try_decrypt_path, ) class TestChannelKeyDerivation: """Test channel key derivation from hashtag names.""" def test_hashtag_key_derivation(self): """Hashtag channel keys are derived as SHA256(name)[:16].""" channel_name = "#test" expected_key = hashlib.sha256(channel_name.encode("utf-8")).digest()[:16] assert len(expected_key) == 16 class TestPacketParsing: """Test raw packet header parsing.""" def test_parse_flood_packet(self): """Parse a FLOOD route type GROUP_TEXT packet.""" # Header: route_type=FLOOD(1), payload_type=GROUP_TEXT(5), version=0 # Header byte = (0 << 6) | (5 << 2) | 1 = 0x15 # Path length = 0 header = bytes([0x15, 0x00]) + b"payload_data" result = parse_packet(header) assert result is not None assert result.route_type == RouteType.FLOOD assert result.payload_type == PayloadType.GROUP_TEXT assert result.path_length == 0 assert result.payload == b"payload_data" def test_parse_direct_packet_with_path(self): """Parse a DIRECT route type packet with path data.""" # Header: route_type=DIRECT(2), payload_type=TEXT_MESSAGE(2), version=0 # Header byte = (0 << 6) | (2 << 2) | 2 = 0x0A # Path length = 3, path = [0x01, 0x02, 0x03] header = bytes([0x0A, 0x03, 0x01, 0x02, 0x03]) + b"msg" result = parse_packet(header) assert result is not None assert result.route_type == RouteType.DIRECT assert result.payload_type == PayloadType.TEXT_MESSAGE assert result.path_length == 3 assert result.payload == b"msg" def test_parse_transport_flood_skips_transport_code(self): """TRANSPORT_FLOOD packets have 4-byte transport code to skip.""" # Header: route_type=TRANSPORT_FLOOD(0), payload_type=GROUP_TEXT(5) # Header byte = (0 << 6) | (5 << 2) | 0 = 0x14 # Transport code (4 bytes) + path_length + payload header = bytes([0x14, 0xAA, 0xBB, 0xCC, 0xDD, 0x00]) + b"data" result = parse_packet(header) assert result is not None assert result.route_type == RouteType.TRANSPORT_FLOOD assert result.payload_type == PayloadType.GROUP_TEXT assert result.payload == b"data" def test_parse_empty_packet_returns_none(self): """Empty packets return None.""" assert parse_packet(b"") is None assert parse_packet(b"\x00") is None def test_parse_truncated_packet_returns_none(self): """Truncated packets return None.""" # Packet claiming path_length=10 but no path data header = bytes([0x15, 0x0A]) assert parse_packet(header) is None def test_parse_packet_with_no_payload_returns_none(self): """Firmware rejects packets that end exactly after the path.""" packet = bytes([0x15, 0x02, 0xAA, 0xBB]) assert parse_packet(packet) is None class TestMultiBytePathParsing: """Test packet parsing with multi-byte hop path encoding.""" def test_parse_two_byte_hops(self): """Parse packet with 2-byte hops: path_byte=0x42 → 2 hops × 2 bytes.""" # Header: FLOOD GROUP_TEXT = 0x15 # Path byte: 0x42 = mode 1 (2-byte), 2 hops → 4 bytes of path path_data = bytes([0xAA, 0xBB, 0xCC, 0xDD]) packet = bytes([0x15, 0x42]) + path_data + b"payload" result = parse_packet(packet) assert result is not None assert result.path_length == 2 # hop count assert result.path_hash_size == 2 assert result.path == path_data assert result.payload == b"payload" def test_parse_three_byte_hops(self): """Parse packet with 3-byte hops: path_byte=0x81 → 1 hop × 3 bytes.""" path_data = bytes([0x11, 0x22, 0x33]) packet = bytes([0x15, 0x81]) + path_data + b"pay" result = parse_packet(packet) assert result is not None assert result.path_length == 1 assert result.path_hash_size == 3 assert result.path == path_data assert result.payload == b"pay" def test_parse_reserved_mode3_returns_none(self): """Reserved mode 3 (upper bits = 0b11) should be rejected.""" # path_byte = 0xC1 → mode 3, 1 hop packet = bytes([0x15, 0xC1]) + bytes(10) result = parse_packet(packet) assert result is None def test_parse_oversize_path_len_returns_none(self): """Oversized-but-well-formed path bytes are invalid per firmware.""" packet = bytes([0x15, 0xBF]) + bytes(189) + b"payload" assert parse_packet(packet) is None def test_parse_two_byte_hops_truncated_returns_none(self): """Truncated path data for multi-byte hops returns None.""" # path_byte = 0x42 → 2 hops × 2 bytes = 4 bytes needed, only 2 provided packet = bytes([0x15, 0x42, 0xAA, 0xBB]) result = parse_packet(packet) assert result is None def test_backward_compat_one_byte_hops(self): """Old-style 1-byte hop packets still parse correctly with hash_size=1.""" packet = bytes([0x0A, 0x03, 0x01, 0x02, 0x03]) + b"msg" result = parse_packet(packet) assert result is not None assert result.path_length == 3 assert result.path_hash_size == 1 assert result.path == bytes([0x01, 0x02, 0x03]) def test_extract_payload_two_byte_hops(self): """extract_payload correctly skips multi-byte path data.""" path_data = bytes([0xAA, 0xBB, 0xCC, 0xDD]) packet = bytes([0x15, 0x42]) + path_data + b"the_payload" result = extract_payload(packet) assert result == b"the_payload" def test_extract_payload_reserved_mode_returns_none(self): """extract_payload rejects reserved mode 3.""" packet = bytes([0x15, 0xC1]) + bytes(10) result = extract_payload(packet) assert result is None def test_extract_payload_no_payload_returns_none(self): """extract_payload matches firmware and rejects payload-less packets.""" packet = bytes([0x15, 0x02, 0xAA, 0xBB]) assert extract_payload(packet) is None def test_parse_direct_two_byte_hops_with_transport(self): """TRANSPORT_DIRECT with 2-byte hops parses correctly.""" # Header: TRANSPORT_DIRECT = 0x03, GROUP_TEXT = 5 → (5<<2)|3 = 0x17 transport_code = bytes([0x11, 0x22, 0x33, 0x44]) # path_byte = 0x41 → mode 1, 1 hop → 2 bytes of path path_data = bytes([0xAB, 0xCD]) packet = bytes([0x17]) + transport_code + bytes([0x41]) + path_data + b"data" result = parse_packet(packet) assert result is not None assert result.route_type == RouteType.TRANSPORT_DIRECT assert result.path_length == 1 assert result.path_hash_size == 2 assert result.path == path_data assert result.payload == b"data" def test_zero_hops_mode1(self): """Zero hops with mode 1 (2-byte) → direct path, no path bytes.""" # path_byte = 0x40 → mode 1, 0 hops packet = bytes([0x15, 0x40]) + b"payload" result = parse_packet(packet) assert result is not None assert result.path_length == 0 assert result.path_hash_size == 2 assert result.path == b"" assert result.payload == b"payload" class TestGroupTextDecryption: """Test GROUP_TEXT (channel message) decryption.""" def _create_encrypted_payload( self, channel_key: bytes, timestamp: int, flags: int, message: str ) -> bytes: """Helper to create a valid encrypted GROUP_TEXT payload.""" # Build plaintext: timestamp(4) + flags(1) + message + null terminator plaintext = ( timestamp.to_bytes(4, "little") + bytes([flags]) + message.encode("utf-8") + b"\x00" ) # Pad to 16-byte boundary pad_len = (16 - len(plaintext) % 16) % 16 if pad_len == 0: pad_len = 16 plaintext += bytes(pad_len) # Encrypt with AES-128 ECB cipher = AES.new(channel_key, AES.MODE_ECB) ciphertext = cipher.encrypt(plaintext) # Calculate MAC: HMAC-SHA256(channel_secret, ciphertext)[:2] channel_secret = channel_key + bytes(16) mac = hmac.new(channel_secret, ciphertext, hashlib.sha256).digest()[:2] # Build payload: channel_hash(1) + mac(2) + ciphertext channel_hash = hashlib.sha256(channel_key).digest()[0:1] return channel_hash + mac + ciphertext def test_decrypt_valid_message(self): """Decrypt a valid GROUP_TEXT message.""" channel_key = hashlib.sha256(b"#testchannel").digest()[:16] timestamp = 1700000000 message = "TestUser: Hello world" payload = self._create_encrypted_payload(channel_key, timestamp, 0, message) result = decrypt_group_text(payload, channel_key) assert result is not None assert result.timestamp == timestamp assert result.sender == "TestUser" assert result.message == "Hello world" def test_decrypt_message_without_sender_prefix(self): """Messages without 'sender: ' format have no parsed sender.""" channel_key = hashlib.sha256(b"#test").digest()[:16] message = "Just a plain message" payload = self._create_encrypted_payload(channel_key, 1234567890, 0, message) result = decrypt_group_text(payload, channel_key) assert result is not None assert result.sender is None assert result.message == "Just a plain message" def test_decrypt_with_wrong_key_fails(self): """Decryption with wrong key fails MAC verification.""" correct_key = hashlib.sha256(b"#correct").digest()[:16] wrong_key = hashlib.sha256(b"#wrong").digest()[:16] payload = self._create_encrypted_payload(correct_key, 1234567890, 0, "test") result = decrypt_group_text(payload, wrong_key) assert result is None def test_decrypt_corrupted_mac_fails(self): """Corrupted MAC causes decryption to fail.""" channel_key = hashlib.sha256(b"#test").digest()[:16] payload = self._create_encrypted_payload(channel_key, 1234567890, 0, "test") # Corrupt the MAC (bytes 1-2) corrupted = payload[:1] + bytes([payload[1] ^ 0xFF, payload[2] ^ 0xFF]) + payload[3:] result = decrypt_group_text(corrupted, channel_key) assert result is None class TestPathDecryption: """Test PATH payload decryption against the firmware wire format.""" WORKED_PATH_PACKET = bytes.fromhex("22007EDE577469F4134F9B00EDD57EB4353A1B5999B7") WORKED_PATH_SENDER_PRIV = bytes.fromhex( "489E11DCC0A5E037E65C90D2327AA11A42EAFE0C9F68DEBE82B0F71C88C0874B" "CC291D9B2B98A54F5C1426B7AB8156B0D684EAA4EBA755AC614A9FD32B74C308" ) WORKED_PATH_DEST_PUB = bytes.fromhex( "7e23132922070404863fe855248ce414b64012c891342c1fc7ee5bd3d51ea405" ) @staticmethod def _create_encrypted_path_payload( *, shared_secret: bytes, dest_hash: int, src_hash: int, packed_path_len: int, path_bytes: bytes, extra_type: int, extra: bytes, ) -> bytes: plaintext = bytes([packed_path_len]) + path_bytes + bytes([extra_type]) + extra pad_len = (16 - len(plaintext) % 16) % 16 if pad_len == 0: pad_len = 16 plaintext += bytes(pad_len) cipher = AES.new(shared_secret[:16], AES.MODE_ECB) ciphertext = cipher.encrypt(plaintext) mac = hmac.new(shared_secret, ciphertext, hashlib.sha256).digest()[:2] return bytes([dest_hash, src_hash]) + mac + ciphertext def test_decrypt_path_payload_matches_firmware_layout(self): """PATH packets are dest/src hashes plus MAC+ciphertext; decrypted data is path+extra.""" shared_secret = bytes(range(32)) payload = self._create_encrypted_path_payload( shared_secret=shared_secret, dest_hash=0xAE, src_hash=0x11, packed_path_len=0x42, # mode 1 (2-byte hops), 2 hops path_bytes=bytes.fromhex("aabbccdd"), extra_type=PayloadType.ACK, extra=bytes.fromhex("01020304"), ) result = decrypt_path_payload(payload, shared_secret) assert result is not None assert result.dest_hash == "ae" assert result.src_hash == "11" assert result.returned_path == bytes.fromhex("aabbccdd") assert result.returned_path_len == 2 assert result.returned_path_hash_mode == 1 assert result.extra_type == PayloadType.ACK assert result.extra[:4] == bytes.fromhex("01020304") def test_decrypt_path_payload_rejects_corrupted_mac(self): """PATH payloads with a bad MAC must be rejected.""" shared_secret = bytes(range(32)) payload = self._create_encrypted_path_payload( shared_secret=shared_secret, dest_hash=0xAE, src_hash=0x11, packed_path_len=0x00, path_bytes=b"", extra_type=PayloadType.RESPONSE, extra=b"\x99\x88", ) corrupted = payload[:2] + bytes([payload[2] ^ 0xFF, payload[3]]) + payload[4:] result = decrypt_path_payload(corrupted, shared_secret) assert result is None def test_decrypt_worked_path_packet_fixture(self): """Worked PATH sample from the design doc decrypts as a direct route.""" packet = parse_packet(self.WORKED_PATH_PACKET) assert packet is not None assert packet.payload_type == PayloadType.PATH shared_secret = derive_shared_secret( self.WORKED_PATH_SENDER_PRIV, self.WORKED_PATH_DEST_PUB ) result = decrypt_path_payload(packet.payload, shared_secret) assert result is not None assert result.dest_hash == "7e" assert result.src_hash == "de" assert result.returned_path == b"" assert result.returned_path_len == 0 assert result.returned_path_hash_mode == 0 assert result.extra_type == 0x0F class TestTryDecryptPath: """Test the full PATH decryption wrapper.""" OUR_PRIV = bytes.fromhex( "58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B" "77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1" ) THEIR_PUB = bytes.fromhex("a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7") @classmethod def _make_path_packet( cls, *, packed_path_len: int, path_bytes: bytes, extra_type: int, extra: bytes, ) -> bytes: shared_secret = derive_shared_secret(cls.OUR_PRIV, cls.THEIR_PUB) plaintext = bytes([packed_path_len]) + path_bytes + bytes([extra_type]) + extra pad_len = (16 - len(plaintext) % 16) % 16 if pad_len == 0: pad_len = 16 plaintext += bytes(pad_len) cipher = AES.new(shared_secret[:16], AES.MODE_ECB) ciphertext = cipher.encrypt(plaintext) mac = hmac.new(shared_secret, ciphertext, hashlib.sha256).digest()[:2] our_public = derive_public_key(cls.OUR_PRIV) return ( bytes([(PayloadType.PATH << 2) | RouteType.DIRECT, 0x00]) + bytes([our_public[0], cls.THEIR_PUB[0]]) + mac + ciphertext ) def test_try_decrypt_path_decrypts_full_packet(self): """try_decrypt_path validates hashes, derives ECDH, and returns the route.""" raw_packet = self._make_path_packet( packed_path_len=0x42, path_bytes=bytes.fromhex("aabbccdd"), extra_type=PayloadType.ACK, extra=bytes.fromhex("01020304"), ) result = try_decrypt_path( raw_packet=raw_packet, our_private_key=self.OUR_PRIV, their_public_key=self.THEIR_PUB, our_public_key=derive_public_key(self.OUR_PRIV), ) assert result is not None assert result.returned_path == bytes.fromhex("aabbccdd") assert result.returned_path_len == 2 assert result.returned_path_hash_mode == 1 assert result.extra_type == PayloadType.ACK assert result.extra[:4] == bytes.fromhex("01020304") def test_try_decrypt_path_rejects_hash_mismatch(self): """Packets addressed to another destination are rejected before decryption.""" raw_packet = self._make_path_packet( packed_path_len=0x00, path_bytes=b"", extra_type=PayloadType.RESPONSE, extra=b"\xaa", ) wrong_our_public = bytes.fromhex("ff") + derive_public_key(self.OUR_PRIV)[1:] result = try_decrypt_path( raw_packet=raw_packet, our_private_key=self.OUR_PRIV, their_public_key=self.THEIR_PUB, our_public_key=wrong_our_public, ) assert result is None class TestTryDecryptPacket: """Test the full packet decryption pipeline.""" def test_only_group_text_packets_decrypted(self): """Non-GROUP_TEXT packets return None.""" # TEXT_MESSAGE packet (payload_type=2) # Header: route_type=FLOOD(1), payload_type=TEXT_MESSAGE(2) # Header byte = (0 << 6) | (2 << 2) | 1 = 0x09 packet = bytes([0x09, 0x00]) + b"some_data" key = bytes(16) result = try_decrypt_packet_with_channel_key(packet, key) assert result is None def test_channel_hash_mismatch_returns_none(self): """Packets with non-matching channel hash return None early.""" # GROUP_TEXT packet with channel_hash that doesn't match our key # Header: route_type=FLOOD(1), payload_type=GROUP_TEXT(5) # Header byte = 0x15 wrong_hash = bytes([0xFF]) # Unlikely to match any real key packet = bytes([0x15, 0x00]) + wrong_hash + bytes(20) key = hashlib.sha256(b"#test").digest()[:16] result = try_decrypt_packet_with_channel_key(packet, key) assert result is None class TestRealWorldPackets: """Test with real captured packets to ensure decoder matches protocol.""" def test_decrypt_six77_channel_message(self): """Decrypt a real packet from #six77 channel.""" # Real packet captured from #six77 hashtag channel packet_hex = ( "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D" "99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D35182" "83156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF" "0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D" "07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818" ) packet = bytes.fromhex(packet_hex) # Verify key derivation: SHA256("#six77")[:16] channel_key = hashlib.sha256(b"#six77").digest()[:16] assert channel_key.hex() == "7aba109edcf304a84433cb71d0f3ab73" # Decrypt the packet result = try_decrypt_packet_with_channel_key(packet, channel_key) assert result is not None assert result.sender == "Flightless🥝" assert "hashtag room is essentially public" in result.message assert result.channel_hash == "e6" assert result.timestamp == 1766604717 class TestAdvertisementParsing: """Test parsing of advertisement packets.""" def test_parse_repeater_advertisement_with_gps(self): """Parse a repeater advertisement with GPS coordinates.""" from app.decoder import parse_advertisement, parse_packet # Repeater packet with lat/lon of 49.02056 / -123.82935 # Flags 0x92: Role=Repeater (2), Location=Yes, Name=Yes packet_hex = ( "1106538B1CD273868576DC7F679B493F9AB5AC316173E1A56D3388BC3BA75F583F63" "AB0D1BA2A8ABD0BC6669DBF719E67E4C8517BA4E0D6F8C96A323E9D13A77F2630DED" "965A5C17C3EC6ED1601EEFE857749DA24E9F39CBEACD722C3708F433DB5FA9BAF0BA" "F9BC5B1241069290FEEB029A839EF843616E204F204D657368203220F09FA5AB" ) packet = bytes.fromhex(packet_hex) info = parse_packet(packet) assert info is not None result = parse_advertisement(info.payload) assert result is not None assert ( result.public_key == "8576dc7f679b493f9ab5ac316173e1a56d3388bc3ba75f583f63ab0d1ba2a8ab" ) assert result.name == "Can O Mesh 2 🥫" assert result.device_role == 2 # Repeater assert result.timestamp > 0 # Has valid timestamp assert result.lat is not None assert result.lon is not None assert abs(result.lat - 49.02056) < 0.000001 assert abs(result.lon - (-123.82935)) < 0.000001 def test_parse_chat_node_advertisement_with_gps(self): """Parse a chat node advertisement with GPS coordinates.""" from app.decoder import parse_advertisement, parse_packet # Chat node packet with lat/lon of 47.786038 / -122.344096 # Flags 0x91: Role=Chat (1), Location=Yes, Name=Yes packet_hex = ( "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83" "2DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1D" "F3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D" "913628D902602DB5F8466C696768746C657373F09FA59D" ) packet = bytes.fromhex(packet_hex) info = parse_packet(packet) assert info is not None result = parse_advertisement(info.payload) assert result is not None assert ( result.public_key == "ae92564c5c9884854f04f469bbb2bab8871a078053af6cf4aa2c014b18ce8a83" ) assert result.name == "Flightless🥝" assert result.device_role == 1 # Chat node assert result.timestamp > 0 # Has valid timestamp assert result.lat is not None assert result.lon is not None assert abs(result.lat - 47.786038) < 0.000001 assert abs(result.lon - (-122.344096)) < 0.000001 def test_parse_advertisement_without_gps(self): """Parse an advertisement without GPS coordinates.""" from app.decoder import parse_advertisement, parse_packet # Chat node packet without location # Flags 0x81: Role=Chat (1), Location=No, Name=Yes packet_hex = ( "1104D7F9E07A2E38C81F7DC0C1CEDDED6B415B4367CF48F578C5A092CED3490FF0C7" "6EFDF1F5A4BD6669D3D143CFF384D8B3BD950CDCA31C98B7DA789D004D04DED31E16" "B998E1AE352B283EAC8ABCF1F07214EC3BBF7AF3EB8EBF15C00417F2425A259E7CE6" "A875BA0D814D656E6E697344" ) packet = bytes.fromhex(packet_hex) info = parse_packet(packet) assert info is not None result = parse_advertisement(info.payload) assert result is not None assert ( result.public_key == "2e38c81f7dc0c1cedded6b415b4367cf48f578c5a092ced3490ff0c76efdf1f5" ) assert result.name == "MennisD" assert result.device_role == 1 # Chat node assert result.timestamp > 0 # Has valid timestamp assert result.lat is None assert result.lon is None def test_parse_advertisement_discards_out_of_range_gps(self): """Out-of-range advert coordinates are treated as missing.""" from app.decoder import parse_advertisement payload = bytearray() payload.extend( bytes.fromhex("f29fdc7c560f9d813d1593a8587fa46a9e7efe2f5506d38c0af41307bf9e517a") ) payload.extend((1718749967).to_bytes(4, byteorder="little")) payload.extend(bytes(64)) payload.append(0x92) payload.extend((-593497573).to_bytes(4, byteorder="little", signed=True)) payload.extend((-1659939204).to_bytes(4, byteorder="little", signed=True)) payload.extend(b"Tacompton") raw_packet = bytes.fromhex("11") + bytes(payload) result = parse_advertisement(bytes(payload), raw_packet=raw_packet) assert result is not None assert ( result.public_key == "f29fdc7c560f9d813d1593a8587fa46a9e7efe2f5506d38c0af41307bf9e517a" ) assert result.name == "Tacompton" assert result.device_role == 2 assert result.lat is None assert result.lon is None def test_parse_advertisement_extracts_public_key(self): """Advertisement parsing extracts the public key correctly.""" from app.decoder import parse_advertisement, parse_packet packet_hex = ( "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83" "2DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1D" "F3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D" "913628D902602DB5F8466C696768746C657373F09FA59D" ) packet = bytes.fromhex(packet_hex) info = parse_packet(packet) assert info is not None result = parse_advertisement(info.payload) assert result is not None assert ( result.public_key == "ae92564c5c9884854f04f469bbb2bab8871a078053af6cf4aa2c014b18ce8a83" ) def test_non_advertisement_returns_none(self): """Non-advertisement payload returns None when parsed as advertisement.""" from app.decoder import parse_advertisement, parse_packet # GROUP_TEXT packet, not an advertisement packet = bytes([0x15, 0x00]) + bytes(50) info = parse_packet(packet) assert info is not None result = parse_advertisement(info.payload) assert result is None class TestScalarClamping: """Test X25519 scalar clamping for ECDH.""" def test_clamp_scalar_modifies_first_byte(self): """Clamping clears the lower 3 bits of the first byte.""" # Input with all bits set in first byte scalar = bytes([0xFF]) + bytes(31) result = _clamp_scalar(scalar) # First byte should have lower 3 bits cleared: 0xFF & 248 = 0xF8 assert result[0] == 0xF8 def test_clamp_scalar_modifies_last_byte(self): """Clamping modifies the last byte for correct group operations.""" # Input with all bits set in last byte scalar = bytes(31) + bytes([0xFF]) result = _clamp_scalar(scalar) # Last byte: (0xFF & 63) | 64 = 0x7F assert result[31] == 0x7F def test_clamp_scalar_preserves_middle_bytes(self): """Clamping preserves the middle bytes unchanged.""" # Known middle bytes scalar = bytes([0xAB]) + bytes([0x12, 0x34, 0x56] * 10)[:30] + bytes([0xCD]) result = _clamp_scalar(scalar) # Middle bytes should be unchanged assert result[1:31] == scalar[1:31] def test_clamp_scalar_truncates_to_32_bytes(self): """Clamping uses only first 32 bytes of input.""" # 64-byte input (typical Ed25519 private key) scalar = bytes(64) result = _clamp_scalar(scalar) assert len(result) == 32 class TestPublicKeyDerivation: """Test deriving Ed25519 public key from MeshCore private key.""" # Test data from real MeshCore keys # The private key's first 32 bytes are the scalar (post-SHA-512 clamped) # The public key is derived via scalar × basepoint, NOT from the last 32 bytes # # IMPORTANT: The last 32 bytes of a MeshCore private key are the signing prefix, # NOT the public key! Standard Ed25519 libraries will give wrong results because # they expect a seed, not a raw scalar. FACE12_PRIV = bytes.fromhex( "58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B" "77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1" ) # Expected public key derived from scalar × basepoint # Note: This starts with "face12" - the derived public key, NOT the signing prefix FACE12_PUB_EXPECTED = bytes.fromhex( "FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46" ) def test_derive_public_key_from_meshcore_private(self): """derive_public_key correctly derives public key from MeshCore private key.""" result = derive_public_key(self.FACE12_PRIV) assert len(result) == 32 assert result == self.FACE12_PUB_EXPECTED def test_derive_public_key_from_scalar_only(self): """derive_public_key works with just the 32-byte scalar.""" scalar_only = self.FACE12_PRIV[:32] result = derive_public_key(scalar_only) assert len(result) == 32 assert result == self.FACE12_PUB_EXPECTED def test_derive_public_key_deterministic(self): """Same private key always produces same public key.""" result1 = derive_public_key(self.FACE12_PRIV) result2 = derive_public_key(self.FACE12_PRIV) assert result1 == result2 class TestSharedSecretDerivation: """Test ECDH shared secret derivation from Ed25519 keys.""" # Test data from real MeshCore keys # The private key's first 32 bytes are the scalar (post-SHA-512 clamped) # The last 32 bytes are the signing prefix (NOT the public key, though they may match) FACE12_PRIV = bytes.fromhex( "58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B" "77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1" ) # a1b2c3 public key (32 bytes) A1B2C3_PUB = bytes.fromhex("a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7") def test_derive_shared_secret_returns_32_bytes(self): """Shared secret derivation returns 32-byte value.""" result = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) assert len(result) == 32 def test_derive_shared_secret_deterministic(self): """Same inputs always produce same shared secret.""" result1 = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) result2 = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) assert result1 == result2 def test_derive_shared_secret_different_keys_different_result(self): """Different key pairs produce different shared secrets.""" # Use the real FACE12 public key as a second peer key (valid curve point) face12_pub = derive_public_key(self.FACE12_PRIV) result1 = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) result2 = derive_shared_secret(self.FACE12_PRIV, face12_pub) assert result1 != result2 class TestDirectMessageDecryption: """Test TEXT_MESSAGE (direct message) payload decryption.""" # Real test vector from user # Payload: [dest_hash:1][src_hash:1][mac:2][ciphertext] PAYLOAD = bytes.fromhex( "FAA1295471ADB44A98B13CA528A4B5C4FBC29B4DA3CED477519B2FBD8FD5467C31E5D58B" ) # Keys for deriving shared secret FACE12_PRIV = bytes.fromhex( "58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B" "77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1" ) A1B2C3_PUB = bytes.fromhex("a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7") EXPECTED_MESSAGE = "Hello there, Mr. Face!" def test_decrypt_real_dm_payload(self): """Decrypt a real DM payload with known shared secret.""" shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) result = decrypt_direct_message(self.PAYLOAD, shared_secret) assert result is not None assert result.message == self.EXPECTED_MESSAGE assert result.dest_hash == "fa" # First byte of payload assert result.src_hash == "a1" # Second byte, matches a1b2c3 def test_decrypt_extracts_timestamp(self): """Decrypted message contains valid timestamp.""" shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) result = decrypt_direct_message(self.PAYLOAD, shared_secret) assert result is not None assert result.timestamp > 0 # Non-zero timestamp assert result.timestamp < 2**32 # Within uint32 range def test_decrypt_extracts_flags(self): """Decrypted message contains flags byte.""" shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) result = decrypt_direct_message(self.PAYLOAD, shared_secret) assert result is not None assert isinstance(result.flags, int) assert 0 <= result.flags <= 255 def test_decrypt_with_wrong_secret_fails(self): """Decryption with incorrect shared secret fails MAC verification.""" wrong_secret = bytes(32) # All zeros result = decrypt_direct_message(self.PAYLOAD, wrong_secret) assert result is None def test_decrypt_with_corrupted_mac_fails(self): """Corrupted MAC causes decryption to fail.""" shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) # Corrupt the MAC (bytes 2-3) corrupted = self.PAYLOAD[:2] + bytes([0xFF, 0xFF]) + self.PAYLOAD[4:] result = decrypt_direct_message(corrupted, shared_secret) assert result is None def test_decrypt_too_short_payload_returns_none(self): """Payloads shorter than minimum (4 bytes) return None.""" shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) result = decrypt_direct_message(bytes(3), shared_secret) assert result is None def test_decrypt_invalid_ciphertext_length_returns_none(self): """Ciphertext not a multiple of 16 bytes returns None.""" shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB) # 4-byte header + 15-byte ciphertext (not multiple of 16) invalid_payload = bytes(4 + 15) result = decrypt_direct_message(invalid_payload, shared_secret) assert result is None def test_decrypt_signed_room_post_extracts_author_prefix(self): """TXT_TYPE_SIGNED_PLAIN room posts expose the 4-byte author prefix separately.""" shared_secret = bytes(range(32)) timestamp = 1_700_000_000 flags = (2 << 2) | 1 author_prefix = bytes.fromhex("aabbccdd") plaintext = ( timestamp.to_bytes(4, "little") + bytes([flags]) + author_prefix + b"hello room" + b"\x00" ) padded = plaintext + (b"\x00" * ((16 - (len(plaintext) % 16)) % 16)) cipher = AES.new(shared_secret[:16], AES.MODE_ECB) ciphertext = cipher.encrypt(padded) mac = hmac.new(shared_secret, ciphertext, hashlib.sha256).digest()[:2] payload = bytes.fromhex("1020") + mac + ciphertext result = decrypt_direct_message(payload, shared_secret) assert result is not None assert result.txt_type == 2 assert result.attempt == 1 assert result.signed_sender_prefix == "aabbccdd" assert result.message == "hello room" class TestTryDecryptDM: """Test full packet decryption for direct messages.""" # Full packet: header + path_length + payload # Header byte = 0x09: route_type=FLOOD(1), payload_type=TEXT_MESSAGE(2) # Header byte = (0 << 6) | (2 << 2) | 1 = 0x09 # Path length = 0 FULL_PACKET = bytes.fromhex( "0900FAA1295471ADB44A98B13CA528A4B5C4FBC29B4DA3CED477519B2FBD8FD5467C31E5D58B" ) # Keys FACE12_PRIV = bytes.fromhex( "58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B" "77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1" ) # FACE12 public key - derived via scalar × basepoint, NOT the last 32 bytes! # The last 32 bytes (77AC...) are the signing prefix, not the public key. FACE12_PUB = bytes.fromhex("FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46") A1B2C3_PUB = bytes.fromhex("a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7") EXPECTED_MESSAGE = "Hello there, Mr. Face!" def test_try_decrypt_dm_full_packet(self): """Decrypt a full TEXT_MESSAGE packet.""" result = try_decrypt_dm( self.FULL_PACKET, self.FACE12_PRIV, self.A1B2C3_PUB, our_public_key=self.FACE12_PUB, ) assert result is not None assert result.message == self.EXPECTED_MESSAGE def test_try_decrypt_dm_inbound_message(self): """Decrypt an inbound message (from them to us).""" # src_hash = a1 matches A1B2C3's first byte result = try_decrypt_dm( self.FULL_PACKET, self.FACE12_PRIV, self.A1B2C3_PUB, our_public_key=None, # Without our pubkey, only checks inbound ) assert result is not None assert result.src_hash == "a1" def test_try_decrypt_dm_non_text_message_returns_none(self): """Non-TEXT_MESSAGE packets return None.""" # GROUP_TEXT packet (payload_type=5) # Header byte = (0 << 6) | (5 << 2) | 1 = 0x15 group_text_packet = bytes([0x15, 0x00]) + self.FULL_PACKET[2:] result = try_decrypt_dm( group_text_packet, self.FACE12_PRIV, self.A1B2C3_PUB, ) assert result is None def test_try_decrypt_dm_wrong_src_hash_returns_none(self): """Packets from unknown senders return None.""" # Create a packet with different src_hash # Original: FA A1 ... -> dest=FA, src=A1 # Modified: FA BB ... -> dest=FA, src=BB (doesn't match A1B2C3) modified_payload = bytes([0xFA, 0xBB]) + self.FULL_PACKET[4:] modified_packet = self.FULL_PACKET[:2] + modified_payload result = try_decrypt_dm( modified_packet, self.FACE12_PRIV, self.A1B2C3_PUB, our_public_key=self.FACE12_PUB, ) assert result is None def test_try_decrypt_dm_empty_packet_returns_none(self): """Empty packets return None.""" result = try_decrypt_dm( b"", self.FACE12_PRIV, self.A1B2C3_PUB, ) assert result is None def test_try_decrypt_dm_truncated_packet_returns_none(self): """Truncated packets return None.""" result = try_decrypt_dm( self.FULL_PACKET[:5], # Only header + partial payload self.FACE12_PRIV, self.A1B2C3_PUB, ) assert result is None class TestRealWorldDMPacket: """End-to-end test with exact real-world test data.""" def test_full_dm_decryption_flow(self): """ Complete decryption flow with real test vectors. Test data from user: - face12 private key (64 bytes Ed25519) - a1b2c3 public key (32 bytes) - Encrypted payload producing "Hello there, Mr. Face!" """ # Keys face12_priv = bytes.fromhex( "58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B" "77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1" ) # Derived public key (scalar × basepoint) - NOT the signing prefix from bytes 32-64 # First byte is 0xFA, matching dest_hash in test packet face12_pub = bytes.fromhex( "FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46" ) a1b2c3_pub = bytes.fromhex( "a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7" ) # Full packet with header full_packet = bytes.fromhex( "0900FAA1295471ADB44A98B13CA528A4B5C4FBC29B4DA3CED477519B2FBD8FD5467C31E5D58B" ) # Decrypt result = try_decrypt_dm( full_packet, face12_priv, a1b2c3_pub, our_public_key=face12_pub, ) # Verify assert result is not None assert isinstance(result, DecryptedDirectMessage) assert result.message == "Hello there, Mr. Face!" assert result.dest_hash == "fa" # First byte of derived face12 pubkey (0xFA) assert result.src_hash == "a1" # First byte of a1b2c3 pubkey