diff --git a/app/packet_processor.py b/app/packet_processor.py index 305e7b1..88bd366 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -43,25 +43,38 @@ async def create_message_from_decrypted( message_text: str, timestamp: int, received_at: int | None = None, + path_len: int | None = None, ) -> int | None: """Create a message record from decrypted channel packet content. This is the shared logic for storing decrypted channel messages, used by both real-time packet processing and historical decryption. + Args: + packet_id: ID of the raw packet being processed + channel_key: Hex string channel key + sender: Sender name (will be prefixed to message) or None + message_text: The decrypted message content + timestamp: Sender timestamp from the packet + received_at: When the packet was received (defaults to now) + path_len: Path length from packet routing (None for historical decryption) + Returns the message ID if created, None if duplicate. """ import time as time_module received = received_at or int(time_module.time()) - # Format the message text + # Format the message text with sender prefix if present text = f"{sender}: {message_text}" if sender else message_text + # Normalize channel key to uppercase for consistency + channel_key_normalized = channel_key.upper() + # Try to create message - INSERT OR IGNORE handles duplicates atomically msg_id = await MessageRepository.create( msg_type="CHAN", text=text, - conversation_key=channel_key.upper(), + conversation_key=channel_key_normalized, sender_timestamp=timestamp, received_at=received, ) @@ -69,26 +82,32 @@ async def create_message_from_decrypted( if msg_id is None: # Duplicate detected - find existing message ID for packet linkage existing_id = await MessageRepository.find_duplicate( - conversation_key=channel_key.upper(), + conversation_key=channel_key_normalized, text=text, sender_timestamp=timestamp, ) + logger.debug( + "Duplicate message detected for channel %s (existing id=%s)", + channel_key_normalized[:8], existing_id + ) if existing_id: await RawPacketRepository.mark_decrypted(packet_id, existing_id) return None + logger.info("Stored channel message %d for channel %s", msg_id, channel_key_normalized[:8]) + # Mark the raw packet as decrypted await RawPacketRepository.mark_decrypted(packet_id, msg_id) - # Broadcast new message to connected clients (for historical decryption visibility) + # Broadcast new message to connected clients broadcast_event("message", { "id": msg_id, "type": "CHAN", - "conversation_key": channel_key.upper(), + "conversation_key": channel_key_normalized, "text": text, "sender_timestamp": timestamp, "received_at": received, - "path_len": None, + "path_len": path_len, "txt_type": 0, "signature": None, "outgoing": False, @@ -262,67 +281,22 @@ async def _process_group_text( "message_id": message_id, } - # Format the message text - if decrypted.sender: - text = f"{decrypted.sender}: {decrypted.message}" - else: - text = decrypted.message - - # Try to create message - INSERT OR IGNORE handles duplicates atomically - msg_id = await MessageRepository.create( - msg_type="CHAN", - text=text, - conversation_key=channel.key, - sender_timestamp=decrypted.timestamp, + # Use shared function to create message, handle duplicates, and broadcast + msg_id = await create_message_from_decrypted( + packet_id=packet_id, + channel_key=channel.key, + sender=decrypted.sender, + message_text=decrypted.message, + timestamp=decrypted.timestamp, received_at=timestamp, + path_len=packet_info.path_length if packet_info else None, ) - if msg_id is None: - # Duplicate detected by database constraint (same message via different RF path) - # Find existing message ID for packet linkage - existing_id = await MessageRepository.find_duplicate( - conversation_key=channel.key, - text=text, - sender_timestamp=decrypted.timestamp, - ) - logger.debug( - "Duplicate message detected for channel %s (existing id=%s)", - channel.name, existing_id - ) - if existing_id: - await RawPacketRepository.mark_decrypted(packet_id, existing_id) - return { - "decrypted": True, - "channel_name": channel.name, - "sender": decrypted.sender, - "message_id": existing_id, - } - - logger.info("Stored channel message %d for %s", msg_id, channel.name) - - # Broadcast new message (only for genuinely new messages) - broadcast_event("message", { - "id": msg_id, - "type": "CHAN", - "conversation_key": channel.key, - "text": text, - "sender_timestamp": decrypted.timestamp, - "received_at": timestamp, - "path_len": packet_info.path_length if packet_info else None, - "txt_type": 0, - "signature": None, - "outgoing": False, - "acked": 0, - }) - - # Mark the raw packet as decrypted - await RawPacketRepository.mark_decrypted(packet_id, msg_id) - return { "decrypted": True, "channel_name": channel.name, "sender": decrypted.sender, - "message_id": msg_id, + "message_id": msg_id, # None if duplicate, msg_id if new } # Couldn't decrypt with any known key diff --git a/app/repository.py b/app/repository.py index 3b9a57e..6674357 100644 --- a/app/repository.py +++ b/app/repository.py @@ -307,8 +307,10 @@ class MessageRepository: path_len, txt_type, signature, outgoing), ) await db.conn.commit() - # lastrowid is 0 if no row was inserted (duplicate) - return cursor.lastrowid if cursor.lastrowid else None + # rowcount is 0 if INSERT was ignored due to UNIQUE constraint violation + if cursor.rowcount == 0: + return None + return cursor.lastrowid @staticmethod async def get_all( diff --git a/frontend/src/test/fixtures/websocket_events.json b/frontend/src/test/fixtures/websocket_events.json new file mode 100644 index 0000000..91b348b --- /dev/null +++ b/frontend/src/test/fixtures/websocket_events.json @@ -0,0 +1,63 @@ +{ + "channel_message": { + "description": "Channel message on #six77 from Flightless🥝", + "raw_packet_hex": "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D3518283156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818", + "channel_name": "#six77", + "channel_key_hex": "7aba109edcf304a84433cb71d0f3ab73", + "expected_ws_event": { + "type": "message", + "data": { + "type": "CHAN", + "conversation_key": "7ABA109EDCF304A84433CB71D0F3AB73", + "text": "Flightless🥝: hello there; this hashtag room is essentially public. MeshCore has great crypto; use private rooms or DMs for private comms instead!", + "sender_timestamp": 1766604717, + "outgoing": false, + "acked": 0 + } + } + }, + "advertisement_with_gps": { + "description": "Repeater advertisement with GPS coordinates from Can O Mesh 2", + "raw_packet_hex": "1106538B1CD273868576DC7F679B493F9AB5AC316173E1A56D3388BC3BA75F583F63AB0D1BA2A8ABD0BC6669DBF719E67E4C8517BA4E0D6F8C96A323E9D13A77F2630DED965A5C17C3EC6ED1601EEFE857749DA24E9F39CBEACD722C3708F433DB5FA9BAF0BAF9BC5B1241069290FEEB029A839EF843616E204F204D657368203220F09FA5AB", + "expected_ws_event": { + "type": "contact", + "data": { + "public_key": "8576dc7f679b493f9ab5ac316173e1a56d3388bc3ba75f583f63ab0d1ba2a8ab", + "name": "Can O Mesh 2 🥫", + "type": 2, + "lat": 49.02056, + "lon": -123.82935 + } + } + }, + "advertisement_chat_node": { + "description": "Chat node advertisement with GPS from Flightless🥝", + "raw_packet_hex": "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A832DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1DF3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D913628D902602DB5F8466C696768746C657373F09FA59D", + "expected_ws_event": { + "type": "contact", + "data": { + "public_key": "ae92564c5c9884854f04f469bbb2bab8871a078053af6cf4aa2c014b18ce8a83", + "name": "Flightless🥝", + "type": 1, + "lat": 47.786038, + "lon": -122.344096 + } + } + }, + "message_acked": { + "description": "ACK event when a message is confirmed delivered", + "expected_ws_event": { + "type": "message_acked", + "data": { + "message_id": 42, + "ack_count": 1 + } + } + }, + "duplicate_channel_message": { + "description": "Same channel message arriving twice (via different paths) should only produce one message event", + "raw_packet_hex": "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D3518283156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818", + "channel_name": "#six77", + "expected_behavior": "First packet creates message and broadcasts, second packet is deduplicated (no broadcast)" + } +} diff --git a/frontend/src/test/integration.test.ts b/frontend/src/test/integration.test.ts new file mode 100644 index 0000000..f6cc346 --- /dev/null +++ b/frontend/src/test/integration.test.ts @@ -0,0 +1,369 @@ +/** + * Integration tests for WebSocket event handling. + * + * These tests verify that WebSocket events (as produced by the backend) + * are correctly processed by the frontend state handlers. + * + * The fixtures in fixtures/websocket_events.json define the contract + * between backend and frontend - both sides test against the same data. + */ + +import { describe, it, expect } from 'vitest'; +import fixtures from './fixtures/websocket_events.json'; +import { getMessageContentKey } from '../hooks/useConversationMessages'; +import { getStateKey } from '../utils/conversationState'; +import type { Message, Contact, Channel } from '../types'; + +/** + * Simulate the WebSocket message handler from App.tsx. + * This is the core logic we're testing. + */ +interface MockState { + messages: Message[]; + contacts: Contact[]; + channels: Channel[]; + unreadCounts: Record; + lastMessageTimes: Record; + seenMessageContent: Set; +} + +function createMockState(): MockState { + return { + messages: [], + contacts: [], + channels: [], + unreadCounts: {}, + lastMessageTimes: {}, + seenMessageContent: new Set(), + }; +} + +/** + * Simulate handling a message WebSocket event. + * Mirrors the logic in App.tsx onMessage handler. + */ +function handleMessageEvent( + state: MockState, + msg: Message, + activeConversationKey: string | null +): { added: boolean; unreadIncremented: boolean } { + const contentKey = getMessageContentKey(msg); + let added = false; + let unreadIncremented = false; + + // Check if message is for active conversation + const isForActiveConversation = activeConversationKey !== null && + msg.conversation_key === activeConversationKey; + + // Add to messages if for active conversation (with deduplication) + if (isForActiveConversation) { + if (!state.seenMessageContent.has(contentKey)) { + state.seenMessageContent.add(contentKey); + state.messages.push(msg); + added = true; + } + } + + // Update last message time + const stateKey = msg.type === 'CHAN' + ? getStateKey('channel', msg.conversation_key) + : getStateKey('contact', msg.conversation_key); + + state.lastMessageTimes[stateKey] = msg.received_at; + + // Increment unread if not for active conversation and not outgoing + if (!msg.outgoing && !isForActiveConversation) { + // Deduplicate by content + if (!state.seenMessageContent.has(contentKey)) { + state.seenMessageContent.add(contentKey); + state.unreadCounts[stateKey] = (state.unreadCounts[stateKey] || 0) + 1; + unreadIncremented = true; + } + } + + return { added, unreadIncremented }; +} + +/** + * Simulate handling a contact WebSocket event. + */ +function handleContactEvent(state: MockState, contact: Contact): void { + const idx = state.contacts.findIndex(c => c.public_key === contact.public_key); + if (idx >= 0) { + // Update existing contact + state.contacts[idx] = { ...state.contacts[idx], ...contact }; + } else { + // Add new contact + state.contacts.push(contact); + } +} + +/** + * Simulate handling a message_acked WebSocket event. + */ +function handleMessageAckedEvent( + state: MockState, + messageId: number, + ackCount: number +): boolean { + const idx = state.messages.findIndex(m => m.id === messageId); + if (idx >= 0) { + state.messages[idx] = { ...state.messages[idx], acked: ackCount }; + return true; + } + return false; +} + + +describe('Integration: Channel Message Events', () => { + const fixture = fixtures.channel_message; + + it('adds message to list when conversation is active', () => { + const state = createMockState(); + const msg = fixture.expected_ws_event.data as unknown as Message; + msg.id = 1; + msg.received_at = 1700000000; + + const result = handleMessageEvent(state, msg, msg.conversation_key); + + expect(result.added).toBe(true); + expect(state.messages).toHaveLength(1); + expect(state.messages[0].text).toContain('Flightless🥝'); + }); + + it('increments unread count when conversation is not active', () => { + const state = createMockState(); + const msg = fixture.expected_ws_event.data as unknown as Message; + msg.id = 1; + msg.received_at = 1700000000; + + const result = handleMessageEvent(state, msg, 'different_conversation'); + + expect(result.unreadIncremented).toBe(true); + const stateKey = getStateKey('channel', msg.conversation_key); + expect(state.unreadCounts[stateKey]).toBe(1); + }); + + it('updates lastMessageTimes for sidebar sorting', () => { + const state = createMockState(); + const msg = fixture.expected_ws_event.data as unknown as Message; + msg.id = 1; + msg.received_at = 1700000000; + + handleMessageEvent(state, msg, null); + + const stateKey = getStateKey('channel', msg.conversation_key); + expect(state.lastMessageTimes[stateKey]).toBe(1700000000); + }); + + it('does not increment unread for outgoing messages', () => { + const state = createMockState(); + const msg = { ...fixture.expected_ws_event.data, outgoing: true } as unknown as Message; + msg.id = 1; + msg.received_at = 1700000000; + + const result = handleMessageEvent(state, msg, 'different_conversation'); + + expect(result.unreadIncremented).toBe(false); + const stateKey = getStateKey('channel', msg.conversation_key); + expect(state.unreadCounts[stateKey]).toBeUndefined(); + }); +}); + + +describe('Integration: Duplicate Message Handling', () => { + // Note: duplicate_channel_message fixture references the same packet data as channel_message + + it('deduplicates messages by content when adding to list', () => { + const state = createMockState(); + // Use channel_message fixture data since duplicate_channel_message references same packet + const msgData = fixtures.channel_message.expected_ws_event.data; + const msg1 = { ...msgData, id: 1, received_at: 1700000000 } as unknown as Message; + const msg2 = { ...msgData, id: 2, received_at: 1700000001 } as unknown as Message; + + // Both arrive for active conversation + const result1 = handleMessageEvent(state, msg1, msg1.conversation_key); + const result2 = handleMessageEvent(state, msg2, msg2.conversation_key); + + expect(result1.added).toBe(true); + expect(result2.added).toBe(false); // Deduplicated + expect(state.messages).toHaveLength(1); + }); + + it('deduplicates unread increments by content', () => { + const state = createMockState(); + const msgData = fixtures.channel_message.expected_ws_event.data; + const msg1 = { ...msgData, id: 1, received_at: 1700000000 } as unknown as Message; + const msg2 = { ...msgData, id: 2, received_at: 1700000001 } as unknown as Message; + + // Both arrive for non-active conversation + const result1 = handleMessageEvent(state, msg1, 'other_conversation'); + const result2 = handleMessageEvent(state, msg2, 'other_conversation'); + + expect(result1.unreadIncremented).toBe(true); + expect(result2.unreadIncremented).toBe(false); // Deduplicated + + const stateKey = getStateKey('channel', msg1.conversation_key); + expect(state.unreadCounts[stateKey]).toBe(1); // Only incremented once + }); +}); + + +describe('Integration: Contact/Advertisement Events', () => { + const fixture = fixtures.advertisement_with_gps; + + it('creates new contact from advertisement', () => { + const state = createMockState(); + const contact = fixture.expected_ws_event.data as unknown as Contact; + + handleContactEvent(state, contact); + + expect(state.contacts).toHaveLength(1); + expect(state.contacts[0].public_key).toBe(contact.public_key); + expect(state.contacts[0].name).toBe('Can O Mesh 2 🥫'); + expect(state.contacts[0].type).toBe(2); // Repeater + expect(state.contacts[0].lat).toBeCloseTo(49.02056, 4); + expect(state.contacts[0].lon).toBeCloseTo(-123.82935, 4); + }); + + it('updates existing contact from advertisement', () => { + const state = createMockState(); + + // Add existing contact + state.contacts.push({ + public_key: fixture.expected_ws_event.data.public_key, + name: 'Old Name', + type: 0, + on_radio: false, + last_read_at: null, + } as Contact); + + // Process new advertisement + const contact = fixture.expected_ws_event.data as unknown as Contact; + handleContactEvent(state, contact); + + expect(state.contacts).toHaveLength(1); + expect(state.contacts[0].name).toBe('Can O Mesh 2 🥫'); // Updated + expect(state.contacts[0].type).toBe(2); // Updated + }); + + it('preserves contact GPS from chat node advertisement', () => { + const state = createMockState(); + const chatFixture = fixtures.advertisement_chat_node; + const contact = chatFixture.expected_ws_event.data as unknown as Contact; + + handleContactEvent(state, contact); + + expect(state.contacts[0].lat).toBeCloseTo(47.786038, 4); + expect(state.contacts[0].lon).toBeCloseTo(-122.344096, 4); + expect(state.contacts[0].type).toBe(1); // Chat node + }); +}); + + +describe('Integration: ACK Events', () => { + const fixture = fixtures.message_acked; + + it('updates message ack count', () => { + const state = createMockState(); + + // Add a message that's waiting for ACK + state.messages.push({ + id: 42, + type: 'PRIV', + conversation_key: 'abc123', + text: 'Hello', + sender_timestamp: 1700000000, + received_at: 1700000000, + path_len: null, + txt_type: 0, + signature: null, + outgoing: true, + acked: 0, + }); + + const ackData = fixture.expected_ws_event.data; + const updated = handleMessageAckedEvent(state, ackData.message_id, ackData.ack_count); + + expect(updated).toBe(true); + expect(state.messages[0].acked).toBe(1); + }); + + it('returns false for unknown message id', () => { + const state = createMockState(); + + const ackData = fixture.expected_ws_event.data; + const updated = handleMessageAckedEvent(state, ackData.message_id, ackData.ack_count); + + expect(updated).toBe(false); + }); + + it('updates to multiple ack count for flood echoes', () => { + const state = createMockState(); + + state.messages.push({ + id: 42, + type: 'CHAN', + conversation_key: 'channel123', + text: 'Hello', + sender_timestamp: 1700000000, + received_at: 1700000000, + path_len: null, + txt_type: 0, + signature: null, + outgoing: true, + acked: 0, + }); + + // Multiple flood echoes + handleMessageAckedEvent(state, 42, 1); + handleMessageAckedEvent(state, 42, 2); + handleMessageAckedEvent(state, 42, 3); + + expect(state.messages[0].acked).toBe(3); + }); +}); + + +describe('Integration: Message Content Key Contract', () => { + it('generates consistent keys for deduplication', () => { + const msg = fixtures.channel_message.expected_ws_event.data as unknown as Message; + msg.id = 1; + + // Same content with different IDs should generate same key + const msg2 = { ...msg, id: 2 }; + + expect(getMessageContentKey(msg)).toBe(getMessageContentKey(msg2)); + }); + + it('key format matches backend expectation', () => { + const msg = fixtures.channel_message.expected_ws_event.data as unknown as Message; + + const key = getMessageContentKey(msg); + + // Key should be: type-conversation_key-text-sender_timestamp + expect(key).toContain(msg.type); + expect(key).toContain(msg.conversation_key); + expect(key).toContain(String(msg.sender_timestamp)); + }); +}); + + +describe('Integration: State Key Contract', () => { + it('generates correct channel state key', () => { + const channelKey = fixtures.channel_message.expected_ws_event.data.conversation_key; + + const stateKey = getStateKey('channel', channelKey); + + expect(stateKey).toBe(`channel-${channelKey}`); + }); + + it('generates correct contact state key with prefix', () => { + const publicKey = fixtures.advertisement_with_gps.expected_ws_event.data.public_key; + + const stateKey = getStateKey('contact', publicKey); + + // Contact state key uses 12-char prefix + expect(stateKey).toBe(`contact-${publicKey.substring(0, 12)}`); + }); +}); diff --git a/tests/fixtures/websocket_events.json b/tests/fixtures/websocket_events.json new file mode 100644 index 0000000..91b348b --- /dev/null +++ b/tests/fixtures/websocket_events.json @@ -0,0 +1,63 @@ +{ + "channel_message": { + "description": "Channel message on #six77 from Flightless🥝", + "raw_packet_hex": "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D3518283156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818", + "channel_name": "#six77", + "channel_key_hex": "7aba109edcf304a84433cb71d0f3ab73", + "expected_ws_event": { + "type": "message", + "data": { + "type": "CHAN", + "conversation_key": "7ABA109EDCF304A84433CB71D0F3AB73", + "text": "Flightless🥝: hello there; this hashtag room is essentially public. MeshCore has great crypto; use private rooms or DMs for private comms instead!", + "sender_timestamp": 1766604717, + "outgoing": false, + "acked": 0 + } + } + }, + "advertisement_with_gps": { + "description": "Repeater advertisement with GPS coordinates from Can O Mesh 2", + "raw_packet_hex": "1106538B1CD273868576DC7F679B493F9AB5AC316173E1A56D3388BC3BA75F583F63AB0D1BA2A8ABD0BC6669DBF719E67E4C8517BA4E0D6F8C96A323E9D13A77F2630DED965A5C17C3EC6ED1601EEFE857749DA24E9F39CBEACD722C3708F433DB5FA9BAF0BAF9BC5B1241069290FEEB029A839EF843616E204F204D657368203220F09FA5AB", + "expected_ws_event": { + "type": "contact", + "data": { + "public_key": "8576dc7f679b493f9ab5ac316173e1a56d3388bc3ba75f583f63ab0d1ba2a8ab", + "name": "Can O Mesh 2 🥫", + "type": 2, + "lat": 49.02056, + "lon": -123.82935 + } + } + }, + "advertisement_chat_node": { + "description": "Chat node advertisement with GPS from Flightless🥝", + "raw_packet_hex": "1100AE92564C5C9884854F04F469BBB2BAB8871A078053AF6CF4AA2C014B18CE8A832DBF6669128E9476F36320F21D1B37FF1CF31680F50F4B17EDABCC7CF8C47D3C5E1DF3AFD0C8721EA06A8078462EF241DEF80AD6922751F206E3BB121DFB604F4146D60D913628D902602DB5F8466C696768746C657373F09FA59D", + "expected_ws_event": { + "type": "contact", + "data": { + "public_key": "ae92564c5c9884854f04f469bbb2bab8871a078053af6cf4aa2c014b18ce8a83", + "name": "Flightless🥝", + "type": 1, + "lat": 47.786038, + "lon": -122.344096 + } + } + }, + "message_acked": { + "description": "ACK event when a message is confirmed delivered", + "expected_ws_event": { + "type": "message_acked", + "data": { + "message_id": 42, + "ack_count": 1 + } + } + }, + "duplicate_channel_message": { + "description": "Same channel message arriving twice (via different paths) should only produce one message event", + "raw_packet_hex": "1500E69C7A89DD0AF6A2D69F5823B88F9720731E4B887C56932BF889255D8D926D99195927144323A42DD8A158F878B518B8304DF55E80501C7D02A9FFD578D3518283156BBA257BF8413E80A237393B2E4149BBBC864371140A9BBC4E23EB9BF203EF0D029214B3E3AAC3C0295690ACDB89A28619E7E5F22C83E16073AD679D25FA904D07E5ACF1DB5A7C77D7E1719FB9AE5BF55541EE0D7F59ED890E12CF0FEED6700818", + "channel_name": "#six77", + "expected_behavior": "First packet creates message and broadcasts, second packet is deduplicated (no broadcast)" + } +} diff --git a/tests/test_packet_pipeline.py b/tests/test_packet_pipeline.py new file mode 100644 index 0000000..7f07415 --- /dev/null +++ b/tests/test_packet_pipeline.py @@ -0,0 +1,512 @@ +"""End-to-end tests for the packet processing pipeline. + +These tests verify the full flow from raw packet arrival through to +WebSocket broadcast, using real packet data and a real database. + +The fixtures in fixtures/websocket_events.json define the contract +between backend and frontend - both sides test against the same data. +""" + +import json +from pathlib import Path +from unittest.mock import patch, MagicMock + +import pytest + +from app.database import Database +from app.repository import ChannelRepository, MessageRepository, ContactRepository, RawPacketRepository + + +# Load shared fixtures +FIXTURES_PATH = Path(__file__).parent / "fixtures" / "websocket_events.json" +with open(FIXTURES_PATH) as f: + FIXTURES = json.load(f) + + +@pytest.fixture +async def test_db(): + """Create an in-memory test database. + + We need to patch the db module-level variable before any repository + methods are called, so they use our test database. + """ + import app.repository as repo_module + + db = Database(":memory:") + await db.connect() + + # Store original and patch the module attribute directly + original_db = repo_module.db + repo_module.db = db + + try: + yield db + finally: + repo_module.db = original_db + await db.disconnect() + + +@pytest.fixture +def captured_broadcasts(): + """Capture WebSocket broadcasts for verification.""" + broadcasts = [] + + def mock_broadcast(event_type: str, data: dict): + """Synchronous mock that captures broadcasts.""" + broadcasts.append({"type": event_type, "data": data}) + + return broadcasts, mock_broadcast + + +class TestChannelMessagePipeline: + """Test channel message flow: packet → decrypt → store → broadcast.""" + + @pytest.mark.asyncio + async def test_channel_message_creates_message_and_broadcasts(self, test_db, captured_broadcasts): + """A decryptable channel packet creates a message and broadcasts it.""" + from app.packet_processor import process_raw_packet + + fixture = FIXTURES["channel_message"] + packet_bytes = bytes.fromhex(fixture["raw_packet_hex"]) + + # Create the channel in DB first using upsert + await ChannelRepository.upsert( + key=fixture["channel_key_hex"].upper(), + name=fixture["channel_name"], + is_hashtag=True + ) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + result = await process_raw_packet(packet_bytes, timestamp=1700000000) + + # Verify packet was processed successfully + assert result is not None + assert result.get("decrypted") is True + + # Verify message was stored in database + messages = await MessageRepository.get_all( + msg_type="CHAN", + conversation_key=fixture["channel_key_hex"].upper(), + limit=10 + ) + assert len(messages) == 1 + msg = messages[0] + assert "Flightless🥝:" in msg.text + assert "hashtag room is essentially public" in msg.text + + # Verify WebSocket broadcast format matches fixture + message_broadcasts = [b for b in broadcasts if b["type"] == "message"] + assert len(message_broadcasts) == 1 + + broadcast = message_broadcasts[0] + expected = fixture["expected_ws_event"]["data"] + assert broadcast["data"]["type"] == expected["type"] + assert broadcast["data"]["conversation_key"] == expected["conversation_key"] + assert broadcast["data"]["outgoing"] == expected["outgoing"] + assert expected["text"][:30] in broadcast["data"]["text"] # Check text contains expected content + + @pytest.mark.asyncio + async def test_duplicate_packet_not_broadcast_twice(self, test_db, captured_broadcasts): + """Same packet arriving twice only creates one message and one broadcast.""" + from app.packet_processor import process_raw_packet + + fixture = FIXTURES["duplicate_channel_message"] + packet_bytes = bytes.fromhex(fixture["raw_packet_hex"]) + channel_key_hex = "7ABA109EDCF304A84433CB71D0F3AB73" + + # Create the channel in DB first + await ChannelRepository.upsert( + key=channel_key_hex, + name=fixture["channel_name"], + is_hashtag=True + ) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + # Process same packet twice + result1 = await process_raw_packet(packet_bytes, timestamp=1700000000) + result2 = await process_raw_packet(packet_bytes, timestamp=1700000001) + + # First should succeed, second should be detected as duplicate + assert result1 is not None + assert result1.get("decrypted") is True + + # Second packet still processes but message is deduplicated + assert result2 is not None + + # Only ONE message should exist in database + messages = await MessageRepository.get_all( + msg_type="CHAN", + conversation_key=channel_key_hex, + limit=10 + ) + assert len(messages) == 1 + + # Only ONE message broadcast should have been sent + message_broadcasts = [b for b in broadcasts if b["type"] == "message"] + assert len(message_broadcasts) == 1 + + @pytest.mark.asyncio + async def test_unknown_channel_stores_raw_packet_only(self, test_db, captured_broadcasts): + """Packet for unknown channel is stored but not decrypted.""" + from app.packet_processor import process_raw_packet + + fixture = FIXTURES["channel_message"] + packet_bytes = bytes.fromhex(fixture["raw_packet_hex"]) + + # DON'T create the channel - simulate unknown channel + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + result = await process_raw_packet(packet_bytes, timestamp=1700000000) + + # Packet should be stored but not decrypted + assert result is not None + + # Raw packet should be stored + raw_packets = await RawPacketRepository.get_undecrypted(limit=10) + assert len(raw_packets) >= 1 + + # No message broadcast (only raw_packet broadcast) + message_broadcasts = [b for b in broadcasts if b["type"] == "message"] + assert len(message_broadcasts) == 0 + + +class TestAdvertisementPipeline: + """Test advertisement flow: packet → parse → upsert contact → broadcast.""" + + @pytest.mark.asyncio + async def test_advertisement_creates_contact_with_gps(self, test_db, captured_broadcasts): + """Advertisement packet creates/updates contact with GPS coordinates.""" + from app.packet_processor import process_raw_packet + + fixture = FIXTURES["advertisement_with_gps"] + packet_bytes = bytes.fromhex(fixture["raw_packet_hex"]) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + # Process the advertisement packet through the normal pipeline + result = await process_raw_packet(packet_bytes, timestamp=1700000000) + + # Verify contact was created in database + expected = fixture["expected_ws_event"]["data"] + contact = await ContactRepository.get_by_key_prefix(expected["public_key"][:12]) + + assert contact is not None + assert contact.name == expected["name"] + assert contact.type == expected["type"] + assert contact.lat is not None + assert contact.lon is not None + assert abs(contact.lat - expected["lat"]) < 0.001 + assert abs(contact.lon - expected["lon"]) < 0.001 + + # Verify WebSocket broadcast + contact_broadcasts = [b for b in broadcasts if b["type"] == "contact"] + assert len(contact_broadcasts) == 1 + + broadcast = contact_broadcasts[0] + assert broadcast["data"]["public_key"] == expected["public_key"] + assert broadcast["data"]["name"] == expected["name"] + assert broadcast["data"]["type"] == expected["type"] + + @pytest.mark.asyncio + async def test_advertisement_updates_existing_contact(self, test_db, captured_broadcasts): + """Advertisement for existing contact updates their info.""" + from app.packet_processor import process_raw_packet + + fixture = FIXTURES["advertisement_chat_node"] + packet_bytes = bytes.fromhex(fixture["raw_packet_hex"]) + expected = fixture["expected_ws_event"]["data"] + + # Create existing contact with different/missing data + await ContactRepository.upsert({ + "public_key": expected["public_key"], + "name": "OldName", + "type": 0, + "lat": None, + "lon": None + }) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + await process_raw_packet(packet_bytes, timestamp=1700000000) + + # Verify contact was updated + contact = await ContactRepository.get_by_key_prefix(expected["public_key"][:12]) + + assert contact.name == expected["name"] # Name updated + assert contact.type == expected["type"] # Type updated + assert contact.lat is not None # GPS added + assert contact.lon is not None + + +class TestAckPipeline: + """Test ACK flow: outgoing message → ACK received → broadcast update.""" + + @pytest.mark.asyncio + async def test_ack_updates_message_and_broadcasts(self, test_db, captured_broadcasts): + """ACK receipt updates message ack count and broadcasts.""" + from app.event_handlers import on_ack, track_pending_ack + from app.repository import MessageRepository + + # Create a message that's waiting for ACK (acked defaults to 0) + msg_id = await MessageRepository.create( + msg_type="PRIV", + text="Hello", + conversation_key="abc123def456789012345678901234567890123456789012345678901234", + sender_timestamp=1700000000, + received_at=1700000000, + outgoing=True + ) + + # Track pending ACK + ack_code = "test_ack_123" + track_pending_ack(ack_code, message_id=msg_id, timeout_ms=30000) + + broadcasts, mock_broadcast = captured_broadcasts + + # Create a mock Event with the ACK code + # on_ack expects event.payload.get("code") + mock_event = MagicMock() + mock_event.payload = {"code": ack_code} + + # Patch broadcast_event in the event_handlers module + with patch("app.event_handlers.broadcast_event", mock_broadcast): + await on_ack(mock_event) + + # Verify message was updated in database + messages = await MessageRepository.get_all( + msg_type="PRIV", + conversation_key="abc123def456789012345678901234567890123456789012345678901234", + limit=10 + ) + assert len(messages) == 1 + assert messages[0].acked == 1 + + # Verify broadcast format matches fixture + ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"] + assert len(ack_broadcasts) == 1 + + expected = FIXTURES["message_acked"]["expected_ws_event"]["data"] + broadcast = ack_broadcasts[0] + assert "message_id" in broadcast["data"] + assert "ack_count" in broadcast["data"] + assert broadcast["data"]["ack_count"] == 1 + + +class TestCreateMessageFromDecrypted: + """Test the shared message creation function used by both real-time and historical decryption.""" + + @pytest.mark.asyncio + async def test_creates_message_and_broadcasts(self, test_db, captured_broadcasts): + """create_message_from_decrypted creates message and broadcasts correctly.""" + from app.packet_processor import create_message_from_decrypted + + # Create a raw packet first (required for the function) + packet_id = await RawPacketRepository.create(b"test_packet_data", 1700000000) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + msg_id = await create_message_from_decrypted( + packet_id=packet_id, + channel_key="ABC123DEF456", + sender="TestSender", + message_text="Hello world", + timestamp=1700000000, + received_at=1700000001, + ) + + # Should return a message ID + assert msg_id is not None + assert isinstance(msg_id, int) + + # Verify message was stored in database + messages = await MessageRepository.get_all( + msg_type="CHAN", + conversation_key="ABC123DEF456", + limit=10 + ) + assert len(messages) == 1 + assert messages[0].text == "TestSender: Hello world" + assert messages[0].sender_timestamp == 1700000000 + + # Verify broadcast was sent with correct structure + message_broadcasts = [b for b in broadcasts if b["type"] == "message"] + assert len(message_broadcasts) == 1 + + broadcast = message_broadcasts[0]["data"] + assert broadcast["id"] == msg_id + assert broadcast["type"] == "CHAN" + assert broadcast["conversation_key"] == "ABC123DEF456" + assert broadcast["text"] == "TestSender: Hello world" + assert broadcast["sender_timestamp"] == 1700000000 + assert broadcast["received_at"] == 1700000001 + assert broadcast["path_len"] is None # Historical decryption has no path info + assert broadcast["outgoing"] is False + assert broadcast["acked"] == 0 + + @pytest.mark.asyncio + async def test_handles_message_without_sender(self, test_db, captured_broadcasts): + """create_message_from_decrypted handles messages without sender prefix.""" + from app.packet_processor import create_message_from_decrypted + + packet_id = await RawPacketRepository.create(b"test_packet_data_2", 1700000000) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + msg_id = await create_message_from_decrypted( + packet_id=packet_id, + channel_key="ABC123DEF456", + sender=None, # No sender + message_text="System message", + timestamp=1700000000, + received_at=1700000001, + ) + + assert msg_id is not None + + # Verify text is stored without sender prefix + messages = await MessageRepository.get_all( + msg_type="CHAN", + conversation_key="ABC123DEF456", + limit=10 + ) + assert len(messages) == 1 + assert messages[0].text == "System message" # No "None: " prefix + + @pytest.mark.asyncio + async def test_returns_none_for_duplicate(self, test_db, captured_broadcasts): + """create_message_from_decrypted returns None for duplicate message.""" + from app.packet_processor import create_message_from_decrypted + + packet_id_1 = await RawPacketRepository.create(b"packet_1", 1700000000) + packet_id_2 = await RawPacketRepository.create(b"packet_2", 1700000001) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + # First call creates the message + msg_id_1 = await create_message_from_decrypted( + packet_id=packet_id_1, + channel_key="ABC123DEF456", + sender="Sender", + message_text="Duplicate test", + timestamp=1700000000, + received_at=1700000001, + ) + + # Second call with same content (different packet) returns None + msg_id_2 = await create_message_from_decrypted( + packet_id=packet_id_2, + channel_key="ABC123DEF456", + sender="Sender", + message_text="Duplicate test", + timestamp=1700000000, # Same sender_timestamp + received_at=1700000002, + ) + + assert msg_id_1 is not None + assert msg_id_2 is None # Duplicate detected + + # Only one message broadcast + message_broadcasts = [b for b in broadcasts if b["type"] == "message"] + assert len(message_broadcasts) == 1 + + @pytest.mark.asyncio + async def test_links_raw_packet_to_message(self, test_db, captured_broadcasts): + """create_message_from_decrypted links raw packet to created message.""" + from app.packet_processor import create_message_from_decrypted + + packet_id = await RawPacketRepository.create(b"test_packet", 1700000000) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + msg_id = await create_message_from_decrypted( + packet_id=packet_id, + channel_key="ABC123DEF456", + sender="Sender", + message_text="Link test", + timestamp=1700000000, + received_at=1700000001, + ) + + # Verify packet is marked decrypted + undecrypted = await RawPacketRepository.get_undecrypted(limit=100) + packet_ids = [p[0] for p in undecrypted] + assert packet_id not in packet_ids # Should be marked as decrypted + + +class TestMessageBroadcastStructure: + """Test that message broadcasts have the correct structure for frontend.""" + + @pytest.mark.asyncio + async def test_realtime_broadcast_includes_path_len(self, test_db, captured_broadcasts): + """Real-time packet processing includes path_len in broadcast.""" + from app.packet_processor import process_raw_packet + + fixture = FIXTURES["channel_message"] + packet_bytes = bytes.fromhex(fixture["raw_packet_hex"]) + channel_key_hex = fixture["channel_key_hex"].upper() + + await ChannelRepository.upsert( + key=channel_key_hex, + name=fixture["channel_name"], + is_hashtag=True + ) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + await process_raw_packet(packet_bytes, timestamp=1700000000) + + message_broadcasts = [b for b in broadcasts if b["type"] == "message"] + assert len(message_broadcasts) == 1 + + broadcast = message_broadcasts[0]["data"] + # Real-time processing extracts path_len from packet (flood packets have path_len=0) + assert "path_len" in broadcast + # The test packet is a flood packet, so path_len should be 0 or None depending on packet structure + + +class TestRawPacketStorage: + """Test raw packet storage for later decryption.""" + + @pytest.mark.asyncio + async def test_raw_packet_stored_with_decryption_status(self, test_db, captured_broadcasts): + """Raw packets are stored with correct decryption status.""" + from app.packet_processor import process_raw_packet + + fixture = FIXTURES["channel_message"] + packet_bytes = bytes.fromhex(fixture["raw_packet_hex"]) + channel_key_hex = fixture["channel_key_hex"].upper() + + # Create channel so packet can be decrypted + await ChannelRepository.upsert( + key=channel_key_hex, + name=fixture["channel_name"], + is_hashtag=True + ) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + result = await process_raw_packet(packet_bytes, timestamp=1700000000) + + # Verify raw_packet broadcast was sent + raw_broadcasts = [b for b in broadcasts if b["type"] == "raw_packet"] + assert len(raw_broadcasts) == 1 + + # Verify broadcast includes decryption info + raw_broadcast = raw_broadcasts[0]["data"] + assert raw_broadcast["decrypted"] is True + assert "decrypted_info" in raw_broadcast + assert raw_broadcast["decrypted_info"]["channel_name"] == fixture["channel_name"]