Fix repeater message duplication issue and clarify fallback functionality for missing private key export

This commit is contained in:
Jack Kingsman
2026-01-26 22:13:44 -08:00
parent d77523c76a
commit 2b681e1905
4 changed files with 155 additions and 24 deletions

View File

@@ -81,12 +81,16 @@ Radio events flow through `event_handlers.py`:
| Event | Handler | Actions |
|-------|---------|---------|
| `CONTACT_MSG_RECV` | `on_contact_message` | Store message, update contact last_seen, broadcast via WS |
| `CHANNEL_MSG_RECV` | `on_channel_message` | Store message, broadcast via WS |
| `RAW_DATA` | `on_raw_data` | Store packet, try decrypt with all channel keys, detect repeats |
| `ADVERTISEMENT` | `on_advertisement` | Upsert contact with location |
| `CONTACT_MSG_RECV` | `on_contact_message` | **Fallback only** - stores DM if packet processor didn't handle it |
| `RX_LOG_DATA` | `on_rx_log_data` | Store packet, decrypt channels/DMs, broadcast via WS |
| `PATH_UPDATE` | `on_path_update` | Update contact path info |
| `NEW_CONTACT` | `on_new_contact` | Sync contact from radio's internal database |
| `ACK` | `on_ack` | Match pending ACKs, mark message acked, broadcast |
**Note on DM handling**: Direct messages are primarily handled by the packet processor via
`RX_LOG_DATA`, which decrypts using the exported private key. The `CONTACT_MSG_RECV` handler
exists as a fallback for radios without `ENABLE_PRIVATE_KEY_EXPORT=1` in firmware.
### WebSocket Broadcasting
Real-time updates use `ws_manager` singleton:
@@ -333,9 +337,18 @@ Direct messages use ECDH key exchange (Ed25519 → X25519) for shared secret der
via `keystore.py`. This enables server-side DM decryption even when contacts aren't loaded
on the radio.
**Real-time decryption**: When a `RAW_DATA` event contains a `TEXT_MESSAGE` packet, the
`packet_processor.py` attempts to decrypt it using known contact public keys and the
stored private key.
**Primary path (packet processor)**: When an `RX_LOG_DATA` event contains a `TEXT_MESSAGE`
packet, `packet_processor.py` handles the complete flow:
1. Decrypts using known contact public keys and stored private key
2. Filters CLI responses (txt_type=1 in flags)
3. Stores message in database
4. Broadcasts via WebSocket
5. Updates contact's last_contacted timestamp
6. Triggers bot if enabled
**Fallback path (event handler)**: If the packet processor can't decrypt (no private key
export, unknown contact), `on_contact_message` handles DMs from the MeshCore library's
`CONTACT_MSG_RECV` event. DB deduplication prevents double-storage when both paths fire.
**Historical decryption**: When creating a contact with `try_historical=True`, the server
attempts to decrypt all stored `TEXT_MESSAGE` packets for that contact.
@@ -635,10 +648,18 @@ reboot # Restart repeater
### CLI Response Filtering
CLI responses have `txt_type=1` (vs `txt_type=0` for normal messages). The event handler
in `event_handlers.py` skips these to prevent duplicates—the command endpoint returns
the response directly, so we don't also store/broadcast via WebSocket.
CLI responses have `txt_type=1` (vs `txt_type=0` for normal messages). Both DM handling
paths filter these to prevent storage—the command endpoint returns the response directly.
**Packet processor path** (primary):
```python
# In create_dm_message_from_decrypted()
txt_type = decrypted.flags & 0x0F
if txt_type == 1:
return None # Skip CLI responses
```
**Event handler path** (fallback):
```python
# In on_contact_message()
txt_type = payload.get("txt_type", 0)

View File

@@ -47,22 +47,26 @@ def _cleanup_expired_acks() -> None:
async def on_contact_message(event: "Event") -> None:
"""Handle incoming direct messages.
"""Handle incoming direct messages from MeshCore library.
Direct messages are decrypted by MeshCore library using ECDH key exchange.
The packet processor cannot decrypt these without the node's private key.
NOTE: DMs are primarily handled by the packet processor via RX_LOG_DATA,
which decrypts using our exported private key. This handler exists as a
fallback for cases where:
1. The private key couldn't be exported (firmware without ENABLE_PRIVATE_KEY_EXPORT)
2. The packet processor couldn't match the sender to a known contact
The packet processor handles: decryption, storage, broadcast, bot trigger.
This handler only stores if the packet processor didn't already handle it
(detected via INSERT OR IGNORE returning None for duplicates).
"""
payload = event.payload
# Skip CLI command responses (txt_type=1) - these are handled by the command endpoint
# and should not be stored in the database or broadcast via WebSocket
txt_type = payload.get("txt_type", 0)
if txt_type == 1:
logger.debug("Skipping CLI response from %s (txt_type=1)", payload.get("pubkey_prefix"))
return
logger.debug("Received direct message from %s", payload.get("pubkey_prefix"))
# Get full public key if available, otherwise use prefix
sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "")
received_at = int(time.time())
@@ -74,6 +78,7 @@ async def on_contact_message(event: "Event") -> None:
sender_pubkey = contact.public_key
# Try to create message - INSERT OR IGNORE handles duplicates atomically
# If the packet processor already stored this message, this returns None
msg_id = await MessageRepository.create(
msg_type="PRIV",
text=payload.get("text", ""),
@@ -81,21 +86,24 @@ async def on_contact_message(event: "Event") -> None:
sender_timestamp=payload.get("sender_timestamp"),
received_at=received_at,
path=payload.get("path"),
txt_type=payload.get("txt_type", 0),
txt_type=txt_type,
signature=payload.get("signature"),
)
if msg_id is None:
# Duplicate message (same content from same sender) - skip broadcast
logger.debug("Duplicate direct message from %s ignored", sender_pubkey[:12])
# Already handled by packet processor (or exact duplicate) - nothing more to do
logger.debug("DM from %s already processed by packet processor", sender_pubkey[:12])
return
# If we get here, the packet processor didn't handle this message
# (likely because private key export is not available)
logger.debug("DM from %s handled by event handler (fallback path)", sender_pubkey[:12])
# Build paths array for broadcast
# Use "is not None" to include empty string (direct/0-hop messages)
path = payload.get("path")
paths = [{"path": path or "", "received_at": received_at}] if path is not None else None
# Broadcast only genuinely new messages
# Broadcast the new message
broadcast_event(
"message",
{
@@ -106,19 +114,19 @@ async def on_contact_message(event: "Event") -> None:
"sender_timestamp": payload.get("sender_timestamp"),
"received_at": received_at,
"paths": paths,
"txt_type": payload.get("txt_type", 0),
"txt_type": txt_type,
"signature": payload.get("signature"),
"outgoing": False,
"acked": 0,
},
)
# Update contact last_seen and last_contacted
# Update contact last_contacted
contact = await ContactRepository.get_by_key_prefix(sender_pubkey)
if contact:
await ContactRepository.update_last_contacted(contact.public_key, received_at)
# Run bot if enabled (for non-CLI messages)
# Run bot if enabled
from app.bot import run_bot_for_message
await run_bot_for_message(

View File

@@ -212,6 +212,16 @@ async def create_dm_message_from_decrypted(
Returns the message ID if created, None if duplicate.
"""
# Extract txt_type from flags (lower 4 bits)
# txt_type=1 is CLI response - don't store these in chat history
txt_type = decrypted.flags & 0x0F
if txt_type == 1:
logger.debug(
"Skipping CLI response from %s (txt_type=1)",
their_public_key[:12],
)
return None
received = received_at or int(time.time())
# conversation_key is always the other party's public key

View File

@@ -941,3 +941,95 @@ class TestDMDecryptionFunction:
# Verify raw packet is linked
undecrypted = await RawPacketRepository.get_undecrypted(limit=100)
assert packet_id not in [p.id for p in undecrypted]
class TestCLIResponseFiltering:
"""Test that CLI responses (txt_type=1) are not stored in chat history."""
A1B2C3_PUB = "a1b2c3d3ba9f5fa8705b9845fe11cc6f01d1d49caaf4d122ac7121663c5beec7"
FACE12_PUB = "FACE123334789E2B81519AFDBC39A3C9EB7EA3457AD367D3243597A484847E46"
@pytest.mark.asyncio
async def test_cli_response_not_stored(self, test_db, captured_broadcasts):
"""CLI responses (flags & 0x0F == 1) should not be stored in database."""
from app.decoder import DecryptedDirectMessage
from app.packet_processor import create_dm_message_from_decrypted
from app.repository import MessageRepository, RawPacketRepository
# Store a raw packet first
packet_id, _ = await RawPacketRepository.create(b"\x09\x00test", 1700000000)
# Create a DecryptedDirectMessage with flags=1 (CLI response)
decrypted = DecryptedDirectMessage(
timestamp=1700000000,
flags=1, # txt_type=1 (CLI response)
message="cli response: version 1.0",
dest_hash="fa",
src_hash="a1",
)
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
msg_id = await create_dm_message_from_decrypted(
packet_id=packet_id,
decrypted=decrypted,
their_public_key=self.A1B2C3_PUB,
our_public_key=self.FACE12_PUB,
received_at=1700000001,
outgoing=False,
)
# Should return None (not stored)
assert msg_id is None
# Should not broadcast
assert len(broadcasts) == 0
# Should not be in database
messages = await MessageRepository.get_all(
msg_type="PRIV", conversation_key=self.A1B2C3_PUB.lower(), limit=10
)
assert len(messages) == 0
@pytest.mark.asyncio
async def test_normal_message_still_stored(self, test_db, captured_broadcasts):
"""Normal messages (flags & 0x0F == 0) should still be stored."""
from app.decoder import DecryptedDirectMessage
from app.packet_processor import create_dm_message_from_decrypted
from app.repository import MessageRepository, RawPacketRepository
packet_id, _ = await RawPacketRepository.create(b"\x09\x00test2", 1700000000)
decrypted = DecryptedDirectMessage(
timestamp=1700000000,
flags=0, # txt_type=0 (normal message)
message="Hello, world!",
dest_hash="fa",
src_hash="a1",
)
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
msg_id = await create_dm_message_from_decrypted(
packet_id=packet_id,
decrypted=decrypted,
their_public_key=self.A1B2C3_PUB,
our_public_key=self.FACE12_PUB,
received_at=1700000001,
outgoing=False,
)
# Should return message ID
assert msg_id is not None
# Should broadcast
message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
assert len(message_broadcasts) == 1
# Should be in database
messages = await MessageRepository.get_all(
msg_type="PRIV", conversation_key=self.A1B2C3_PUB.lower(), limit=10
)
assert len(messages) == 1