"""Tests for the packet decoder module. These tests verify the cryptographic operations for MeshCore packet decryption, which is critical for correctly interpreting mesh network messages. """ import hashlib import hmac import pytest from Crypto.Cipher import AES from app.decoder import ( DecryptedGroupText, PacketInfo, PayloadType, RouteType, calculate_channel_hash, decrypt_group_text, parse_packet, try_decrypt_packet_with_channel_key, ) class TestChannelKeyDerivation: """Test channel key derivation from hashtag names.""" def test_hashtag_key_derivation(self): """Hashtag channel keys are derived as SHA256(name)[:16].""" channel_name = "#test" expected_key = hashlib.sha256(channel_name.encode("utf-8")).digest()[:16] # This matches the meshcore_py implementation assert len(expected_key) == 16 def test_channel_hash_calculation(self): """Channel hash is the first byte of SHA256(key) as hex.""" key = bytes(16) # All zeros expected_hash = format(hashlib.sha256(key).digest()[0], "02x") result = calculate_channel_hash(key) assert result == expected_hash assert len(result) == 2 # Two hex chars class TestPacketParsing: """Test raw packet header parsing.""" def test_parse_flood_packet(self): """Parse a FLOOD route type GROUP_TEXT packet.""" # Header: route_type=FLOOD(1), payload_type=GROUP_TEXT(5), version=0 # Header byte = (0 << 6) | (5 << 2) | 1 = 0x15 # Path length = 0 header = bytes([0x15, 0x00]) + b"payload_data" result = parse_packet(header) assert result is not None assert result.route_type == RouteType.FLOOD assert result.payload_type == PayloadType.GROUP_TEXT assert result.path_length == 0 assert result.payload == b"payload_data" def test_parse_direct_packet_with_path(self): """Parse a DIRECT route type packet with path data.""" # Header: route_type=DIRECT(2), payload_type=TEXT_MESSAGE(2), version=0 # Header byte = (0 << 6) | (2 << 2) | 2 = 0x0A # Path length = 3, path = [0x01, 0x02, 0x03] header = bytes([0x0A, 0x03, 0x01, 0x02, 0x03]) + b"msg" result = parse_packet(header) assert result is not None assert result.route_type == RouteType.DIRECT assert result.payload_type == PayloadType.TEXT_MESSAGE assert result.path_length == 3 assert result.payload == b"msg" def test_parse_transport_flood_skips_transport_code(self): """TRANSPORT_FLOOD packets have 4-byte transport code to skip.""" # Header: route_type=TRANSPORT_FLOOD(0), payload_type=GROUP_TEXT(5) # Header byte = (0 << 6) | (5 << 2) | 0 = 0x14 # Transport code (4 bytes) + path_length + payload header = bytes([0x14, 0xAA, 0xBB, 0xCC, 0xDD, 0x00]) + b"data" result = parse_packet(header) assert result is not None assert result.route_type == RouteType.TRANSPORT_FLOOD assert result.payload_type == PayloadType.GROUP_TEXT assert result.payload == b"data" def test_parse_empty_packet_returns_none(self): """Empty packets return None.""" assert parse_packet(b"") is None assert parse_packet(b"\x00") is None def test_parse_truncated_packet_returns_none(self): """Truncated packets return None.""" # Packet claiming path_length=10 but no path data header = bytes([0x15, 0x0A]) assert parse_packet(header) is None class TestGroupTextDecryption: """Test GROUP_TEXT (channel message) decryption.""" def _create_encrypted_payload( self, channel_key: bytes, timestamp: int, flags: int, message: str ) -> bytes: """Helper to create a valid encrypted GROUP_TEXT payload.""" # Build plaintext: timestamp(4) + flags(1) + message + null terminator plaintext = ( timestamp.to_bytes(4, "little") + bytes([flags]) + message.encode("utf-8") + b"\x00" ) # Pad to 16-byte boundary pad_len = (16 - len(plaintext) % 16) % 16 if pad_len == 0: pad_len = 16 plaintext += bytes(pad_len) # Encrypt with AES-128 ECB cipher = AES.new(channel_key, AES.MODE_ECB) ciphertext = cipher.encrypt(plaintext) # Calculate MAC: HMAC-SHA256(channel_secret, ciphertext)[:2] channel_secret = channel_key + bytes(16) mac = hmac.new(channel_secret, ciphertext, hashlib.sha256).digest()[:2] # Build payload: channel_hash(1) + mac(2) + ciphertext channel_hash = hashlib.sha256(channel_key).digest()[0:1] return channel_hash + mac + ciphertext def test_decrypt_valid_message(self): """Decrypt a valid GROUP_TEXT message.""" channel_key = hashlib.sha256(b"#testchannel").digest()[:16] timestamp = 1700000000 message = "TestUser: Hello world" payload = self._create_encrypted_payload(channel_key, timestamp, 0, message) result = decrypt_group_text(payload, channel_key) assert result is not None assert result.timestamp == timestamp assert result.sender == "TestUser" assert result.message == "Hello world" def test_decrypt_message_without_sender_prefix(self): """Messages without 'sender: ' format have no parsed sender.""" channel_key = hashlib.sha256(b"#test").digest()[:16] message = "Just a plain message" payload = self._create_encrypted_payload(channel_key, 1234567890, 0, message) result = decrypt_group_text(payload, channel_key) assert result is not None assert result.sender is None assert result.message == "Just a plain message" def test_decrypt_with_wrong_key_fails(self): """Decryption with wrong key fails MAC verification.""" correct_key = hashlib.sha256(b"#correct").digest()[:16] wrong_key = hashlib.sha256(b"#wrong").digest()[:16] payload = self._create_encrypted_payload(correct_key, 1234567890, 0, "test") result = decrypt_group_text(payload, wrong_key) assert result is None def test_decrypt_corrupted_mac_fails(self): """Corrupted MAC causes decryption to fail.""" channel_key = hashlib.sha256(b"#test").digest()[:16] payload = self._create_encrypted_payload(channel_key, 1234567890, 0, "test") # Corrupt the MAC (bytes 1-2) corrupted = payload[:1] + bytes([payload[1] ^ 0xFF, payload[2] ^ 0xFF]) + payload[3:] result = decrypt_group_text(corrupted, channel_key) assert result is None class TestTryDecryptPacket: """Test the full packet decryption pipeline.""" def test_only_group_text_packets_decrypted(self): """Non-GROUP_TEXT packets return None.""" # TEXT_MESSAGE packet (payload_type=2) # Header: route_type=FLOOD(1), payload_type=TEXT_MESSAGE(2) # Header byte = (0 << 6) | (2 << 2) | 1 = 0x09 packet = bytes([0x09, 0x00]) + b"some_data" key = bytes(16) result = try_decrypt_packet_with_channel_key(packet, key) assert result is None def test_channel_hash_mismatch_returns_none(self): """Packets with non-matching channel hash return None early.""" # GROUP_TEXT packet with channel_hash that doesn't match our key # Header: route_type=FLOOD(1), payload_type=GROUP_TEXT(5) # Header byte = 0x15 wrong_hash = bytes([0xFF]) # Unlikely to match any real key packet = bytes([0x15, 0x00]) + wrong_hash + bytes(20) key = hashlib.sha256(b"#test").digest()[:16] result = try_decrypt_packet_with_channel_key(packet, key) assert result is None class TestRealWorldPackets: """Test with real captured packets to ensure decoder matches protocol.""" def test_decrypt_six77_channel_message(self): """Decrypt a real packet from #six77 channel.""" # Real packet captured from #six77 hashtag channel packet_hex = ( "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D" "99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D35182" "83156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF" "0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D" "07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818" ) packet = bytes.fromhex(packet_hex) # Verify key derivation: SHA256("#six77")[:16] channel_key = hashlib.sha256(b"#six77").digest()[:16] assert channel_key.hex() == "7aba109edcf304a84433cb71d0f3ab73" # Decrypt the packet result = try_decrypt_packet_with_channel_key(packet, channel_key) assert result is not None assert result.sender == "Flightless🥝" assert "hashtag room is essentially public" in result.message assert result.channel_hash == "e6" assert result.timestamp == 1766604717 class TestAdvertisementParsing: """Test parsing of advertisement packets.""" def test_parse_repeater_advertisement_with_gps(self): """Parse a repeater advertisement with GPS coordinates.""" from app.decoder import try_parse_advertisement # Repeater packet with lat/lon of 49.02056 / -123.82935 # Flags 0x92: Role=Repeater (2), Location=Yes, Name=Yes packet_hex = ( "1106538B1CD273868576DC7F679B493F9AB5AC316173E1A56D3388BC3BA75F583F63" "AB0D1BA2A8ABD0BC6669DBF719E67E4C8517BA4E0D6F8C96A323E9D13A77F2630DED" "965A5C17C3EC6ED1601EEFE857749DA24E9F39CBEACD722C3708F433DB5FA9BAF0BA" "F9BC5B1241069290FEEB029A839EF843616E204F204D657368203220F09FA5AB" ) packet = bytes.fromhex(packet_hex) result = try_parse_advertisement(packet) assert result is not None assert result.public_key == "8576dc7f679b493f9ab5ac316173e1a56d3388bc3ba75f583f63ab0d1ba2a8ab" assert result.name == "Can O Mesh 2 🥫" assert result.device_role == 2 # Repeater assert result.timestamp > 0 # Has valid timestamp assert result.lat is not None assert result.lon is not None assert abs(result.lat - 49.02056) < 0.000001 assert abs(result.lon - (-123.82935)) < 0.000001 def test_parse_chat_node_advertisement_with_gps(self): """Parse a chat node advertisement with GPS coordinates.""" from app.decoder import try_parse_advertisement # Chat node packet with lat/lon of 47.786038 / -122.344096 # Flags 0x91: Role=Chat (1), Location=Yes, Name=Yes packet_hex = ( "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83" "2DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1D" "F3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D" "913628D902602DB5F8466C696768746C657373F09FA59D" ) packet = bytes.fromhex(packet_hex) result = try_parse_advertisement(packet) assert result is not None assert result.public_key == "ae92564c5c9884854f04f469bbb2bab8871a078053af6cf4aa2c014b18ce8a83" assert result.name == "Flightless🥝" assert result.device_role == 1 # Chat node assert result.timestamp > 0 # Has valid timestamp assert result.lat is not None assert result.lon is not None assert abs(result.lat - 47.786038) < 0.000001 assert abs(result.lon - (-122.344096)) < 0.000001 def test_parse_advertisement_without_gps(self): """Parse an advertisement without GPS coordinates.""" from app.decoder import try_parse_advertisement # Chat node packet without location # Flags 0x81: Role=Chat (1), Location=No, Name=Yes packet_hex = ( "1104D7F9E07A2E38C81F7DC0C1CEDDED6B415B4367CF48F578C5A092CED3490FF0C7" "6EFDF1F5A4BD6669D3D143CFF384D8B3BD950CDCA31C98B7DA789D004D04DED31E16" "B998E1AE352B283EAC8ABCF1F07214EC3BBF7AF3EB8EBF15C00417F2425A259E7CE6" "A875BA0D814D656E6E697344" ) packet = bytes.fromhex(packet_hex) result = try_parse_advertisement(packet) assert result is not None assert result.public_key == "2e38c81f7dc0c1cedded6b415b4367cf48f578c5a092ced3490ff0c76efdf1f5" assert result.name == "MennisD" assert result.device_role == 1 # Chat node assert result.timestamp > 0 # Has valid timestamp assert result.lat is None assert result.lon is None def test_parse_advertisement_extracts_public_key(self): """Advertisement parsing extracts the public key correctly.""" from app.decoder import parse_packet, PayloadType packet_hex = ( "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83" "2DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1D" "F3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D" "913628D902602DB5F8466C696768746C657373F09FA59D" ) packet = bytes.fromhex(packet_hex) # Verify packet is recognized as ADVERT type info = parse_packet(packet) assert info is not None assert info.payload_type == PayloadType.ADVERT def test_non_advertisement_returns_none(self): """Non-advertisement packets return None from try_parse_advertisement.""" from app.decoder import try_parse_advertisement # GROUP_TEXT packet, not an advertisement packet = bytes([0x15, 0x00]) + bytes(50) result = try_parse_advertisement(packet) assert result is None