From 2b681e1905ee98564e2238de67bc875fa5b87ba2 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Mon, 26 Jan 2026 22:13:44 -0800 Subject: [PATCH] Fix repeater message duplication issue and clarify fallback functionality for missing private key export --- app/AGENTS.md | 41 ++++++++++++---- app/event_handlers.py | 36 ++++++++------ app/packet_processor.py | 10 ++++ tests/test_packet_pipeline.py | 92 +++++++++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 24 deletions(-) diff --git a/app/AGENTS.md b/app/AGENTS.md index e705595..532e0d1 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -81,12 +81,16 @@ Radio events flow through `event_handlers.py`: | Event | Handler | Actions | |-------|---------|---------| -| `CONTACT_MSG_RECV` | `on_contact_message` | Store message, update contact last_seen, broadcast via WS | -| `CHANNEL_MSG_RECV` | `on_channel_message` | Store message, broadcast via WS | -| `RAW_DATA` | `on_raw_data` | Store packet, try decrypt with all channel keys, detect repeats | -| `ADVERTISEMENT` | `on_advertisement` | Upsert contact with location | +| `CONTACT_MSG_RECV` | `on_contact_message` | **Fallback only** - stores DM if packet processor didn't handle it | +| `RX_LOG_DATA` | `on_rx_log_data` | Store packet, decrypt channels/DMs, broadcast via WS | +| `PATH_UPDATE` | `on_path_update` | Update contact path info | +| `NEW_CONTACT` | `on_new_contact` | Sync contact from radio's internal database | | `ACK` | `on_ack` | Match pending ACKs, mark message acked, broadcast | +**Note on DM handling**: Direct messages are primarily handled by the packet processor via +`RX_LOG_DATA`, which decrypts using the exported private key. The `CONTACT_MSG_RECV` handler +exists as a fallback for radios without `ENABLE_PRIVATE_KEY_EXPORT=1` in firmware. + ### WebSocket Broadcasting Real-time updates use `ws_manager` singleton: @@ -333,9 +337,18 @@ Direct messages use ECDH key exchange (Ed25519 → X25519) for shared secret der via `keystore.py`. This enables server-side DM decryption even when contacts aren't loaded on the radio. -**Real-time decryption**: When a `RAW_DATA` event contains a `TEXT_MESSAGE` packet, the -`packet_processor.py` attempts to decrypt it using known contact public keys and the -stored private key. +**Primary path (packet processor)**: When an `RX_LOG_DATA` event contains a `TEXT_MESSAGE` +packet, `packet_processor.py` handles the complete flow: +1. Decrypts using known contact public keys and stored private key +2. Filters CLI responses (txt_type=1 in flags) +3. Stores message in database +4. Broadcasts via WebSocket +5. Updates contact's last_contacted timestamp +6. Triggers bot if enabled + +**Fallback path (event handler)**: If the packet processor can't decrypt (no private key +export, unknown contact), `on_contact_message` handles DMs from the MeshCore library's +`CONTACT_MSG_RECV` event. DB deduplication prevents double-storage when both paths fire. **Historical decryption**: When creating a contact with `try_historical=True`, the server attempts to decrypt all stored `TEXT_MESSAGE` packets for that contact. @@ -635,10 +648,18 @@ reboot # Restart repeater ### CLI Response Filtering -CLI responses have `txt_type=1` (vs `txt_type=0` for normal messages). The event handler -in `event_handlers.py` skips these to prevent duplicates—the command endpoint returns -the response directly, so we don't also store/broadcast via WebSocket. +CLI responses have `txt_type=1` (vs `txt_type=0` for normal messages). Both DM handling +paths filter these to prevent storage—the command endpoint returns the response directly. +**Packet processor path** (primary): +```python +# In create_dm_message_from_decrypted() +txt_type = decrypted.flags & 0x0F +if txt_type == 1: + return None # Skip CLI responses +``` + +**Event handler path** (fallback): ```python # In on_contact_message() txt_type = payload.get("txt_type", 0) diff --git a/app/event_handlers.py b/app/event_handlers.py index 617eeb2..a0421f4 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -47,22 +47,26 @@ def _cleanup_expired_acks() -> None: async def on_contact_message(event: "Event") -> None: - """Handle incoming direct messages. + """Handle incoming direct messages from MeshCore library. - Direct messages are decrypted by MeshCore library using ECDH key exchange. - The packet processor cannot decrypt these without the node's private key. + NOTE: DMs are primarily handled by the packet processor via RX_LOG_DATA, + which decrypts using our exported private key. This handler exists as a + fallback for cases where: + 1. The private key couldn't be exported (firmware without ENABLE_PRIVATE_KEY_EXPORT) + 2. The packet processor couldn't match the sender to a known contact + + The packet processor handles: decryption, storage, broadcast, bot trigger. + This handler only stores if the packet processor didn't already handle it + (detected via INSERT OR IGNORE returning None for duplicates). """ payload = event.payload # Skip CLI command responses (txt_type=1) - these are handled by the command endpoint - # and should not be stored in the database or broadcast via WebSocket txt_type = payload.get("txt_type", 0) if txt_type == 1: logger.debug("Skipping CLI response from %s (txt_type=1)", payload.get("pubkey_prefix")) return - logger.debug("Received direct message from %s", payload.get("pubkey_prefix")) - # Get full public key if available, otherwise use prefix sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "") received_at = int(time.time()) @@ -74,6 +78,7 @@ async def on_contact_message(event: "Event") -> None: sender_pubkey = contact.public_key # Try to create message - INSERT OR IGNORE handles duplicates atomically + # If the packet processor already stored this message, this returns None msg_id = await MessageRepository.create( msg_type="PRIV", text=payload.get("text", ""), @@ -81,21 +86,24 @@ async def on_contact_message(event: "Event") -> None: sender_timestamp=payload.get("sender_timestamp"), received_at=received_at, path=payload.get("path"), - txt_type=payload.get("txt_type", 0), + txt_type=txt_type, signature=payload.get("signature"), ) if msg_id is None: - # Duplicate message (same content from same sender) - skip broadcast - logger.debug("Duplicate direct message from %s ignored", sender_pubkey[:12]) + # Already handled by packet processor (or exact duplicate) - nothing more to do + logger.debug("DM from %s already processed by packet processor", sender_pubkey[:12]) return + # If we get here, the packet processor didn't handle this message + # (likely because private key export is not available) + logger.debug("DM from %s handled by event handler (fallback path)", sender_pubkey[:12]) + # Build paths array for broadcast - # Use "is not None" to include empty string (direct/0-hop messages) path = payload.get("path") paths = [{"path": path or "", "received_at": received_at}] if path is not None else None - # Broadcast only genuinely new messages + # Broadcast the new message broadcast_event( "message", { @@ -106,19 +114,19 @@ async def on_contact_message(event: "Event") -> None: "sender_timestamp": payload.get("sender_timestamp"), "received_at": received_at, "paths": paths, - "txt_type": payload.get("txt_type", 0), + "txt_type": txt_type, "signature": payload.get("signature"), "outgoing": False, "acked": 0, }, ) - # Update contact last_seen and last_contacted + # Update contact last_contacted contact = await ContactRepository.get_by_key_prefix(sender_pubkey) if contact: await ContactRepository.update_last_contacted(contact.public_key, received_at) - # Run bot if enabled (for non-CLI messages) + # Run bot if enabled from app.bot import run_bot_for_message await run_bot_for_message( diff --git a/app/packet_processor.py b/app/packet_processor.py index 754f649..1118f81 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -212,6 +212,16 @@ async def create_dm_message_from_decrypted( Returns the message ID if created, None if duplicate. """ + # Extract txt_type from flags (lower 4 bits) + # txt_type=1 is CLI response - don't store these in chat history + txt_type = decrypted.flags & 0x0F + if txt_type == 1: + logger.debug( + "Skipping CLI response from %s (txt_type=1)", + their_public_key[:12], + ) + return None + received = received_at or int(time.time()) # conversation_key is always the other party's public key diff --git a/tests/test_packet_pipeline.py b/tests/test_packet_pipeline.py index 80fec37..64f2d95 100644 --- a/tests/test_packet_pipeline.py +++ b/tests/test_packet_pipeline.py @@ -941,3 +941,95 @@ class TestDMDecryptionFunction: # Verify raw packet is linked undecrypted = await RawPacketRepository.get_undecrypted(limit=100) assert packet_id not in [p.id for p in undecrypted] + + +class TestCLIResponseFiltering: + """Test that CLI responses (txt_type=1) are not stored in chat history.""" + + A1B2C3_PUB = "a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7" + FACE12_PUB = "FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46" + + @pytest.mark.asyncio + async def test_cli_response_not_stored(self, test_db, captured_broadcasts): + """CLI responses (flags & 0x0F == 1) should not be stored in database.""" + from app.decoder import DecryptedDirectMessage + from app.packet_processor import create_dm_message_from_decrypted + from app.repository import MessageRepository, RawPacketRepository + + # Store a raw packet first + packet_id, _ = await RawPacketRepository.create(b"\x09\x00test", 1700000000) + + # Create a DecryptedDirectMessage with flags=1 (CLI response) + decrypted = DecryptedDirectMessage( + timestamp=1700000000, + flags=1, # txt_type=1 (CLI response) + message="cli response: version 1.0", + dest_hash="fa", + src_hash="a1", + ) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + msg_id = await create_dm_message_from_decrypted( + packet_id=packet_id, + decrypted=decrypted, + their_public_key=self.A1B2C3_PUB, + our_public_key=self.FACE12_PUB, + received_at=1700000001, + outgoing=False, + ) + + # Should return None (not stored) + assert msg_id is None + + # Should not broadcast + assert len(broadcasts) == 0 + + # Should not be in database + messages = await MessageRepository.get_all( + msg_type="PRIV", conversation_key=self.A1B2C3_PUB.lower(), limit=10 + ) + assert len(messages) == 0 + + @pytest.mark.asyncio + async def test_normal_message_still_stored(self, test_db, captured_broadcasts): + """Normal messages (flags & 0x0F == 0) should still be stored.""" + from app.decoder import DecryptedDirectMessage + from app.packet_processor import create_dm_message_from_decrypted + from app.repository import MessageRepository, RawPacketRepository + + packet_id, _ = await RawPacketRepository.create(b"\x09\x00test2", 1700000000) + + decrypted = DecryptedDirectMessage( + timestamp=1700000000, + flags=0, # txt_type=0 (normal message) + message="Hello, world!", + dest_hash="fa", + src_hash="a1", + ) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + msg_id = await create_dm_message_from_decrypted( + packet_id=packet_id, + decrypted=decrypted, + their_public_key=self.A1B2C3_PUB, + our_public_key=self.FACE12_PUB, + received_at=1700000001, + outgoing=False, + ) + + # Should return message ID + assert msg_id is not None + + # Should broadcast + message_broadcasts = [b for b in broadcasts if b["type"] == "message"] + assert len(message_broadcasts) == 1 + + # Should be in database + messages = await MessageRepository.get_all( + msg_type="PRIV", conversation_key=self.A1B2C3_PUB.lower(), limit=10 + ) + assert len(messages) == 1