Add E2E-ish tests

This commit is contained in:
Jack Kingsman
2026-01-13 19:48:19 -08:00
parent f368b80221
commit 32ed00fd34
6 changed files with 1045 additions and 62 deletions

View File

@@ -43,25 +43,38 @@ async def create_message_from_decrypted(
message_text: str,
timestamp: int,
received_at: int | None = None,
path_len: int | None = None,
) -> int | None:
"""Create a message record from decrypted channel packet content.
This is the shared logic for storing decrypted channel messages,
used by both real-time packet processing and historical decryption.
Args:
packet_id: ID of the raw packet being processed
channel_key: Hex string channel key
sender: Sender name (will be prefixed to message) or None
message_text: The decrypted message content
timestamp: Sender timestamp from the packet
received_at: When the packet was received (defaults to now)
path_len: Path length from packet routing (None for historical decryption)
Returns the message ID if created, None if duplicate.
"""
import time as time_module
received = received_at or int(time_module.time())
# Format the message text
# Format the message text with sender prefix if present
text = f"{sender}: {message_text}" if sender else message_text
# Normalize channel key to uppercase for consistency
channel_key_normalized = channel_key.upper()
# Try to create message - INSERT OR IGNORE handles duplicates atomically
msg_id = await MessageRepository.create(
msg_type="CHAN",
text=text,
conversation_key=channel_key.upper(),
conversation_key=channel_key_normalized,
sender_timestamp=timestamp,
received_at=received,
)
@@ -69,26 +82,32 @@ async def create_message_from_decrypted(
if msg_id is None:
# Duplicate detected - find existing message ID for packet linkage
existing_id = await MessageRepository.find_duplicate(
conversation_key=channel_key.upper(),
conversation_key=channel_key_normalized,
text=text,
sender_timestamp=timestamp,
)
logger.debug(
"Duplicate message detected for channel %s (existing id=%s)",
channel_key_normalized[:8], existing_id
)
if existing_id:
await RawPacketRepository.mark_decrypted(packet_id, existing_id)
return None
logger.info("Stored channel message %d for channel %s", msg_id, channel_key_normalized[:8])
# Mark the raw packet as decrypted
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
# Broadcast new message to connected clients (for historical decryption visibility)
# Broadcast new message to connected clients
broadcast_event("message", {
"id": msg_id,
"type": "CHAN",
"conversation_key": channel_key.upper(),
"conversation_key": channel_key_normalized,
"text": text,
"sender_timestamp": timestamp,
"received_at": received,
"path_len": None,
"path_len": path_len,
"txt_type": 0,
"signature": None,
"outgoing": False,
@@ -262,67 +281,22 @@ async def _process_group_text(
"message_id": message_id,
}
# Format the message text
if decrypted.sender:
text = f"{decrypted.sender}: {decrypted.message}"
else:
text = decrypted.message
# Try to create message - INSERT OR IGNORE handles duplicates atomically
msg_id = await MessageRepository.create(
msg_type="CHAN",
text=text,
conversation_key=channel.key,
sender_timestamp=decrypted.timestamp,
# Use shared function to create message, handle duplicates, and broadcast
msg_id = await create_message_from_decrypted(
packet_id=packet_id,
channel_key=channel.key,
sender=decrypted.sender,
message_text=decrypted.message,
timestamp=decrypted.timestamp,
received_at=timestamp,
path_len=packet_info.path_length if packet_info else None,
)
if msg_id is None:
# Duplicate detected by database constraint (same message via different RF path)
# Find existing message ID for packet linkage
existing_id = await MessageRepository.find_duplicate(
conversation_key=channel.key,
text=text,
sender_timestamp=decrypted.timestamp,
)
logger.debug(
"Duplicate message detected for channel %s (existing id=%s)",
channel.name, existing_id
)
if existing_id:
await RawPacketRepository.mark_decrypted(packet_id, existing_id)
return {
"decrypted": True,
"channel_name": channel.name,
"sender": decrypted.sender,
"message_id": existing_id,
}
logger.info("Stored channel message %d for %s", msg_id, channel.name)
# Broadcast new message (only for genuinely new messages)
broadcast_event("message", {
"id": msg_id,
"type": "CHAN",
"conversation_key": channel.key,
"text": text,
"sender_timestamp": decrypted.timestamp,
"received_at": timestamp,
"path_len": packet_info.path_length if packet_info else None,
"txt_type": 0,
"signature": None,
"outgoing": False,
"acked": 0,
})
# Mark the raw packet as decrypted
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
return {
"decrypted": True,
"channel_name": channel.name,
"sender": decrypted.sender,
"message_id": msg_id,
"message_id": msg_id, # None if duplicate, msg_id if new
}
# Couldn't decrypt with any known key

View File

@@ -307,8 +307,10 @@ class MessageRepository:
path_len, txt_type, signature, outgoing),
)
await db.conn.commit()
# lastrowid is 0 if no row was inserted (duplicate)
return cursor.lastrowid if cursor.lastrowid else None
# rowcount is 0 if INSERT was ignored due to UNIQUE constraint violation
if cursor.rowcount == 0:
return None
return cursor.lastrowid
@staticmethod
async def get_all(

View File

@@ -0,0 +1,63 @@
{
"channel_message": {
"description": "Channel message on #six77 from Flightless🥝",
"raw_packet_hex": "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D3518283156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818",
"channel_name": "#six77",
"channel_key_hex": "7aba109edcf304a84433cb71d0f3ab73",
"expected_ws_event": {
"type": "message",
"data": {
"type": "CHAN",
"conversation_key": "7ABA109EDCF304A84433CB71D0F3AB73",
"text": "Flightless🥝: hello there; this hashtag room is essentially public. MeshCore has great crypto; use private rooms or DMs for private comms instead!",
"sender_timestamp": 1766604717,
"outgoing": false,
"acked": 0
}
}
},
"advertisement_with_gps": {
"description": "Repeater advertisement with GPS coordinates from Can O Mesh 2",
"raw_packet_hex": "1106538B1CD273868576DC7F679B493F9AB5AC316173E1A56D3388BC3BA75F583F63AB0D1BA2A8ABD0BC6669DBF719E67E4C8517BA4E0D6F8C96A323E9D13A77F2630DED965A5C17C3EC6ED1601EEFE857749DA24E9F39CBEACD722C3708F433DB5FA9BAF0BAF9BC5B1241069290FEEB029A839EF843616E204F204D657368203220F09FA5AB",
"expected_ws_event": {
"type": "contact",
"data": {
"public_key": "8576dc7f679b493f9ab5ac316173e1a56d3388bc3ba75f583f63ab0d1ba2a8ab",
"name": "Can O Mesh 2 🥫",
"type": 2,
"lat": 49.02056,
"lon": -123.82935
}
}
},
"advertisement_chat_node": {
"description": "Chat node advertisement with GPS from Flightless🥝",
"raw_packet_hex": "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A832DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1DF3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D913628D902602DB5F8466C696768746C657373F09FA59D",
"expected_ws_event": {
"type": "contact",
"data": {
"public_key": "ae92564c5c9884854f04f469bbb2bab8871a078053af6cf4aa2c014b18ce8a83",
"name": "Flightless🥝",
"type": 1,
"lat": 47.786038,
"lon": -122.344096
}
}
},
"message_acked": {
"description": "ACK event when a message is confirmed delivered",
"expected_ws_event": {
"type": "message_acked",
"data": {
"message_id": 42,
"ack_count": 1
}
}
},
"duplicate_channel_message": {
"description": "Same channel message arriving twice (via different paths) should only produce one message event",
"raw_packet_hex": "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D3518283156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818",
"channel_name": "#six77",
"expected_behavior": "First packet creates message and broadcasts, second packet is deduplicated (no broadcast)"
}
}

View File

@@ -0,0 +1,369 @@
/**
* Integration tests for WebSocket event handling.
*
* These tests verify that WebSocket events (as produced by the backend)
* are correctly processed by the frontend state handlers.
*
* The fixtures in fixtures/websocket_events.json define the contract
* between backend and frontend - both sides test against the same data.
*/
import { describe, it, expect } from 'vitest';
import fixtures from './fixtures/websocket_events.json';
import { getMessageContentKey } from '../hooks/useConversationMessages';
import { getStateKey } from '../utils/conversationState';
import type { Message, Contact, Channel } from '../types';
/**
* Simulate the WebSocket message handler from App.tsx.
* This is the core logic we're testing.
*/
/** Minimal slice of the App state that these handler simulations touch. */
interface MockState {
  messages: Message[];                       // messages rendered for the active conversation
  contacts: Contact[];                       // known contacts, upserted by advertisement events
  channels: Channel[];                       // known channels (not read by the handlers below)
  unreadCounts: Record<string, number>;      // unread counters keyed by conversation state key
  lastMessageTimes: Record<string, number>;  // last received_at per conversation (sidebar sorting)
  seenMessageContent: Set<string>;           // content keys already processed, for deduplication
}
/** Build a fresh, empty MockState for a single test case. */
function createMockState(): MockState {
  const initial: MockState = {
    messages: [],
    contacts: [],
    channels: [],
    unreadCounts: {},
    lastMessageTimes: {},
    seenMessageContent: new Set(),
  };
  return initial;
}
/**
 * Simulate handling a message WebSocket event.
 * Mirrors the logic in App.tsx onMessage handler.
 *
 * Returns flags describing what happened so tests can assert on them:
 * - added: message was appended to the visible list (active conversation only)
 * - unreadIncremented: unread counter bumped (inactive conversation only)
 */
function handleMessageEvent(
  state: MockState,
  msg: Message,
  activeConversationKey: string | null
): { added: boolean; unreadIncremented: boolean } {
  const contentKey = getMessageContentKey(msg);
  let added = false;
  let unreadIncremented = false;
  // Check if message is for active conversation
  const isForActiveConversation = activeConversationKey !== null &&
    msg.conversation_key === activeConversationKey;
  // Add to messages if for active conversation (with deduplication).
  // The seen-set is shared with the unread path below, so a message counted
  // here can never also bump the unread counter.
  if (isForActiveConversation) {
    if (!state.seenMessageContent.has(contentKey)) {
      state.seenMessageContent.add(contentKey);
      state.messages.push(msg);
      added = true;
    }
  }
  // Update last message time.
  // NOTE(review): this runs even for duplicate content — presumably intentional
  // (a re-received message still proves recent activity); confirm against App.tsx.
  const stateKey = msg.type === 'CHAN'
    ? getStateKey('channel', msg.conversation_key)
    : getStateKey('contact', msg.conversation_key);
  state.lastMessageTimes[stateKey] = msg.received_at;
  // Increment unread if not for active conversation and not outgoing
  if (!msg.outgoing && !isForActiveConversation) {
    // Deduplicate by content
    if (!state.seenMessageContent.has(contentKey)) {
      state.seenMessageContent.add(contentKey);
      state.unreadCounts[stateKey] = (state.unreadCounts[stateKey] || 0) + 1;
      unreadIncremented = true;
    }
  }
  return { added, unreadIncremented };
}
/**
 * Simulate handling a contact WebSocket event.
 * Upserts by public_key: merges into an existing record or appends a new one.
 */
function handleContactEvent(state: MockState, contact: Contact): void {
  const existingIndex = state.contacts.findIndex(
    (known) => known.public_key === contact.public_key
  );
  if (existingIndex === -1) {
    // Unknown key: register as a brand-new contact.
    state.contacts.push(contact);
  } else {
    // Known key: overlay the fresh fields onto the stored record.
    state.contacts[existingIndex] = { ...state.contacts[existingIndex], ...contact };
  }
}
/**
 * Simulate handling a message_acked WebSocket event.
 * Returns true if a loaded message was updated, false if the id is unknown.
 */
function handleMessageAckedEvent(
  state: MockState,
  messageId: number,
  ackCount: number
): boolean {
  const pos = state.messages.findIndex((m) => m.id === messageId);
  if (pos === -1) {
    // Message not loaded in this view — nothing to update.
    return false;
  }
  state.messages[pos] = { ...state.messages[pos], acked: ackCount };
  return true;
}
// Suite: message-event handling against the shared channel_message fixture.
// NOTE(review): `msg` aliases the fixture object, so `msg.id = 1` mutates the
// shared fixture data across tests — consider cloning; verify test ordering
// doesn't depend on it.
describe('Integration: Channel Message Events', () => {
  const fixture = fixtures.channel_message;

  it('adds message to list when conversation is active', () => {
    const state = createMockState();
    const msg = fixture.expected_ws_event.data as unknown as Message;
    msg.id = 1;
    msg.received_at = 1700000000;
    const result = handleMessageEvent(state, msg, msg.conversation_key);
    expect(result.added).toBe(true);
    expect(state.messages).toHaveLength(1);
    expect(state.messages[0].text).toContain('Flightless🥝');
  });

  it('increments unread count when conversation is not active', () => {
    const state = createMockState();
    const msg = fixture.expected_ws_event.data as unknown as Message;
    msg.id = 1;
    msg.received_at = 1700000000;
    const result = handleMessageEvent(state, msg, 'different_conversation');
    expect(result.unreadIncremented).toBe(true);
    const stateKey = getStateKey('channel', msg.conversation_key);
    expect(state.unreadCounts[stateKey]).toBe(1);
  });

  it('updates lastMessageTimes for sidebar sorting', () => {
    const state = createMockState();
    const msg = fixture.expected_ws_event.data as unknown as Message;
    msg.id = 1;
    msg.received_at = 1700000000;
    handleMessageEvent(state, msg, null);
    const stateKey = getStateKey('channel', msg.conversation_key);
    expect(state.lastMessageTimes[stateKey]).toBe(1700000000);
  });

  it('does not increment unread for outgoing messages', () => {
    const state = createMockState();
    const msg = { ...fixture.expected_ws_event.data, outgoing: true } as unknown as Message;
    msg.id = 1;
    msg.received_at = 1700000000;
    const result = handleMessageEvent(state, msg, 'different_conversation');
    expect(result.unreadIncremented).toBe(false);
    const stateKey = getStateKey('channel', msg.conversation_key);
    expect(state.unreadCounts[stateKey]).toBeUndefined();
  });
});
// Suite: content-based deduplication when the same message arrives twice.
describe('Integration: Duplicate Message Handling', () => {
  // Note: duplicate_channel_message fixture references the same packet data as channel_message

  it('deduplicates messages by content when adding to list', () => {
    const state = createMockState();
    // Use channel_message fixture data since duplicate_channel_message references same packet
    const msgData = fixtures.channel_message.expected_ws_event.data;
    const msg1 = { ...msgData, id: 1, received_at: 1700000000 } as unknown as Message;
    const msg2 = { ...msgData, id: 2, received_at: 1700000001 } as unknown as Message;
    // Both arrive for active conversation
    const result1 = handleMessageEvent(state, msg1, msg1.conversation_key);
    const result2 = handleMessageEvent(state, msg2, msg2.conversation_key);
    expect(result1.added).toBe(true);
    expect(result2.added).toBe(false); // Deduplicated
    expect(state.messages).toHaveLength(1);
  });

  it('deduplicates unread increments by content', () => {
    const state = createMockState();
    const msgData = fixtures.channel_message.expected_ws_event.data;
    const msg1 = { ...msgData, id: 1, received_at: 1700000000 } as unknown as Message;
    const msg2 = { ...msgData, id: 2, received_at: 1700000001 } as unknown as Message;
    // Both arrive for non-active conversation
    const result1 = handleMessageEvent(state, msg1, 'other_conversation');
    const result2 = handleMessageEvent(state, msg2, 'other_conversation');
    expect(result1.unreadIncremented).toBe(true);
    expect(result2.unreadIncremented).toBe(false); // Deduplicated
    const stateKey = getStateKey('channel', msg1.conversation_key);
    expect(state.unreadCounts[stateKey]).toBe(1); // Only incremented once
  });
});
// Suite: contact upserts driven by advertisement fixtures (repeater + chat node).
describe('Integration: Contact/Advertisement Events', () => {
  const fixture = fixtures.advertisement_with_gps;

  it('creates new contact from advertisement', () => {
    const state = createMockState();
    const contact = fixture.expected_ws_event.data as unknown as Contact;
    handleContactEvent(state, contact);
    expect(state.contacts).toHaveLength(1);
    expect(state.contacts[0].public_key).toBe(contact.public_key);
    expect(state.contacts[0].name).toBe('Can O Mesh 2 🥫');
    expect(state.contacts[0].type).toBe(2); // Repeater
    expect(state.contacts[0].lat).toBeCloseTo(49.02056, 4);
    expect(state.contacts[0].lon).toBeCloseTo(-123.82935, 4);
  });

  it('updates existing contact from advertisement', () => {
    const state = createMockState();
    // Add existing contact
    state.contacts.push({
      public_key: fixture.expected_ws_event.data.public_key,
      name: 'Old Name',
      type: 0,
      on_radio: false,
      last_read_at: null,
    } as Contact);
    // Process new advertisement
    const contact = fixture.expected_ws_event.data as unknown as Contact;
    handleContactEvent(state, contact);
    expect(state.contacts).toHaveLength(1);
    expect(state.contacts[0].name).toBe('Can O Mesh 2 🥫'); // Updated
    expect(state.contacts[0].type).toBe(2); // Updated
  });

  it('preserves contact GPS from chat node advertisement', () => {
    const state = createMockState();
    const chatFixture = fixtures.advertisement_chat_node;
    const contact = chatFixture.expected_ws_event.data as unknown as Contact;
    handleContactEvent(state, contact);
    expect(state.contacts[0].lat).toBeCloseTo(47.786038, 4);
    expect(state.contacts[0].lon).toBeCloseTo(-122.344096, 4);
    expect(state.contacts[0].type).toBe(1); // Chat node
  });
});
// Suite: ACK events updating the acked count of an outgoing message.
describe('Integration: ACK Events', () => {
  const fixture = fixtures.message_acked;

  it('updates message ack count', () => {
    const state = createMockState();
    // Add a message that's waiting for ACK
    state.messages.push({
      id: 42,
      type: 'PRIV',
      conversation_key: 'abc123',
      text: 'Hello',
      sender_timestamp: 1700000000,
      received_at: 1700000000,
      path_len: null,
      txt_type: 0,
      signature: null,
      outgoing: true,
      acked: 0,
    });
    const ackData = fixture.expected_ws_event.data;
    const updated = handleMessageAckedEvent(state, ackData.message_id, ackData.ack_count);
    expect(updated).toBe(true);
    expect(state.messages[0].acked).toBe(1);
  });

  it('returns false for unknown message id', () => {
    const state = createMockState();
    const ackData = fixture.expected_ws_event.data;
    const updated = handleMessageAckedEvent(state, ackData.message_id, ackData.ack_count);
    expect(updated).toBe(false);
  });

  it('updates to multiple ack count for flood echoes', () => {
    const state = createMockState();
    state.messages.push({
      id: 42,
      type: 'CHAN',
      conversation_key: 'channel123',
      text: 'Hello',
      sender_timestamp: 1700000000,
      received_at: 1700000000,
      path_len: null,
      txt_type: 0,
      signature: null,
      outgoing: true,
      acked: 0,
    });
    // Multiple flood echoes: each ACK overwrites (not accumulates) the count.
    handleMessageAckedEvent(state, 42, 1);
    handleMessageAckedEvent(state, 42, 2);
    handleMessageAckedEvent(state, 42, 3);
    expect(state.messages[0].acked).toBe(3);
  });
});
// Suite: the content-key contract used for deduplication on both sides.
describe('Integration: Message Content Key Contract', () => {
  it('generates consistent keys for deduplication', () => {
    const msg = fixtures.channel_message.expected_ws_event.data as unknown as Message;
    msg.id = 1;
    // Same content with different IDs should generate same key
    const msg2 = { ...msg, id: 2 };
    expect(getMessageContentKey(msg)).toBe(getMessageContentKey(msg2));
  });

  it('key format matches backend expectation', () => {
    const msg = fixtures.channel_message.expected_ws_event.data as unknown as Message;
    const key = getMessageContentKey(msg);
    // Key should be: type-conversation_key-text-sender_timestamp
    expect(key).toContain(msg.type);
    expect(key).toContain(msg.conversation_key);
    expect(key).toContain(String(msg.sender_timestamp));
  });
});
// Suite: the state-key format used for unread counts / last-message times.
describe('Integration: State Key Contract', () => {
  it('generates correct channel state key', () => {
    const channelKey = fixtures.channel_message.expected_ws_event.data.conversation_key;
    const stateKey = getStateKey('channel', channelKey);
    expect(stateKey).toBe(`channel-${channelKey}`);
  });

  it('generates correct contact state key with prefix', () => {
    const publicKey = fixtures.advertisement_with_gps.expected_ws_event.data.public_key;
    const stateKey = getStateKey('contact', publicKey);
    // Contact state key uses 12-char prefix
    expect(stateKey).toBe(`contact-${publicKey.substring(0, 12)}`);
  });
});

63
tests/fixtures/websocket_events.json vendored Normal file
View File

@@ -0,0 +1,63 @@
{
"channel_message": {
"description": "Channel message on #six77 from Flightless🥝",
"raw_packet_hex": "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D3518283156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818",
"channel_name": "#six77",
"channel_key_hex": "7aba109edcf304a84433cb71d0f3ab73",
"expected_ws_event": {
"type": "message",
"data": {
"type": "CHAN",
"conversation_key": "7ABA109EDCF304A84433CB71D0F3AB73",
"text": "Flightless🥝: hello there; this hashtag room is essentially public. MeshCore has great crypto; use private rooms or DMs for private comms instead!",
"sender_timestamp": 1766604717,
"outgoing": false,
"acked": 0
}
}
},
"advertisement_with_gps": {
"description": "Repeater advertisement with GPS coordinates from Can O Mesh 2",
"raw_packet_hex": "1106538B1CD273868576DC7F679B493F9AB5AC316173E1A56D3388BC3BA75F583F63AB0D1BA2A8ABD0BC6669DBF719E67E4C8517BA4E0D6F8C96A323E9D13A77F2630DED965A5C17C3EC6ED1601EEFE857749DA24E9F39CBEACD722C3708F433DB5FA9BAF0BAF9BC5B1241069290FEEB029A839EF843616E204F204D657368203220F09FA5AB",
"expected_ws_event": {
"type": "contact",
"data": {
"public_key": "8576dc7f679b493f9ab5ac316173e1a56d3388bc3ba75f583f63ab0d1ba2a8ab",
"name": "Can O Mesh 2 🥫",
"type": 2,
"lat": 49.02056,
"lon": -123.82935
}
}
},
"advertisement_chat_node": {
"description": "Chat node advertisement with GPS from Flightless🥝",
"raw_packet_hex": "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A832DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1DF3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D913628D902602DB5F8466C696768746C657373F09FA59D",
"expected_ws_event": {
"type": "contact",
"data": {
"public_key": "ae92564c5c9884854f04f469bbb2bab8871a078053af6cf4aa2c014b18ce8a83",
"name": "Flightless🥝",
"type": 1,
"lat": 47.786038,
"lon": -122.344096
}
}
},
"message_acked": {
"description": "ACK event when a message is confirmed delivered",
"expected_ws_event": {
"type": "message_acked",
"data": {
"message_id": 42,
"ack_count": 1
}
}
},
"duplicate_channel_message": {
"description": "Same channel message arriving twice (via different paths) should only produce one message event",
"raw_packet_hex": "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D3518283156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818",
"channel_name": "#six77",
"expected_behavior": "First packet creates message and broadcasts, second packet is deduplicated (no broadcast)"
}
}

View File

@@ -0,0 +1,512 @@
"""End-to-end tests for the packet processing pipeline.
These tests verify the full flow from raw packet arrival through to
WebSocket broadcast, using real packet data and a real database.
The fixtures in fixtures/websocket_events.json define the contract
between backend and frontend - both sides test against the same data.
"""
import json
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from app.database import Database
from app.repository import ChannelRepository, MessageRepository, ContactRepository, RawPacketRepository
# Load shared backend/frontend contract fixtures once at import time.
FIXTURES_PATH = Path(__file__).parent / "fixtures" / "websocket_events.json"
# Explicit encoding: the fixtures contain emoji, so relying on the platform
# default encoding (e.g. cp1252 on Windows) would fail to decode the file.
with open(FIXTURES_PATH, encoding="utf-8") as f:
    FIXTURES = json.load(f)
@pytest.fixture
async def test_db():
    """Yield an in-memory database wired into the repository layer.

    The repository module keeps a module-level ``db`` reference that every
    repository method reads, so it must be swapped for the test database
    before any repository call and restored (with the connection closed)
    once the test finishes.
    """
    import app.repository as repo_module

    database = Database(":memory:")
    await database.connect()
    # Remember the real handle so teardown can put it back.
    previous = repo_module.db
    repo_module.db = database
    try:
        yield database
    finally:
        repo_module.db = previous
        await database.disconnect()
@pytest.fixture
def captured_broadcasts():
    """Provide (recorded_events, fake_broadcast) for WebSocket assertions."""
    recorded = []

    def fake_broadcast(event_type: str, data: dict):
        """Synchronous stand-in that records instead of broadcasting."""
        recorded.append({"type": event_type, "data": data})

    return recorded, fake_broadcast
class TestChannelMessagePipeline:
    """Test channel message flow: packet → decrypt → store → broadcast."""

    @pytest.mark.asyncio
    async def test_channel_message_creates_message_and_broadcasts(self, test_db, captured_broadcasts):
        """A decryptable channel packet creates a message and broadcasts it."""
        from app.packet_processor import process_raw_packet
        fixture = FIXTURES["channel_message"]
        packet_bytes = bytes.fromhex(fixture["raw_packet_hex"])
        # Create the channel in DB first using upsert
        await ChannelRepository.upsert(
            key=fixture["channel_key_hex"].upper(),
            name=fixture["channel_name"],
            is_hashtag=True
        )
        broadcasts, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            result = await process_raw_packet(packet_bytes, timestamp=1700000000)
        # Verify packet was processed successfully
        assert result is not None
        assert result.get("decrypted") is True
        # Verify message was stored in database
        messages = await MessageRepository.get_all(
            msg_type="CHAN",
            conversation_key=fixture["channel_key_hex"].upper(),
            limit=10
        )
        assert len(messages) == 1
        msg = messages[0]
        assert "Flightless🥝:" in msg.text
        assert "hashtag room is essentially public" in msg.text
        # Verify WebSocket broadcast format matches fixture
        message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
        assert len(message_broadcasts) == 1
        broadcast = message_broadcasts[0]
        expected = fixture["expected_ws_event"]["data"]
        assert broadcast["data"]["type"] == expected["type"]
        assert broadcast["data"]["conversation_key"] == expected["conversation_key"]
        assert broadcast["data"]["outgoing"] == expected["outgoing"]
        assert expected["text"][:30] in broadcast["data"]["text"]  # Check text contains expected content

    @pytest.mark.asyncio
    async def test_duplicate_packet_not_broadcast_twice(self, test_db, captured_broadcasts):
        """Same packet arriving twice only creates one message and one broadcast."""
        from app.packet_processor import process_raw_packet
        fixture = FIXTURES["duplicate_channel_message"]
        packet_bytes = bytes.fromhex(fixture["raw_packet_hex"])
        channel_key_hex = "7ABA109EDCF304A84433CB71D0F3AB73"
        # Create the channel in DB first
        await ChannelRepository.upsert(
            key=channel_key_hex,
            name=fixture["channel_name"],
            is_hashtag=True
        )
        broadcasts, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            # Process same packet twice
            result1 = await process_raw_packet(packet_bytes, timestamp=1700000000)
            result2 = await process_raw_packet(packet_bytes, timestamp=1700000001)
        # First should succeed, second should be detected as duplicate
        assert result1 is not None
        assert result1.get("decrypted") is True
        # Second packet still processes but message is deduplicated
        assert result2 is not None
        # Only ONE message should exist in database
        messages = await MessageRepository.get_all(
            msg_type="CHAN",
            conversation_key=channel_key_hex,
            limit=10
        )
        assert len(messages) == 1
        # Only ONE message broadcast should have been sent
        message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
        assert len(message_broadcasts) == 1

    @pytest.mark.asyncio
    async def test_unknown_channel_stores_raw_packet_only(self, test_db, captured_broadcasts):
        """Packet for unknown channel is stored but not decrypted."""
        from app.packet_processor import process_raw_packet
        fixture = FIXTURES["channel_message"]
        packet_bytes = bytes.fromhex(fixture["raw_packet_hex"])
        # DON'T create the channel - simulate unknown channel
        broadcasts, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            result = await process_raw_packet(packet_bytes, timestamp=1700000000)
        # Packet should be stored but not decrypted
        assert result is not None
        # Raw packet should be stored
        raw_packets = await RawPacketRepository.get_undecrypted(limit=10)
        assert len(raw_packets) >= 1
        # No message broadcast (only raw_packet broadcast)
        message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
        assert len(message_broadcasts) == 0
class TestAdvertisementPipeline:
    """Test advertisement flow: packet → parse → upsert contact → broadcast."""

    @pytest.mark.asyncio
    async def test_advertisement_creates_contact_with_gps(self, test_db, captured_broadcasts):
        """Advertisement packet creates/updates contact with GPS coordinates."""
        from app.packet_processor import process_raw_packet
        fixture = FIXTURES["advertisement_with_gps"]
        packet_bytes = bytes.fromhex(fixture["raw_packet_hex"])
        broadcasts, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            # Process the advertisement packet through the normal pipeline
            result = await process_raw_packet(packet_bytes, timestamp=1700000000)
        # Verify contact was created in database
        expected = fixture["expected_ws_event"]["data"]
        contact = await ContactRepository.get_by_key_prefix(expected["public_key"][:12])
        assert contact is not None
        assert contact.name == expected["name"]
        assert contact.type == expected["type"]
        assert contact.lat is not None
        assert contact.lon is not None
        # GPS compared with tolerance: fixture values are rounded decimals.
        assert abs(contact.lat - expected["lat"]) < 0.001
        assert abs(contact.lon - expected["lon"]) < 0.001
        # Verify WebSocket broadcast
        contact_broadcasts = [b for b in broadcasts if b["type"] == "contact"]
        assert len(contact_broadcasts) == 1
        broadcast = contact_broadcasts[0]
        assert broadcast["data"]["public_key"] == expected["public_key"]
        assert broadcast["data"]["name"] == expected["name"]
        assert broadcast["data"]["type"] == expected["type"]

    @pytest.mark.asyncio
    async def test_advertisement_updates_existing_contact(self, test_db, captured_broadcasts):
        """Advertisement for existing contact updates their info."""
        from app.packet_processor import process_raw_packet
        fixture = FIXTURES["advertisement_chat_node"]
        packet_bytes = bytes.fromhex(fixture["raw_packet_hex"])
        expected = fixture["expected_ws_event"]["data"]
        # Create existing contact with different/missing data
        await ContactRepository.upsert({
            "public_key": expected["public_key"],
            "name": "OldName",
            "type": 0,
            "lat": None,
            "lon": None
        })
        broadcasts, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            await process_raw_packet(packet_bytes, timestamp=1700000000)
        # Verify contact was updated
        contact = await ContactRepository.get_by_key_prefix(expected["public_key"][:12])
        assert contact.name == expected["name"]  # Name updated
        assert contact.type == expected["type"]  # Type updated
        assert contact.lat is not None  # GPS added
        assert contact.lon is not None
class TestAckPipeline:
    """Test ACK flow: outgoing message → ACK received → broadcast update."""

    @pytest.mark.asyncio
    async def test_ack_updates_message_and_broadcasts(self, test_db, captured_broadcasts):
        """ACK receipt updates message ack count and broadcasts."""
        from app.event_handlers import on_ack, track_pending_ack
        from app.repository import MessageRepository
        # Create a message that's waiting for ACK (acked defaults to 0)
        msg_id = await MessageRepository.create(
            msg_type="PRIV",
            text="Hello",
            conversation_key="abc123def456789012345678901234567890123456789012345678901234",
            sender_timestamp=1700000000,
            received_at=1700000000,
            outgoing=True
        )
        # Track pending ACK
        ack_code = "test_ack_123"
        track_pending_ack(ack_code, message_id=msg_id, timeout_ms=30000)
        broadcasts, mock_broadcast = captured_broadcasts
        # Create a mock Event with the ACK code
        # on_ack expects event.payload.get("code")
        mock_event = MagicMock()
        mock_event.payload = {"code": ack_code}
        # Patch broadcast_event in the event_handlers module
        with patch("app.event_handlers.broadcast_event", mock_broadcast):
            await on_ack(mock_event)
        # Verify message was updated in database
        messages = await MessageRepository.get_all(
            msg_type="PRIV",
            conversation_key="abc123def456789012345678901234567890123456789012345678901234",
            limit=10
        )
        assert len(messages) == 1
        assert messages[0].acked == 1
        # Verify broadcast format matches fixture
        ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"]
        assert len(ack_broadcasts) == 1
        # `expected` documents the fixture contract; only its shape is asserted here.
        expected = FIXTURES["message_acked"]["expected_ws_event"]["data"]
        broadcast = ack_broadcasts[0]
        assert "message_id" in broadcast["data"]
        assert "ack_count" in broadcast["data"]
        assert broadcast["data"]["ack_count"] == 1
class TestCreateMessageFromDecrypted:
    """Test the shared message creation function used by both real-time and historical decryption."""

    @pytest.mark.asyncio
    async def test_creates_message_and_broadcasts(self, test_db, captured_broadcasts):
        """create_message_from_decrypted creates message and broadcasts correctly."""
        from app.packet_processor import create_message_from_decrypted

        # Create a raw packet first (required for the function)
        packet_id = await RawPacketRepository.create(b"test_packet_data", 1700000000)

        broadcasts, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            msg_id = await create_message_from_decrypted(
                packet_id=packet_id,
                channel_key="ABC123DEF456",
                sender="TestSender",
                message_text="Hello world",
                timestamp=1700000000,
                received_at=1700000001,
            )

        # Should return a message ID
        assert msg_id is not None
        assert isinstance(msg_id, int)

        # Verify message was stored in database with the "sender: text" format
        messages = await MessageRepository.get_all(
            msg_type="CHAN",
            conversation_key="ABC123DEF456",
            limit=10
        )
        assert len(messages) == 1
        assert messages[0].text == "TestSender: Hello world"
        assert messages[0].sender_timestamp == 1700000000

        # Verify broadcast was sent with correct structure
        message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
        assert len(message_broadcasts) == 1
        broadcast = message_broadcasts[0]["data"]
        assert broadcast["id"] == msg_id
        assert broadcast["type"] == "CHAN"
        assert broadcast["conversation_key"] == "ABC123DEF456"
        assert broadcast["text"] == "TestSender: Hello world"
        assert broadcast["sender_timestamp"] == 1700000000
        assert broadcast["received_at"] == 1700000001
        assert broadcast["path_len"] is None  # Historical decryption has no path info
        assert broadcast["outgoing"] is False
        assert broadcast["acked"] == 0

    @pytest.mark.asyncio
    async def test_handles_message_without_sender(self, test_db, captured_broadcasts):
        """create_message_from_decrypted handles messages without sender prefix."""
        from app.packet_processor import create_message_from_decrypted

        packet_id = await RawPacketRepository.create(b"test_packet_data_2", 1700000000)

        # Broadcast contents aren't inspected here, only the DB record
        _, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            msg_id = await create_message_from_decrypted(
                packet_id=packet_id,
                channel_key="ABC123DEF456",
                sender=None,  # No sender
                message_text="System message",
                timestamp=1700000000,
                received_at=1700000001,
            )
        assert msg_id is not None

        # Verify text is stored without sender prefix
        messages = await MessageRepository.get_all(
            msg_type="CHAN",
            conversation_key="ABC123DEF456",
            limit=10
        )
        assert len(messages) == 1
        assert messages[0].text == "System message"  # No "None: " prefix

    @pytest.mark.asyncio
    async def test_returns_none_for_duplicate(self, test_db, captured_broadcasts):
        """create_message_from_decrypted returns None for duplicate message."""
        from app.packet_processor import create_message_from_decrypted

        packet_id_1 = await RawPacketRepository.create(b"packet_1", 1700000000)
        packet_id_2 = await RawPacketRepository.create(b"packet_2", 1700000001)

        broadcasts, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            # First call creates the message
            msg_id_1 = await create_message_from_decrypted(
                packet_id=packet_id_1,
                channel_key="ABC123DEF456",
                sender="Sender",
                message_text="Duplicate test",
                timestamp=1700000000,
                received_at=1700000001,
            )
            # Second call with same content (different packet) returns None
            msg_id_2 = await create_message_from_decrypted(
                packet_id=packet_id_2,
                channel_key="ABC123DEF456",
                sender="Sender",
                message_text="Duplicate test",
                timestamp=1700000000,  # Same sender_timestamp
                received_at=1700000002,
            )

        assert msg_id_1 is not None
        assert msg_id_2 is None  # Duplicate detected

        # Only one message broadcast — the duplicate must not re-broadcast
        message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
        assert len(message_broadcasts) == 1

    @pytest.mark.asyncio
    async def test_links_raw_packet_to_message(self, test_db, captured_broadcasts):
        """create_message_from_decrypted links raw packet to created message."""
        from app.packet_processor import create_message_from_decrypted

        packet_id = await RawPacketRepository.create(b"test_packet", 1700000000)

        # Broadcast contents aren't inspected here, only the packet status
        _, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            msg_id = await create_message_from_decrypted(
                packet_id=packet_id,
                channel_key="ABC123DEF456",
                sender="Sender",
                message_text="Link test",
                timestamp=1700000000,
                received_at=1700000001,
            )

        # Verify packet is marked decrypted (no longer in the undecrypted set)
        undecrypted = await RawPacketRepository.get_undecrypted(limit=100)
        packet_ids = [p[0] for p in undecrypted]
        assert packet_id not in packet_ids  # Should be marked as decrypted
class TestMessageBroadcastStructure:
    """Test that message broadcasts have the correct structure for frontend."""

    @pytest.mark.asyncio
    async def test_realtime_broadcast_includes_path_len(self, test_db, captured_broadcasts):
        """Real-time packet processing includes path_len in broadcast."""
        from app.packet_processor import process_raw_packet

        fixture = FIXTURES["channel_message"]
        packet_bytes = bytes.fromhex(fixture["raw_packet_hex"])
        channel_key_hex = fixture["channel_key_hex"].upper()

        # Channel must exist so the packet can be decrypted into a message
        await ChannelRepository.upsert(
            key=channel_key_hex,
            name=fixture["channel_name"],
            is_hashtag=True
        )

        broadcasts, mock_broadcast = captured_broadcasts
        with patch("app.packet_processor.broadcast_event", mock_broadcast):
            await process_raw_packet(packet_bytes, timestamp=1700000000)

        message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
        assert len(message_broadcasts) == 1
        broadcast = message_broadcasts[0]["data"]

        # Real-time processing extracts path_len from packet (flood packets have path_len=0)
        assert "path_len" in broadcast
        # The test packet is a flood packet, so path_len should be 0 or None
        # depending on packet structure — enforce that, not just key presence.
        assert broadcast["path_len"] in (0, None)
class TestRawPacketStorage:
"""Test raw packet storage for later decryption."""
@pytest.mark.asyncio
async def test_raw_packet_stored_with_decryption_status(self, test_db, captured_broadcasts):
"""Raw packets are stored with correct decryption status."""
from app.packet_processor import process_raw_packet
fixture = FIXTURES["channel_message"]
packet_bytes = bytes.fromhex(fixture["raw_packet_hex"])
channel_key_hex = fixture["channel_key_hex"].upper()
# Create channel so packet can be decrypted
await ChannelRepository.upsert(
key=channel_key_hex,
name=fixture["channel_name"],
is_hashtag=True
)
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
result = await process_raw_packet(packet_bytes, timestamp=1700000000)
# Verify raw_packet broadcast was sent
raw_broadcasts = [b for b in broadcasts if b["type"] == "raw_packet"]
assert len(raw_broadcasts) == 1
# Verify broadcast includes decryption info
raw_broadcast = raw_broadcasts[0]["data"]
assert raw_broadcast["decrypted"] is True
assert "decrypted_info" in raw_broadcast
assert raw_broadcast["decrypted_info"]["channel_name"] == fixture["channel_name"]