Add historical DM decryption

This commit is contained in:
Jack Kingsman
2026-01-18 21:22:22 -08:00
parent 30e3eb47be
commit 42572aa234
22 changed files with 1761 additions and 53 deletions
+371
View File
@@ -10,11 +10,17 @@ import hmac
from Crypto.Cipher import AES
from app.decoder import (
DecryptedDirectMessage,
PayloadType,
RouteType,
_clamp_scalar,
calculate_channel_hash,
decrypt_direct_message,
decrypt_group_text,
derive_public_key,
derive_shared_secret,
parse_packet,
try_decrypt_dm,
try_decrypt_packet_with_channel_key,
)
@@ -355,3 +361,368 @@ class TestAdvertisementParsing:
result = try_parse_advertisement(packet)
assert result is None
class TestScalarClamping:
"""Test X25519 scalar clamping for ECDH."""
def test_clamp_scalar_modifies_first_byte(self):
"""Clamping clears the lower 3 bits of the first byte."""
# Input with all bits set in first byte
scalar = bytes([0xFF]) + bytes(31)
result = _clamp_scalar(scalar)
# First byte should have lower 3 bits cleared: 0xFF & 248 = 0xF8
assert result[0] == 0xF8
def test_clamp_scalar_modifies_last_byte(self):
"""Clamping modifies the last byte for correct group operations."""
# Input with all bits set in last byte
scalar = bytes(31) + bytes([0xFF])
result = _clamp_scalar(scalar)
# Last byte: (0xFF & 63) | 64 = 0x7F
assert result[31] == 0x7F
def test_clamp_scalar_preserves_middle_bytes(self):
"""Clamping preserves the middle bytes unchanged."""
# Known middle bytes
scalar = bytes([0xAB]) + bytes([0x12, 0x34, 0x56] * 10)[:30] + bytes([0xCD])
result = _clamp_scalar(scalar)
# Middle bytes should be unchanged
assert result[1:31] == scalar[1:31]
def test_clamp_scalar_truncates_to_32_bytes(self):
"""Clamping uses only first 32 bytes of input."""
# 64-byte input (typical Ed25519 private key)
scalar = bytes(64)
result = _clamp_scalar(scalar)
assert len(result) == 32
class TestPublicKeyDerivation:
"""Test deriving Ed25519 public key from MeshCore private key."""
# Test data from real MeshCore keys
# The private key's first 32 bytes are the scalar (post-SHA-512 clamped)
# The public key is derived via scalar × basepoint, NOT from the last 32 bytes
#
# IMPORTANT: The last 32 bytes of a MeshCore private key are the signing prefix,
# NOT the public key! Standard Ed25519 libraries will give wrong results because
# they expect a seed, not a raw scalar.
FACE12_PRIV = bytes.fromhex(
"58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B"
"77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1"
)
# Expected public key derived from scalar × basepoint
# Note: This starts with "face12" - the derived public key, NOT the signing prefix
FACE12_PUB_EXPECTED = bytes.fromhex(
"FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46"
)
def test_derive_public_key_from_meshcore_private(self):
"""derive_public_key correctly derives public key from MeshCore private key."""
result = derive_public_key(self.FACE12_PRIV)
assert len(result) == 32
assert result == self.FACE12_PUB_EXPECTED
def test_derive_public_key_from_scalar_only(self):
"""derive_public_key works with just the 32-byte scalar."""
scalar_only = self.FACE12_PRIV[:32]
result = derive_public_key(scalar_only)
assert len(result) == 32
assert result == self.FACE12_PUB_EXPECTED
def test_derive_public_key_deterministic(self):
"""Same private key always produces same public key."""
result1 = derive_public_key(self.FACE12_PRIV)
result2 = derive_public_key(self.FACE12_PRIV)
assert result1 == result2
class TestSharedSecretDerivation:
"""Test ECDH shared secret derivation from Ed25519 keys."""
# Test data from real MeshCore keys
# The private key's first 32 bytes are the scalar (post-SHA-512 clamped)
# The last 32 bytes are the signing prefix (NOT the public key, though they may match)
FACE12_PRIV = bytes.fromhex(
"58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B"
"77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1"
)
# a1b2c3 public key (32 bytes)
A1B2C3_PUB = bytes.fromhex("a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7")
def test_derive_shared_secret_returns_32_bytes(self):
"""Shared secret derivation returns 32-byte value."""
result = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
assert len(result) == 32
def test_derive_shared_secret_deterministic(self):
"""Same inputs always produce same shared secret."""
result1 = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
result2 = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
assert result1 == result2
def test_derive_shared_secret_different_keys_different_result(self):
"""Different key pairs produce different shared secrets."""
other_pub = bytes.fromhex(
"1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
)
result1 = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
# This may raise an exception for invalid public key, which is also acceptable
try:
result2 = derive_shared_secret(self.FACE12_PRIV, other_pub)
assert result1 != result2
except Exception:
# Invalid public keys may fail, which is fine
pass
class TestDirectMessageDecryption:
"""Test TEXT_MESSAGE (direct message) payload decryption."""
# Real test vector from user
# Payload: [dest_hash:1][src_hash:1][mac:2][ciphertext]
PAYLOAD = bytes.fromhex(
"FAA1295471ADB44A98B13CA528A4B5C4FBC29B4DA3CED477519B2FBD8FD5467C31E5D58B"
)
# Keys for deriving shared secret
FACE12_PRIV = bytes.fromhex(
"58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B"
"77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1"
)
A1B2C3_PUB = bytes.fromhex("a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7")
EXPECTED_MESSAGE = "Hello there, Mr. Face!"
def test_decrypt_real_dm_payload(self):
"""Decrypt a real DM payload with known shared secret."""
shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
result = decrypt_direct_message(self.PAYLOAD, shared_secret)
assert result is not None
assert result.message == self.EXPECTED_MESSAGE
assert result.dest_hash == "fa" # First byte of payload
assert result.src_hash == "a1" # Second byte, matches a1b2c3
def test_decrypt_extracts_timestamp(self):
"""Decrypted message contains valid timestamp."""
shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
result = decrypt_direct_message(self.PAYLOAD, shared_secret)
assert result is not None
assert result.timestamp > 0 # Non-zero timestamp
assert result.timestamp < 2**32 # Within uint32 range
def test_decrypt_extracts_flags(self):
"""Decrypted message contains flags byte."""
shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
result = decrypt_direct_message(self.PAYLOAD, shared_secret)
assert result is not None
assert isinstance(result.flags, int)
assert 0 <= result.flags <= 255
def test_decrypt_with_wrong_secret_fails(self):
"""Decryption with incorrect shared secret fails MAC verification."""
wrong_secret = bytes(32) # All zeros
result = decrypt_direct_message(self.PAYLOAD, wrong_secret)
assert result is None
def test_decrypt_with_corrupted_mac_fails(self):
"""Corrupted MAC causes decryption to fail."""
shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
# Corrupt the MAC (bytes 2-3)
corrupted = self.PAYLOAD[:2] + bytes([0xFF, 0xFF]) + self.PAYLOAD[4:]
result = decrypt_direct_message(corrupted, shared_secret)
assert result is None
def test_decrypt_too_short_payload_returns_none(self):
"""Payloads shorter than minimum (4 bytes) return None."""
shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
result = decrypt_direct_message(bytes(3), shared_secret)
assert result is None
def test_decrypt_invalid_ciphertext_length_returns_none(self):
"""Ciphertext not a multiple of 16 bytes returns None."""
shared_secret = derive_shared_secret(self.FACE12_PRIV, self.A1B2C3_PUB)
# 4-byte header + 15-byte ciphertext (not multiple of 16)
invalid_payload = bytes(4 + 15)
result = decrypt_direct_message(invalid_payload, shared_secret)
assert result is None
class TestTryDecryptDM:
"""Test full packet decryption for direct messages."""
# Full packet: header + path_length + payload
# Header byte = 0x09: route_type=FLOOD(1), payload_type=TEXT_MESSAGE(2)
# Header byte = (0 << 6) | (2 << 2) | 1 = 0x09
# Path length = 0
FULL_PACKET = bytes.fromhex(
"0900FAA1295471ADB44A98B13CA528A4B5C4FBC29B4DA3CED477519B2FBD8FD5467C31E5D58B"
)
# Keys
FACE12_PRIV = bytes.fromhex(
"58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B"
"77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1"
)
# FACE12 public key - derived via scalar × basepoint, NOT the last 32 bytes!
# The last 32 bytes (77AC...) are the signing prefix, not the public key.
FACE12_PUB = bytes.fromhex("FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46")
A1B2C3_PUB = bytes.fromhex("a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7")
EXPECTED_MESSAGE = "Hello there, Mr. Face!"
def test_try_decrypt_dm_full_packet(self):
"""Decrypt a full TEXT_MESSAGE packet."""
result = try_decrypt_dm(
self.FULL_PACKET,
self.FACE12_PRIV,
self.A1B2C3_PUB,
our_public_key=self.FACE12_PUB,
)
assert result is not None
assert result.message == self.EXPECTED_MESSAGE
def test_try_decrypt_dm_inbound_message(self):
"""Decrypt an inbound message (from them to us)."""
# src_hash = a1 matches A1B2C3's first byte
result = try_decrypt_dm(
self.FULL_PACKET,
self.FACE12_PRIV,
self.A1B2C3_PUB,
our_public_key=None, # Without our pubkey, only checks inbound
)
assert result is not None
assert result.src_hash == "a1"
def test_try_decrypt_dm_non_text_message_returns_none(self):
"""Non-TEXT_MESSAGE packets return None."""
# GROUP_TEXT packet (payload_type=5)
# Header byte = (0 << 6) | (5 << 2) | 1 = 0x15
group_text_packet = bytes([0x15, 0x00]) + self.FULL_PACKET[2:]
result = try_decrypt_dm(
group_text_packet,
self.FACE12_PRIV,
self.A1B2C3_PUB,
)
assert result is None
def test_try_decrypt_dm_wrong_src_hash_returns_none(self):
"""Packets from unknown senders return None."""
# Create a packet with different src_hash
# Original: FA A1 ... -> dest=FA, src=A1
# Modified: FA BB ... -> dest=FA, src=BB (doesn't match A1B2C3)
modified_payload = bytes([0xFA, 0xBB]) + self.FULL_PACKET[4:]
modified_packet = self.FULL_PACKET[:2] + modified_payload
result = try_decrypt_dm(
modified_packet,
self.FACE12_PRIV,
self.A1B2C3_PUB,
our_public_key=self.FACE12_PUB,
)
assert result is None
def test_try_decrypt_dm_empty_packet_returns_none(self):
"""Empty packets return None."""
result = try_decrypt_dm(
b"",
self.FACE12_PRIV,
self.A1B2C3_PUB,
)
assert result is None
def test_try_decrypt_dm_truncated_packet_returns_none(self):
"""Truncated packets return None."""
result = try_decrypt_dm(
self.FULL_PACKET[:5], # Only header + partial payload
self.FACE12_PRIV,
self.A1B2C3_PUB,
)
assert result is None
class TestRealWorldDMPacket:
"""End-to-end test with exact real-world test data."""
def test_full_dm_decryption_flow(self):
"""
Complete decryption flow with real test vectors.
Test data from user:
- face12 private key (64 bytes Ed25519)
- a1b2c3 public key (32 bytes)
- Encrypted payload producing "Hello there, Mr. Face!"
"""
# Keys
face12_priv = bytes.fromhex(
"58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B"
"77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1"
)
# Derived public key (scalar × basepoint) - NOT the signing prefix from bytes 32-64
# First byte is 0xFA, matching dest_hash in test packet
face12_pub = bytes.fromhex(
"FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46"
)
a1b2c3_pub = bytes.fromhex(
"a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7"
)
# Full packet with header
full_packet = bytes.fromhex(
"0900FAA1295471ADB44A98B13CA528A4B5C4FBC29B4DA3CED477519B2FBD8FD5467C31E5D58B"
)
# Decrypt
result = try_decrypt_dm(
full_packet,
face12_priv,
a1b2c3_pub,
our_public_key=face12_pub,
)
# Verify
assert result is not None
assert isinstance(result, DecryptedDirectMessage)
assert result.message == "Hello there, Mr. Face!"
assert result.dest_hash == "fa" # First byte of derived face12 pubkey (0xFA)
assert result.src_hash == "a1" # First byte of a1b2c3 pubkey