# Mirror of https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
# (synced 2026-03-28 17:43:05 +01:00; 573 lines, 22 KiB, Python)
"""Tests using real MeshCore packet data and cryptographic keys.
|
|
|
|
These tests verify the decryption pipeline end-to-end with actual radio packets
|
|
captured from the mesh network. No crypto functions are mocked.
|
|
|
|
Test data:
|
|
- Client 1 ("a1b2c3d3"): sender of the DM
|
|
- Client 2 ("face1233"): receiver of the DM
|
|
- Channel: #six77 (hashtag room, key derived from SHA-256 of name)
|
|
"""
|
|
|
|
from hashlib import sha256
from unittest.mock import MagicMock, patch

import pytest

from app.decoder import (
    DecryptedDirectMessage,
    PayloadType,
    RouteType,
    calculate_channel_hash,
    decrypt_direct_message,
    derive_public_key,
    derive_shared_secret,
    parse_packet,
    try_decrypt_dm,
    try_decrypt_packet_with_channel_key,
)
from app.repository import (
    ChannelRepository,
    ContactRepository,
    MessageRepository,
    RawPacketRepository,
)

# ---------------------------------------------------------------------------
# Real test data captured from a MeshCore mesh network
# ---------------------------------------------------------------------------

# Client 1 (sender of the DM).
# The private key is given as two concatenated 32-byte hex halves (64 bytes total).
CLIENT1_PUBLIC_HEX = "a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7"
CLIENT1_PRIVATE_HEX = (
    "1808C3512F063796E492B9FA101A7A6239F14E71F8D1D5AD086E8E228ED0A076"
    "D5ED26C82C6E64ABF1954336E42CF68E4AB288A4D38E40ED0F5870FED95C1DEB"
)
CLIENT1_PUBLIC = bytes.fromhex(CLIENT1_PUBLIC_HEX)
CLIENT1_PRIVATE = bytes.fromhex(CLIENT1_PRIVATE_HEX)

# Client 2 (receiver of the DM), same layout as client 1.
CLIENT2_PUBLIC_HEX = "face123334789e2b81519afdbc39a3c9eb7ea3457ad367d3243597a484847e46"
CLIENT2_PRIVATE_HEX = (
    "58BA1940E97099CBB4357C62CE9C7F4B245C94C90D722E67201B989F9FEACF7B"
    "77ACADDB84438514022BDB0FC3140C2501859BE1772AC7B8C7E41DC0F40490A1"
)
CLIENT2_PUBLIC = bytes.fromhex(CLIENT2_PUBLIC_HEX)
CLIENT2_PRIVATE = bytes.fromhex(CLIENT2_PRIVATE_HEX)

# DM packet: client 1 -> client 2 (raw bytes exactly as captured)
DM_PACKET_HEX = "0900FAA1295471ADB44A98B13CA528A4B5C4FBC29B4DA3CED477519B2FBD8FD5467C31E5D58B"
DM_PACKET = bytes.fromhex(DM_PACKET_HEX)
DM_PLAINTEXT = "Hello there, Mr. Face!"

# Channel message in #six77
CHANNEL_PACKET_HEX = (
    "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D"
    "99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D35182"
    "83156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF"
    "0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D"
    "07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818"
)
CHANNEL_PACKET = bytes.fromhex(CHANNEL_PACKET_HEX)
CHANNEL_NAME = "#six77"
# Hashtag-room key: first 16 bytes of SHA-256 over the UTF-8 channel name.
CHANNEL_KEY = sha256(CHANNEL_NAME.encode("utf-8")).digest()[:16]
# The full plaintext is kept as its own literal (not derived from the two parts
# below) so it serves as independent ground truth for the reconstruction test.
CHANNEL_PLAINTEXT_FULL = (
    "Flightless🥝: hello there; this hashtag room is essentially public. "
    "MeshCore has great crypto; use private rooms or DMs for private comms instead!"
)
CHANNEL_SENDER = "Flightless🥝"
CHANNEL_MESSAGE_BODY = (
    "hello there; this hashtag room is essentially public. "
    "MeshCore has great crypto; use private rooms or DMs for private comms instead!"
)


# ============================================================================
# Direct Message Decryption
# ============================================================================


class TestDMDecryption:
    """Test DM decryption using real captured packet data.

    Every test works on the module-level DM_PACKET / CLIENT1_* / CLIENT2_*
    fixtures and calls the real app.decoder functions (nothing is mocked).
    """

    def test_derive_public_key_from_private(self):
        """derive_public_key reproduces known public keys from private keys."""
        assert derive_public_key(CLIENT1_PRIVATE) == CLIENT1_PUBLIC
        assert derive_public_key(CLIENT2_PRIVATE) == CLIENT2_PUBLIC

    def test_shared_secret_is_symmetric(self):
        """Both parties derive the same ECDH shared secret."""
        secret_1to2 = derive_shared_secret(CLIENT1_PRIVATE, CLIENT2_PUBLIC)
        secret_2to1 = derive_shared_secret(CLIENT2_PRIVATE, CLIENT1_PUBLIC)
        assert secret_1to2 == secret_2to1

    def test_parse_dm_packet_header(self):
        """Raw DM packet parses to the expected header fields."""
        info = parse_packet(DM_PACKET)
        assert info is not None
        assert info.route_type == RouteType.FLOOD
        assert info.payload_type == PayloadType.TEXT_MESSAGE
        assert info.path_length == 0

    def test_decrypt_dm_as_receiver(self):
        """Receiver (face1233) decrypts the DM with correct plaintext."""
        result = try_decrypt_dm(
            DM_PACKET,
            our_private_key=CLIENT2_PRIVATE,
            their_public_key=CLIENT1_PUBLIC,
            our_public_key=CLIENT2_PUBLIC,
        )
        assert result is not None
        assert isinstance(result, DecryptedDirectMessage)
        assert result.message == DM_PLAINTEXT

    def test_decrypt_dm_as_sender(self):
        """Sender (a1b2c3d3) decrypts the DM too (outgoing echo scenario)."""
        result = try_decrypt_dm(
            DM_PACKET,
            our_private_key=CLIENT1_PRIVATE,
            their_public_key=CLIENT2_PUBLIC,
            our_public_key=CLIENT1_PUBLIC,
        )
        assert result is not None
        assert result.message == DM_PLAINTEXT

    def test_direction_hashes_match_key_prefixes(self):
        """dest_hash and src_hash correspond to first bytes of public keys."""
        result = try_decrypt_dm(
            DM_PACKET,
            our_private_key=CLIENT2_PRIVATE,
            their_public_key=CLIENT1_PUBLIC,
            our_public_key=CLIENT2_PUBLIC,
        )
        assert result is not None
        # Packet was sent FROM client1 TO client2
        assert result.src_hash == format(CLIENT1_PUBLIC[0], "02x")  # a1
        assert result.dest_hash == format(CLIENT2_PUBLIC[0], "02x")  # fa

    def test_wrong_key_fails_mac(self):
        """Decryption with an unrelated key fails (MAC mismatch)."""
        # our_public_key is deliberately not passed in this call.
        wrong_private = b"\x01" * 64
        result = try_decrypt_dm(
            DM_PACKET,
            our_private_key=wrong_private,
            their_public_key=CLIENT1_PUBLIC,
        )
        assert result is None

    def test_decrypt_dm_payload_directly(self):
        """decrypt_direct_message works with just the payload and shared secret."""
        info = parse_packet(DM_PACKET)
        assert info is not None

        shared = derive_shared_secret(CLIENT2_PRIVATE, CLIENT1_PUBLIC)
        result = decrypt_direct_message(info.payload, shared)
        assert result is not None
        assert result.message == DM_PLAINTEXT
        assert result.timestamp > 0


# ============================================================================
# Channel Message Decryption
# ============================================================================


class TestChannelDecryption:
    """Test channel message decryption using real captured packet data.

    Uses the module-level CHANNEL_PACKET and CHANNEL_KEY fixtures for the
    #six77 hashtag room; the app.decoder functions are called unmocked.
    """

    def test_parse_channel_packet_header(self):
        """Raw channel packet parses to GROUP_TEXT."""
        info = parse_packet(CHANNEL_PACKET)
        assert info is not None
        assert info.payload_type == PayloadType.GROUP_TEXT

    def test_decrypt_channel_message(self):
        """Channel message decrypts to expected sender and body."""
        result = try_decrypt_packet_with_channel_key(CHANNEL_PACKET, CHANNEL_KEY)
        assert result is not None
        assert result.sender == CHANNEL_SENDER
        assert result.message == CHANNEL_MESSAGE_BODY

    def test_full_text_reconstructed(self):
        """Reconstructed 'sender: message' matches the original plaintext."""
        result = try_decrypt_packet_with_channel_key(CHANNEL_PACKET, CHANNEL_KEY)
        assert result is not None
        full = f"{result.sender}: {result.message}"
        assert full == CHANNEL_PLAINTEXT_FULL

    def test_channel_hash_matches_packet(self):
        """Channel hash in packet matches hash computed from key."""
        info = parse_packet(CHANNEL_PACKET)
        assert info is not None
        # The first payload byte, as two lowercase hex digits, is compared with
        # the value calculate_channel_hash returns for the key.
        packet_hash = format(info.payload[0], "02x")
        expected_hash = calculate_channel_hash(CHANNEL_KEY)
        assert packet_hash == expected_hash

    def test_wrong_channel_key_fails(self):
        """Decryption with a different channel key returns None."""
        wrong_key = b"\x00" * 16
        result = try_decrypt_packet_with_channel_key(CHANNEL_PACKET, wrong_key)
        assert result is None

    def test_hashtag_key_derivation(self):
        """Hashtag channel key is SHA-256(name)[:16], matching radio firmware."""
        # The name is spelled out literally here so the derivation is checked
        # independently of the CHANNEL_NAME / CHANNEL_KEY module constants.
        key = sha256(b"#six77").digest()[:16]
        assert len(key) == 16
        # Key should decrypt our packet
        result = try_decrypt_packet_with_channel_key(CHANNEL_PACKET, key)
        assert result is not None


# ============================================================================
# Historical DM Decryption Pipeline (Integration)
# ============================================================================


class TestHistoricalDMDecryptionPipeline:
    """Integration test: store a real DM packet, run historical decryption,
    verify correct message and direction end up in the DB.

    Each test relies on the ``test_db`` and ``captured_broadcasts`` fixtures
    (defined outside this file) and patches
    ``app.packet_processor.broadcast_event`` with the captured mock.
    """

    @pytest.mark.asyncio
    async def test_historical_decrypt_stores_incoming_dm(self, test_db, captured_broadcasts):
        """run_historical_dm_decryption decrypts a real packet and stores it
        with the correct direction (incoming from client1 to client2)."""
        from app.packet_processor import run_historical_dm_decryption

        # Store the undecrypted raw packet (message_id=NULL means undecrypted)
        await RawPacketRepository.create(DM_PACKET, 1700000000)

        # Add client1 as a known contact
        await ContactRepository.upsert(
            {
                "public_key": CLIENT1_PUBLIC_HEX,
                "name": "Client1",
                "type": 1,
            }
        )

        broadcasts, mock_broadcast = captured_broadcasts

        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            # Decrypt as client2 (the receiver)
            await run_historical_dm_decryption(
                private_key_bytes=CLIENT2_PRIVATE,
                contact_public_key_bytes=CLIENT1_PUBLIC,
                contact_public_key_hex=CLIENT1_PUBLIC_HEX,
                display_name="Client1",
            )

        # Verify the message was stored
        messages = await MessageRepository.get_all(
            msg_type="PRIV", conversation_key=CLIENT1_PUBLIC_HEX.lower(), limit=10
        )
        assert len(messages) == 1

        msg = messages[0]
        assert msg.text == DM_PLAINTEXT
        assert msg.outgoing is False  # We are client2, message is FROM client1
        assert msg.type == "PRIV"

        # Verify a message broadcast was sent
        msg_broadcasts = [b for b in broadcasts if b["type"] == "message"]
        assert len(msg_broadcasts) == 1
        assert msg_broadcasts[0]["data"]["text"] == DM_PLAINTEXT
        assert msg_broadcasts[0]["data"]["outgoing"] is False

    @pytest.mark.asyncio
    async def test_historical_decrypt_skips_outgoing_by_design(self, test_db, captured_broadcasts):
        """Historical decryption skips outgoing DMs (they're stored by the send endpoint).

        run_historical_dm_decryption passes our_public_key=None, which disables
        the outbound hash check. When our first byte differs from the contact's
        (255/256 cases), outgoing packets fail the inbound src_hash check and
        are skipped — this is correct behavior.
        """
        from app.packet_processor import run_historical_dm_decryption

        await RawPacketRepository.create(DM_PACKET, 1700000000)

        await ContactRepository.upsert(
            {
                "public_key": CLIENT2_PUBLIC_HEX,
                "name": "Client2",
                "type": 1,
            }
        )

        # Only the mock is needed here; the captured broadcast list is not inspected.
        _, mock_broadcast = captured_broadcasts

        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            # Decrypt as client1 (the sender) — first bytes differ (a1 != fa)
            # so historical decryption correctly skips this outgoing packet
            await run_historical_dm_decryption(
                private_key_bytes=CLIENT1_PRIVATE,
                contact_public_key_bytes=CLIENT2_PUBLIC,
                contact_public_key_hex=CLIENT2_PUBLIC_HEX,
                display_name="Client2",
            )

        # No messages stored — outgoing DMs are handled by the send endpoint
        messages = await MessageRepository.get_all(
            msg_type="PRIV", conversation_key=CLIENT2_PUBLIC_HEX.lower(), limit=10
        )
        assert len(messages) == 0

    @pytest.mark.asyncio
    async def test_historical_decrypt_broadcasts_success(self, test_db, captured_broadcasts):
        """Successful decryption broadcasts a success notification."""
        from app.packet_processor import run_historical_dm_decryption

        await RawPacketRepository.create(DM_PACKET, 1700000000)

        await ContactRepository.upsert(
            {
                "public_key": CLIENT1_PUBLIC_HEX,
                "name": "Client1",
                "type": 1,
            }
        )

        # Only the mock is needed here; the captured broadcast list is not inspected.
        _, mock_broadcast = captured_broadcasts

        mock_success = MagicMock()

        with (
            patch("app.packet_processor.broadcast_event", mock_broadcast),
            patch("app.websocket.broadcast_success", mock_success),
        ):
            await run_historical_dm_decryption(
                private_key_bytes=CLIENT2_PRIVATE,
                contact_public_key_bytes=CLIENT1_PUBLIC,
                contact_public_key_hex=CLIENT1_PUBLIC_HEX,
                display_name="Client1",
            )

        # The success notification is expected exactly once, with the contact
        # name in the first positional argument and the count in the second.
        mock_success.assert_called_once()
        args = mock_success.call_args.args
        assert "Client1" in args[0]
        assert "1 message" in args[1]


class TestLiveDMDecryptionPipeline:
    """Integration test: process a real DM packet through the live
    process_raw_packet → _process_direct_message → try_decrypt_dm pipeline
    with no crypto mocking.

    Each test relies on the ``test_db`` and ``captured_broadcasts`` fixtures
    (defined outside this file).
    """

    @pytest.fixture(autouse=True)
    def _reset_keystore(self):
        """Reset the global keystore state between tests.

        Saves the module-level key pair held by app.keystore, clears it for the
        duration of the test, and puts the saved values back afterwards.
        """
        import app.keystore as ks

        orig_priv, orig_pub = ks._private_key, ks._public_key
        ks._private_key = None
        ks._public_key = None
        yield
        ks._private_key, ks._public_key = orig_priv, orig_pub

    @pytest.mark.asyncio
    async def test_process_dm_packet_end_to_end(self, test_db, captured_broadcasts):
        """process_raw_packet decrypts a real DM packet and stores the message
        with correct text, direction, and contact attribution."""
        from app.keystore import set_private_key

        # Set up: client2 is "us" (receiver), client1 is the sender
        set_private_key(CLIENT2_PRIVATE)

        # Register client1 as a known contact
        await ContactRepository.upsert(
            {
                "public_key": CLIENT1_PUBLIC_HEX,
                "name": "Client1",
                "type": 1,
            }
        )

        broadcasts, mock_broadcast = captured_broadcasts

        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            from app.packet_processor import process_raw_packet

            result = await process_raw_packet(raw_bytes=DM_PACKET)

        # Verify process_raw_packet reports successful decryption
        assert result is not None
        assert result["decrypted"] is True
        assert result["contact_name"] == "Client1"
        assert result["message_id"] is not None

        # Verify message stored in DB with correct fields
        messages = await MessageRepository.get_all(
            msg_type="PRIV", conversation_key=CLIENT1_PUBLIC_HEX.lower(), limit=10
        )
        assert len(messages) == 1
        msg = messages[0]
        assert msg.text == DM_PLAINTEXT
        assert msg.outgoing is False  # We are client2, message is FROM client1
        assert msg.type == "PRIV"

        # Verify a "message" broadcast was sent
        msg_broadcasts = [b for b in broadcasts if b["type"] == "message"]
        assert len(msg_broadcasts) == 1
        assert msg_broadcasts[0]["data"]["text"] == DM_PLAINTEXT
        assert msg_broadcasts[0]["data"]["outgoing"] is False

    @pytest.mark.asyncio
    async def test_dm_from_unknown_contact_not_decrypted(self, test_db, captured_broadcasts):
        """DM from an unknown contact (not in DB) is stored but not decrypted."""
        from app.keystore import set_private_key

        set_private_key(CLIENT2_PRIVATE)
        # No contacts registered — client1 is unknown

        # Only the mock is needed here; the captured broadcast list is not inspected.
        _, mock_broadcast = captured_broadcasts

        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            from app.packet_processor import process_raw_packet

            result = await process_raw_packet(raw_bytes=DM_PACKET)

        # Raw packet is stored but not decrypted
        assert result is not None
        assert result["decrypted"] is False

        # No messages created (can't decrypt without knowing the contact)
        messages = await MessageRepository.get_all(
            msg_type="PRIV", conversation_key=CLIENT1_PUBLIC_HEX.lower(), limit=10
        )
        assert len(messages) == 0

    @pytest.mark.asyncio
    async def test_dm_without_private_key_not_decrypted(self, test_db, captured_broadcasts):
        """Without a private key in the keystore, DMs are stored but not decrypted."""
        # Don't call set_private_key — keystore is empty

        await ContactRepository.upsert(
            {
                "public_key": CLIENT1_PUBLIC_HEX,
                "name": "Client1",
                "type": 1,
            }
        )

        # Only the mock is needed here; the captured broadcast list is not inspected.
        _, mock_broadcast = captured_broadcasts

        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            from app.packet_processor import process_raw_packet

            result = await process_raw_packet(raw_bytes=DM_PACKET)

        assert result is not None
        assert result["decrypted"] is False

    @pytest.mark.asyncio
    async def test_dm_duplicate_packet_deduplicates(self, test_db, captured_broadcasts):
        """Processing the same DM packet twice doesn't create duplicate messages."""
        from app.keystore import set_private_key

        set_private_key(CLIENT2_PRIVATE)

        await ContactRepository.upsert(
            {
                "public_key": CLIENT1_PUBLIC_HEX,
                "name": "Client1",
                "type": 1,
            }
        )

        # Only the mock is needed here; the captured broadcast list is not inspected.
        _, mock_broadcast = captured_broadcasts

        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            from app.packet_processor import process_raw_packet

            result1 = await process_raw_packet(raw_bytes=DM_PACKET)
            # The second call's return value is intentionally not inspected.
            await process_raw_packet(raw_bytes=DM_PACKET)

        # First processing succeeds. Guard against None first so a missing
        # result shows up as an assertion failure rather than a TypeError.
        assert result1 is not None
        assert result1["decrypted"] is True
        assert result1["message_id"] is not None

        # Only one message stored (dedup via unique constraint)
        messages = await MessageRepository.get_all(
            msg_type="PRIV", conversation_key=CLIENT1_PUBLIC_HEX.lower(), limit=10
        )
        assert len(messages) == 1

    @pytest.mark.asyncio
    async def test_historical_then_live_deduplicates(self, test_db, captured_broadcasts):
        """A DM decrypted historically and then received live doesn't duplicate."""
        from app.keystore import set_private_key
        from app.packet_processor import process_raw_packet, run_historical_dm_decryption

        set_private_key(CLIENT2_PRIVATE)

        await ContactRepository.upsert(
            {
                "public_key": CLIENT1_PUBLIC_HEX,
                "name": "Client1",
                "type": 1,
            }
        )

        # Only the mock is needed here; the captured broadcast list is not inspected.
        _, mock_broadcast = captured_broadcasts

        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            # First: store packet undecrypted, then run historical decryption
            await RawPacketRepository.create(DM_PACKET, 1700000000)

            with patch("app.websocket.broadcast_success"):
                await run_historical_dm_decryption(
                    private_key_bytes=CLIENT2_PRIVATE,
                    contact_public_key_bytes=CLIENT1_PUBLIC,
                    contact_public_key_hex=CLIENT1_PUBLIC_HEX,
                    display_name="Client1",
                )

            # Then: same packet arrives again via live pipeline
            await process_raw_packet(raw_bytes=DM_PACKET)

        # Only one message stored despite both paths processing it
        messages = await MessageRepository.get_all(
            msg_type="PRIV", conversation_key=CLIENT1_PUBLIC_HEX.lower(), limit=10
        )
        assert len(messages) == 1
        assert messages[0].text == DM_PLAINTEXT


class TestHistoricalChannelDecryptionPipeline:
    """Integration test: feed a real channel packet through process_raw_packet
    and verify the correct message ends up in the DB and is broadcast.

    Relies on the ``test_db`` and ``captured_broadcasts`` fixtures (defined
    outside this file).
    """

    @pytest.mark.asyncio
    async def test_process_channel_packet_end_to_end(self, test_db, captured_broadcasts):
        """process_raw_packet decrypts a real channel packet and stores
        the message with correct sender and text."""
        # Register the #six77 channel; the key is passed as uppercase hex and
        # the same string is used below as the conversation key.
        channel_key_hex = CHANNEL_KEY.hex().upper()
        await ChannelRepository.upsert(key=channel_key_hex, name=CHANNEL_NAME, is_hashtag=True)

        # Hand the raw packet to process_raw_packet with broadcasts captured
        broadcasts, mock_broadcast = captured_broadcasts

        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            from app.packet_processor import process_raw_packet

            result = await process_raw_packet(raw_bytes=CHANNEL_PACKET)

        # Verify it was decrypted
        assert result is not None
        assert result["decrypted"] is True
        assert result["channel_name"] == CHANNEL_NAME
        assert result["sender"] == CHANNEL_SENDER

        # Verify message in DB (stored text is the full "sender: body" form)
        messages = await MessageRepository.get_all(
            msg_type="CHAN", conversation_key=channel_key_hex, limit=10
        )
        assert len(messages) == 1
        assert messages[0].text == CHANNEL_PLAINTEXT_FULL

        # Verify a "message" broadcast was sent
        msg_broadcasts = [b for b in broadcasts if b["type"] == "message"]
        assert len(msg_broadcasts) == 1
        assert msg_broadcasts[0]["data"]["text"] == CHANNEL_PLAINTEXT_FULL