Files
Remote-Terminal-for-MeshCore/tests/test_decoder.py
2026-01-14 20:08:41 -08:00

358 lines
14 KiB
Python

"""Tests for the packet decoder module.
These tests verify the cryptographic operations for MeshCore packet decryption,
which is critical for correctly interpreting mesh network messages.
"""
import hashlib
import hmac
from Crypto.Cipher import AES
from app.decoder import (
PayloadType,
RouteType,
calculate_channel_hash,
decrypt_group_text,
parse_packet,
try_decrypt_packet_with_channel_key,
)
class TestChannelKeyDerivation:
"""Test channel key derivation from hashtag names."""
def test_hashtag_key_derivation(self):
"""Hashtag channel keys are derived as SHA256(name)[:16]."""
channel_name = "#test"
expected_key = hashlib.sha256(channel_name.encode("utf-8")).digest()[:16]
# This matches the meshcore_py implementation
assert len(expected_key) == 16
def test_channel_hash_calculation(self):
"""Channel hash is the first byte of SHA256(key) as hex."""
key = bytes(16) # All zeros
expected_hash = format(hashlib.sha256(key).digest()[0], "02x")
result = calculate_channel_hash(key)
assert result == expected_hash
assert len(result) == 2 # Two hex chars
class TestPacketParsing:
"""Test raw packet header parsing."""
def test_parse_flood_packet(self):
"""Parse a FLOOD route type GROUP_TEXT packet."""
# Header: route_type=FLOOD(1), payload_type=GROUP_TEXT(5), version=0
# Header byte = (0 << 6) | (5 << 2) | 1 = 0x15
# Path length = 0
header = bytes([0x15, 0x00]) + b"payload_data"
result = parse_packet(header)
assert result is not None
assert result.route_type == RouteType.FLOOD
assert result.payload_type == PayloadType.GROUP_TEXT
assert result.path_length == 0
assert result.payload == b"payload_data"
def test_parse_direct_packet_with_path(self):
"""Parse a DIRECT route type packet with path data."""
# Header: route_type=DIRECT(2), payload_type=TEXT_MESSAGE(2), version=0
# Header byte = (0 << 6) | (2 << 2) | 2 = 0x0A
# Path length = 3, path = [0x01, 0x02, 0x03]
header = bytes([0x0A, 0x03, 0x01, 0x02, 0x03]) + b"msg"
result = parse_packet(header)
assert result is not None
assert result.route_type == RouteType.DIRECT
assert result.payload_type == PayloadType.TEXT_MESSAGE
assert result.path_length == 3
assert result.payload == b"msg"
def test_parse_transport_flood_skips_transport_code(self):
"""TRANSPORT_FLOOD packets have 4-byte transport code to skip."""
# Header: route_type=TRANSPORT_FLOOD(0), payload_type=GROUP_TEXT(5)
# Header byte = (0 << 6) | (5 << 2) | 0 = 0x14
# Transport code (4 bytes) + path_length + payload
header = bytes([0x14, 0xAA, 0xBB, 0xCC, 0xDD, 0x00]) + b"data"
result = parse_packet(header)
assert result is not None
assert result.route_type == RouteType.TRANSPORT_FLOOD
assert result.payload_type == PayloadType.GROUP_TEXT
assert result.payload == b"data"
def test_parse_empty_packet_returns_none(self):
"""Empty packets return None."""
assert parse_packet(b"") is None
assert parse_packet(b"\x00") is None
def test_parse_truncated_packet_returns_none(self):
"""Truncated packets return None."""
# Packet claiming path_length=10 but no path data
header = bytes([0x15, 0x0A])
assert parse_packet(header) is None
class TestGroupTextDecryption:
"""Test GROUP_TEXT (channel message) decryption."""
def _create_encrypted_payload(
self, channel_key: bytes, timestamp: int, flags: int, message: str
) -> bytes:
"""Helper to create a valid encrypted GROUP_TEXT payload."""
# Build plaintext: timestamp(4) + flags(1) + message + null terminator
plaintext = (
timestamp.to_bytes(4, "little") + bytes([flags]) + message.encode("utf-8") + b"\x00"
)
# Pad to 16-byte boundary
pad_len = (16 - len(plaintext) % 16) % 16
if pad_len == 0:
pad_len = 16
plaintext += bytes(pad_len)
# Encrypt with AES-128 ECB
cipher = AES.new(channel_key, AES.MODE_ECB)
ciphertext = cipher.encrypt(plaintext)
# Calculate MAC: HMAC-SHA256(channel_secret, ciphertext)[:2]
channel_secret = channel_key + bytes(16)
mac = hmac.new(channel_secret, ciphertext, hashlib.sha256).digest()[:2]
# Build payload: channel_hash(1) + mac(2) + ciphertext
channel_hash = hashlib.sha256(channel_key).digest()[0:1]
return channel_hash + mac + ciphertext
def test_decrypt_valid_message(self):
"""Decrypt a valid GROUP_TEXT message."""
channel_key = hashlib.sha256(b"#testchannel").digest()[:16]
timestamp = 1700000000
message = "TestUser: Hello world"
payload = self._create_encrypted_payload(channel_key, timestamp, 0, message)
result = decrypt_group_text(payload, channel_key)
assert result is not None
assert result.timestamp == timestamp
assert result.sender == "TestUser"
assert result.message == "Hello world"
def test_decrypt_message_without_sender_prefix(self):
"""Messages without 'sender: ' format have no parsed sender."""
channel_key = hashlib.sha256(b"#test").digest()[:16]
message = "Just a plain message"
payload = self._create_encrypted_payload(channel_key, 1234567890, 0, message)
result = decrypt_group_text(payload, channel_key)
assert result is not None
assert result.sender is None
assert result.message == "Just a plain message"
def test_decrypt_with_wrong_key_fails(self):
"""Decryption with wrong key fails MAC verification."""
correct_key = hashlib.sha256(b"#correct").digest()[:16]
wrong_key = hashlib.sha256(b"#wrong").digest()[:16]
payload = self._create_encrypted_payload(correct_key, 1234567890, 0, "test")
result = decrypt_group_text(payload, wrong_key)
assert result is None
def test_decrypt_corrupted_mac_fails(self):
"""Corrupted MAC causes decryption to fail."""
channel_key = hashlib.sha256(b"#test").digest()[:16]
payload = self._create_encrypted_payload(channel_key, 1234567890, 0, "test")
# Corrupt the MAC (bytes 1-2)
corrupted = payload[:1] + bytes([payload[1] ^ 0xFF, payload[2] ^ 0xFF]) + payload[3:]
result = decrypt_group_text(corrupted, channel_key)
assert result is None
class TestTryDecryptPacket:
"""Test the full packet decryption pipeline."""
def test_only_group_text_packets_decrypted(self):
"""Non-GROUP_TEXT packets return None."""
# TEXT_MESSAGE packet (payload_type=2)
# Header: route_type=FLOOD(1), payload_type=TEXT_MESSAGE(2)
# Header byte = (0 << 6) | (2 << 2) | 1 = 0x09
packet = bytes([0x09, 0x00]) + b"some_data"
key = bytes(16)
result = try_decrypt_packet_with_channel_key(packet, key)
assert result is None
def test_channel_hash_mismatch_returns_none(self):
"""Packets with non-matching channel hash return None early."""
# GROUP_TEXT packet with channel_hash that doesn't match our key
# Header: route_type=FLOOD(1), payload_type=GROUP_TEXT(5)
# Header byte = 0x15
wrong_hash = bytes([0xFF]) # Unlikely to match any real key
packet = bytes([0x15, 0x00]) + wrong_hash + bytes(20)
key = hashlib.sha256(b"#test").digest()[:16]
result = try_decrypt_packet_with_channel_key(packet, key)
assert result is None
class TestRealWorldPackets:
"""Test with real captured packets to ensure decoder matches protocol."""
def test_decrypt_six77_channel_message(self):
"""Decrypt a real packet from #six77 channel."""
# Real packet captured from #six77 hashtag channel
packet_hex = (
"1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D"
"99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D35182"
"83156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF"
"0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D"
"07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818"
)
packet = bytes.fromhex(packet_hex)
# Verify key derivation: SHA256("#six77")[:16]
channel_key = hashlib.sha256(b"#six77").digest()[:16]
assert channel_key.hex() == "7aba109edcf304a84433cb71d0f3ab73"
# Decrypt the packet
result = try_decrypt_packet_with_channel_key(packet, channel_key)
assert result is not None
assert result.sender == "Flightless🥝"
assert "hashtag room is essentially public" in result.message
assert result.channel_hash == "e6"
assert result.timestamp == 1766604717
class TestAdvertisementParsing:
"""Test parsing of advertisement packets."""
def test_parse_repeater_advertisement_with_gps(self):
"""Parse a repeater advertisement with GPS coordinates."""
from app.decoder import try_parse_advertisement
# Repeater packet with lat/lon of 49.02056 / -123.82935
# Flags 0x92: Role=Repeater (2), Location=Yes, Name=Yes
packet_hex = (
"1106538B1CD273868576DC7F679B493F9AB5AC316173E1A56D3388BC3BA75F583F63"
"AB0D1BA2A8ABD0BC6669DBF719E67E4C8517BA4E0D6F8C96A323E9D13A77F2630DED"
"965A5C17C3EC6ED1601EEFE857749DA24E9F39CBEACD722C3708F433DB5FA9BAF0BA"
"F9BC5B1241069290FEEB029A839EF843616E204F204D657368203220F09FA5AB"
)
packet = bytes.fromhex(packet_hex)
result = try_parse_advertisement(packet)
assert result is not None
assert (
result.public_key == "8576dc7f679b493f9ab5ac316173e1a56d3388bc3ba75f583f63ab0d1ba2a8ab"
)
assert result.name == "Can O Mesh 2 🥫"
assert result.device_role == 2 # Repeater
assert result.timestamp > 0 # Has valid timestamp
assert result.lat is not None
assert result.lon is not None
assert abs(result.lat - 49.02056) < 0.000001
assert abs(result.lon - (-123.82935)) < 0.000001
def test_parse_chat_node_advertisement_with_gps(self):
"""Parse a chat node advertisement with GPS coordinates."""
from app.decoder import try_parse_advertisement
# Chat node packet with lat/lon of 47.786038 / -122.344096
# Flags 0x91: Role=Chat (1), Location=Yes, Name=Yes
packet_hex = (
"1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83"
"2DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1D"
"F3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D"
"913628D902602DB5F8466C696768746C657373F09FA59D"
)
packet = bytes.fromhex(packet_hex)
result = try_parse_advertisement(packet)
assert result is not None
assert (
result.public_key == "ae92564c5c9884854f04f469bbb2bab8871a078053af6cf4aa2c014b18ce8a83"
)
assert result.name == "Flightless🥝"
assert result.device_role == 1 # Chat node
assert result.timestamp > 0 # Has valid timestamp
assert result.lat is not None
assert result.lon is not None
assert abs(result.lat - 47.786038) < 0.000001
assert abs(result.lon - (-122.344096)) < 0.000001
def test_parse_advertisement_without_gps(self):
"""Parse an advertisement without GPS coordinates."""
from app.decoder import try_parse_advertisement
# Chat node packet without location
# Flags 0x81: Role=Chat (1), Location=No, Name=Yes
packet_hex = (
"1104D7F9E07A2E38C81F7DC0C1CEDDED6B415B4367CF48F578C5A092CED3490FF0C7"
"6EFDF1F5A4BD6669D3D143CFF384D8B3BD950CDCA31C98B7DA789D004D04DED31E16"
"B998E1AE352B283EAC8ABCF1F07214EC3BBF7AF3EB8EBF15C00417F2425A259E7CE6"
"A875BA0D814D656E6E697344"
)
packet = bytes.fromhex(packet_hex)
result = try_parse_advertisement(packet)
assert result is not None
assert (
result.public_key == "2e38c81f7dc0c1cedded6b415b4367cf48f578c5a092ced3490ff0c76efdf1f5"
)
assert result.name == "MennisD"
assert result.device_role == 1 # Chat node
assert result.timestamp > 0 # Has valid timestamp
assert result.lat is None
assert result.lon is None
def test_parse_advertisement_extracts_public_key(self):
"""Advertisement parsing extracts the public key correctly."""
from app.decoder import PayloadType, parse_packet
packet_hex = (
"1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A83"
"2DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1D"
"F3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D"
"913628D902602DB5F8466C696768746C657373F09FA59D"
)
packet = bytes.fromhex(packet_hex)
# Verify packet is recognized as ADVERT type
info = parse_packet(packet)
assert info is not None
assert info.payload_type == PayloadType.ADVERT
def test_non_advertisement_returns_none(self):
"""Non-advertisement packets return None from try_parse_advertisement."""
from app.decoder import try_parse_advertisement
# GROUP_TEXT packet, not an advertisement
packet = bytes([0x15, 0x00]) + bytes(50)
result = try_parse_advertisement(packet)
assert result is None