From d373585527124933041c4e20c04a5ba8de3a772b Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Mon, 16 Mar 2026 16:36:11 -0700 Subject: [PATCH] Unify our DM ingest --- app/event_handlers.py | 88 ++++------- app/services/dm_ingest.py | 320 ++++++++++++++++++++++++++++++++++++++ app/services/messages.py | 177 ++------------------- tests/test_echo_dedup.py | 172 ++++++++++++++++---- 4 files changed, 512 insertions(+), 245 deletions(-) create mode 100644 app/services/dm_ingest.py diff --git a/app/event_handlers.py b/app/event_handlers.py index 9e84d3a..f367a16 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -4,19 +4,21 @@ from typing import TYPE_CHECKING from meshcore import EventType -from app.models import CONTACT_TYPE_REPEATER, Contact, ContactUpsert +from app.models import Contact, ContactUpsert from app.packet_processor import process_raw_packet from app.repository import ( - AmbiguousPublicKeyPrefixError, ContactRepository, ) from app.services import dm_ack_tracker from app.services.contact_reconciliation import ( - claim_prefix_messages_for_contact, promote_prefix_contacts_for_contact, record_contact_name_and_reconcile, ) -from app.services.messages import create_fallback_direct_message, increment_ack_and_broadcast +from app.services.dm_ingest import ( + ingest_fallback_direct_message, + resolve_fallback_direct_message_context, +) +from app.services.messages import increment_ack_and_broadcast from app.websocket import broadcast_event if TYPE_CHECKING: @@ -51,8 +53,8 @@ async def on_contact_message(event: "Event") -> None: 2. The packet processor couldn't match the sender to a known contact The packet processor handles: decryption, storage, broadcast, bot trigger. - This handler only stores if the packet processor didn't already handle it - (detected via INSERT OR IGNORE returning None for duplicates). + This handler adapts CONTACT_MSG_RECV payloads into the shared DM ingest + workflow, which reconciles duplicates against the packet pipeline when possible. """ payload = event.payload @@ -66,54 +68,27 @@ async def on_contact_message(event: "Event") -> None: sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "") received_at = int(time.time()) - # Look up contact from database - use prefix lookup only if needed - # (get_by_key_or_prefix does exact match first, then prefix fallback) - try: - contact = await ContactRepository.get_by_key_or_prefix(sender_pubkey) - except AmbiguousPublicKeyPrefixError: - logger.warning( - "DM sender prefix '%s' is ambiguous; storing under prefix until full key is known", - sender_pubkey, + context = await resolve_fallback_direct_message_context( + sender_public_key=sender_pubkey, + received_at=received_at, + broadcast_fn=broadcast_event, + contact_repository=ContactRepository, + log=logger, + ) + if context.skip_storage: + logger.debug( + "Skipping message from repeater %s (not stored in chat history)", + context.conversation_key[:12], ) - contact = None - if contact: - sender_pubkey = contact.public_key.lower() + return - # Promote any prefix-stored messages to this full key - await claim_prefix_messages_for_contact(public_key=sender_pubkey, log=logger) - - # Skip messages from repeaters - they only send CLI responses, not chat messages. - # CLI responses are handled by the command endpoint and txt_type filter above. - if contact.type == CONTACT_TYPE_REPEATER: - logger.debug( - "Skipping message from repeater %s (not stored in chat history)", - sender_pubkey[:12], - ) - return - elif sender_pubkey: - placeholder_upsert = ContactUpsert( - public_key=sender_pubkey.lower(), - type=0, - last_seen=received_at, - last_contacted=received_at, - first_seen=received_at, - on_radio=False, - out_path_hash_mode=-1, - ) - await ContactRepository.upsert(placeholder_upsert) - contact = await ContactRepository.get_by_key(sender_pubkey.lower()) - if contact: - broadcast_event("contact", contact.model_dump()) - - # Try to create message - INSERT OR IGNORE handles duplicates atomically - # If the packet processor already stored this message, this returns None + # Try to create or reconcile the message via the shared DM ingest service. ts = payload.get("sender_timestamp") sender_timestamp = ts if ts is not None else received_at - sender_name = contact.name if contact else None path = payload.get("path") path_len = payload.get("path_len") - message = await create_fallback_direct_message( - conversation_key=sender_pubkey, + message = await ingest_fallback_direct_message( + conversation_key=context.conversation_key, text=payload.get("text", ""), sender_timestamp=sender_timestamp, received_at=received_at, @@ -121,23 +96,24 @@ async def on_contact_message(event: "Event") -> None: path_len=path_len, txt_type=txt_type, signature=payload.get("signature"), - sender_name=sender_name, - sender_key=sender_pubkey, + sender_name=context.sender_name, + sender_key=context.sender_key, broadcast_fn=broadcast_event, + update_last_contacted_key=context.contact.public_key.lower() if context.contact else None, ) if message is None: # Already handled by packet processor (or exact duplicate) - nothing more to do - logger.debug("DM from %s already processed by packet processor", sender_pubkey[:12]) + logger.debug( + "DM from %s already processed by packet processor", context.conversation_key[:12] + ) return # If we get here, the packet processor didn't handle this message # (likely because private key export is not available) - logger.debug("DM from %s handled by event handler (fallback path)", sender_pubkey[:12]) - - # Update contact last_contacted (contact was already fetched above) - if contact: - await ContactRepository.update_last_contacted(sender_pubkey, received_at) + logger.debug( + "DM from %s handled by event handler (fallback path)", context.conversation_key[:12] + ) async def on_rx_log_data(event: "Event") -> None: diff --git a/app/services/dm_ingest.py b/app/services/dm_ingest.py new file mode 100644 index 0000000..c0963c7 --- /dev/null +++ b/app/services/dm_ingest.py @@ -0,0 +1,320 @@ +import asyncio +import logging +import time +from collections.abc import Callable +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +from app.models import CONTACT_TYPE_REPEATER, Contact, ContactUpsert, Message +from app.repository import ( + AmbiguousPublicKeyPrefixError, + ContactRepository, + MessageRepository, + RawPacketRepository, +) +from app.services.contact_reconciliation import claim_prefix_messages_for_contact +from app.services.messages import ( + broadcast_message, + build_message_model, + build_message_paths, + format_contact_log_target, + handle_duplicate_message, + reconcile_duplicate_message, + truncate_for_log, +) + +if TYPE_CHECKING: + from app.decoder import DecryptedDirectMessage + +logger = logging.getLogger(__name__) + +BroadcastFn = Callable[..., Any] +_decrypted_dm_store_lock = asyncio.Lock() + + +@dataclass(frozen=True) +class FallbackDirectMessageContext: + conversation_key: str + contact: Contact | None + sender_name: str | None + sender_key: str | None + skip_storage: bool = False + + +async def _prepare_resolved_contact( + contact: Contact, + *, + log: logging.Logger | None = None, +) -> tuple[str, bool]: + conversation_key = contact.public_key.lower() + await claim_prefix_messages_for_contact(public_key=conversation_key, log=log or logger) + + if contact.type == CONTACT_TYPE_REPEATER: + return conversation_key, True + + return conversation_key, False + + +async def resolve_fallback_direct_message_context( + *, + sender_public_key: str, + received_at: int, + broadcast_fn: BroadcastFn, + contact_repository=ContactRepository, + log: logging.Logger | None = None, +) -> FallbackDirectMessageContext: + normalized_sender = sender_public_key.lower() + + try: + contact = await contact_repository.get_by_key_or_prefix(normalized_sender) + except AmbiguousPublicKeyPrefixError: + (log or logger).warning( + "DM sender prefix '%s' is ambiguous; storing under prefix until full key is known", + sender_public_key, + ) + contact = None + + if contact is not None: + conversation_key, skip_storage = await _prepare_resolved_contact(contact, log=log) + return FallbackDirectMessageContext( + conversation_key=conversation_key, + contact=contact, + sender_name=contact.name, + sender_key=conversation_key, + skip_storage=skip_storage, + ) + + if normalized_sender: + placeholder_upsert = ContactUpsert( + public_key=normalized_sender, + type=0, + last_seen=received_at, + last_contacted=received_at, + first_seen=received_at, + on_radio=False, + out_path_hash_mode=-1, + ) + await contact_repository.upsert(placeholder_upsert) + contact = await contact_repository.get_by_key(normalized_sender) + if contact is not None: + broadcast_fn("contact", contact.model_dump()) + + return FallbackDirectMessageContext( + conversation_key=normalized_sender, + contact=contact, + sender_name=contact.name if contact else None, + sender_key=normalized_sender or None, + ) + + +async def _store_direct_message( + *, + packet_id: int | None, + conversation_key: str, + text: str, + sender_timestamp: int, + received_at: int, + path: str | None, + path_len: int | None, + outgoing: bool, + txt_type: int, + signature: str | None, + sender_name: str | None, + sender_key: str | None, + realtime: bool, + broadcast_fn: BroadcastFn, + update_last_contacted_key: str | None, + best_effort_content_dedup: bool, + linked_packet_dedup: bool, + message_repository=MessageRepository, + contact_repository=ContactRepository, + raw_packet_repository=RawPacketRepository, +) -> Message | None: + async def store() -> Message | None: + if linked_packet_dedup and packet_id is not None: + linked_message_id = await raw_packet_repository.get_linked_message_id(packet_id) + if linked_message_id is not None: + existing_msg = await message_repository.get_by_id(linked_message_id) + if existing_msg is not None: + await reconcile_duplicate_message( + existing_msg=existing_msg, + packet_id=packet_id, + path=path, + received_at=received_at, + path_len=path_len, + broadcast_fn=broadcast_fn, + ) + return None + + if best_effort_content_dedup: + existing_msg = await message_repository.get_by_content( + msg_type="PRIV", + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + ) + if existing_msg is not None: + await reconcile_duplicate_message( + existing_msg=existing_msg, + packet_id=packet_id, + path=path, + received_at=received_at, + path_len=path_len, + broadcast_fn=broadcast_fn, + ) + return None + + msg_id = await message_repository.create( + msg_type="PRIV", + text=text, + conversation_key=conversation_key, + sender_timestamp=sender_timestamp, + received_at=received_at, + path=path, + path_len=path_len, + txt_type=txt_type, + signature=signature, + outgoing=outgoing, + sender_key=sender_key, + sender_name=sender_name, + ) + if msg_id is None: + await handle_duplicate_message( + packet_id=packet_id, + msg_type="PRIV", + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + path=path, + received_at=received_at, + path_len=path_len, + broadcast_fn=broadcast_fn, + ) + return None + + if packet_id is not None: + await raw_packet_repository.mark_decrypted(packet_id, msg_id) + + message = build_message_model( + message_id=msg_id, + msg_type="PRIV", + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + received_at=received_at, + paths=build_message_paths(path, received_at, path_len), + txt_type=txt_type, + signature=signature, + sender_key=sender_key, + outgoing=outgoing, + sender_name=sender_name, + ) + broadcast_message(message=message, broadcast_fn=broadcast_fn, realtime=realtime) + + if update_last_contacted_key: + await contact_repository.update_last_contacted(update_last_contacted_key, received_at) + + return message + + if linked_packet_dedup: + async with _decrypted_dm_store_lock: + return await store() + return await store() + + +async def ingest_decrypted_direct_message( + *, + packet_id: int, + decrypted: "DecryptedDirectMessage", + their_public_key: str, + received_at: int | None = None, + path: str | None = None, + path_len: int | None = None, + outgoing: bool = False, + realtime: bool = True, + broadcast_fn: BroadcastFn, + contact_repository=ContactRepository, +) -> Message | None: + conversation_key = their_public_key.lower() + contact = await contact_repository.get_by_key(conversation_key) + sender_name: str | None = None + if contact is not None: + conversation_key, skip_storage = await _prepare_resolved_contact(contact, log=logger) + if skip_storage: + logger.debug( + "Skipping message from repeater %s (CLI responses not stored): %s", + conversation_key[:12], + (decrypted.message or "")[:50], + ) + return None + if not outgoing: + sender_name = contact.name + + received = received_at or int(time.time()) + message = await _store_direct_message( + packet_id=packet_id, + conversation_key=conversation_key, + text=decrypted.message, + sender_timestamp=decrypted.timestamp, + received_at=received, + path=path, + path_len=path_len, + outgoing=outgoing, + txt_type=0, + signature=None, + sender_name=sender_name, + sender_key=conversation_key if not outgoing else None, + realtime=realtime, + broadcast_fn=broadcast_fn, + update_last_contacted_key=conversation_key, + best_effort_content_dedup=outgoing, + linked_packet_dedup=True, + ) + if message is None: + return None + + logger.info( + 'Stored direct message "%s" for %r (msg ID %d in contact ID %s, outgoing=%s)', + truncate_for_log(decrypted.message), + format_contact_log_target(contact.name if contact else None, conversation_key), + message.id, + conversation_key, + outgoing, + ) + return message + + +async def ingest_fallback_direct_message( + *, + conversation_key: str, + text: str, + sender_timestamp: int, + received_at: int, + path: str | None, + path_len: int | None, + txt_type: int, + signature: str | None, + sender_name: str | None, + sender_key: str | None, + broadcast_fn: BroadcastFn, + update_last_contacted_key: str | None = None, +) -> Message | None: + return await _store_direct_message( + packet_id=None, + conversation_key=conversation_key, + text=text, + sender_timestamp=sender_timestamp, + received_at=received_at, + path=path, + path_len=path_len, + outgoing=False, + txt_type=txt_type, + signature=signature, + sender_name=sender_name, + sender_key=sender_key, + realtime=True, + broadcast_fn=broadcast_fn, + update_last_contacted_key=update_last_contacted_key, + best_effort_content_dedup=True, + linked_packet_dedup=False, + ) diff --git a/app/services/messages.py b/app/services/messages.py index 980db94..1b6a711 100644 --- a/app/services/messages.py +++ b/app/services/messages.py @@ -1,10 +1,9 @@ -import asyncio import logging import time from collections.abc import Callable from typing import TYPE_CHECKING, Any -from app.models import CONTACT_TYPE_REPEATER, Message, MessagePath +from app.models import Message, MessagePath from app.repository import ContactRepository, MessageRepository, RawPacketRepository if TYPE_CHECKING: @@ -14,10 +13,9 @@ logger = logging.getLogger(__name__) BroadcastFn = Callable[..., Any] LOG_MESSAGE_PREVIEW_LEN = 32 -_decrypted_dm_store_lock = asyncio.Lock() -def _truncate_for_log(text: str, max_chars: int = LOG_MESSAGE_PREVIEW_LEN) -> str: +def truncate_for_log(text: str, max_chars: int = LOG_MESSAGE_PREVIEW_LEN) -> str: """Return a compact single-line message preview for log output.""" normalized = " ".join(text.split()) if len(normalized) <= max_chars: @@ -30,7 +28,7 @@ def _format_channel_log_target(channel_name: str | None, channel_key: str) -> st return channel_name or channel_key -def _format_contact_log_target(contact_name: str | None, public_key: str) -> str: +def format_contact_log_target(contact_name: str | None, public_key: str) -> str: """Return a human-friendly DM target label for logs.""" return contact_name or public_key[:12] @@ -127,7 +125,7 @@ async def increment_ack_and_broadcast( return ack_count -async def _reconcile_duplicate_message( +async def reconcile_duplicate_message( *, existing_msg: Message, packet_id: int | None, @@ -194,7 +192,7 @@ async def handle_duplicate_message( ) return - await _reconcile_duplicate_message( + await reconcile_duplicate_message( existing_msg=existing_msg, packet_id=packet_id, path=path, @@ -257,7 +255,7 @@ async def create_message_from_decrypted( logger.info( 'Stored channel message "%s" for %r (msg ID %d in chan ID %s)', - _truncate_for_log(text), + truncate_for_log(text), _format_channel_log_target(channel_name, channel_key_normalized), msg_id, channel_key_normalized, @@ -298,165 +296,20 @@ async def create_dm_message_from_decrypted( broadcast_fn: BroadcastFn, ) -> int | None: """Store and broadcast a decrypted direct message.""" - contact = await ContactRepository.get_by_key(their_public_key) - if contact and contact.type == CONTACT_TYPE_REPEATER: - logger.debug( - "Skipping message from repeater %s (CLI responses not stored): %s", - their_public_key[:12], - (decrypted.message or "")[:50], - ) - return None + from app.services.dm_ingest import ingest_decrypted_direct_message - received = received_at or int(time.time()) - conversation_key = their_public_key.lower() - sender_name = contact.name if contact and not outgoing else None - - async with _decrypted_dm_store_lock: - linked_message_id = await RawPacketRepository.get_linked_message_id(packet_id) - if linked_message_id is not None: - existing_msg = await MessageRepository.get_by_id(linked_message_id) - if existing_msg is not None: - await _reconcile_duplicate_message( - existing_msg=existing_msg, - packet_id=packet_id, - path=path, - received_at=received, - path_len=path_len, - broadcast_fn=broadcast_fn, - ) - return None - - if outgoing: - existing_msg = await MessageRepository.get_by_content( - msg_type="PRIV", - conversation_key=conversation_key, - text=decrypted.message, - sender_timestamp=decrypted.timestamp, - ) - if existing_msg is not None: - await _reconcile_duplicate_message( - existing_msg=existing_msg, - packet_id=packet_id, - path=path, - received_at=received, - path_len=path_len, - broadcast_fn=broadcast_fn, - ) - return None - - msg_id = await MessageRepository.create( - msg_type="PRIV", - text=decrypted.message, - conversation_key=conversation_key, - sender_timestamp=decrypted.timestamp, - received_at=received, - path=path, - path_len=path_len, - outgoing=outgoing, - sender_key=conversation_key if not outgoing else None, - sender_name=sender_name, - ) - if msg_id is None: - await handle_duplicate_message( - packet_id=packet_id, - msg_type="PRIV", - conversation_key=conversation_key, - text=decrypted.message, - sender_timestamp=decrypted.timestamp, - path=path, - received_at=received, - path_len=path_len, - broadcast_fn=broadcast_fn, - ) - return None - - logger.info( - 'Stored direct message "%s" for %r (msg ID %d in contact ID %s, outgoing=%s)', - _truncate_for_log(decrypted.message), - _format_contact_log_target(contact.name if contact else None, conversation_key), - msg_id, - conversation_key, - outgoing, - ) - await RawPacketRepository.mark_decrypted(packet_id, msg_id) - - broadcast_message( - message=build_message_model( - message_id=msg_id, - msg_type="PRIV", - conversation_key=conversation_key, - text=decrypted.message, - sender_timestamp=decrypted.timestamp, - received_at=received, - paths=build_message_paths(path, received, path_len), - outgoing=outgoing, - sender_name=sender_name, - sender_key=conversation_key if not outgoing else None, - ), - broadcast_fn=broadcast_fn, - realtime=realtime, - ) - - await ContactRepository.update_last_contacted(conversation_key, received) - return msg_id - - -async def create_fallback_direct_message( - *, - conversation_key: str, - text: str, - sender_timestamp: int, - received_at: int, - path: str | None, - path_len: int | None, - txt_type: int, - signature: str | None, - sender_name: str | None, - sender_key: str | None, - broadcast_fn: BroadcastFn, - message_repository=MessageRepository, -) -> Message | None: - """Store and broadcast a CONTACT_MSG_RECV fallback direct message.""" - existing = await message_repository.get_by_content( - msg_type="PRIV", - conversation_key=conversation_key, - text=text, - sender_timestamp=sender_timestamp, - ) - if existing is not None: - return None - - msg_id = await message_repository.create( - msg_type="PRIV", - text=text, - conversation_key=conversation_key, - sender_timestamp=sender_timestamp, + message = await ingest_decrypted_direct_message( + packet_id=packet_id, + decrypted=decrypted, + their_public_key=their_public_key, received_at=received_at, path=path, path_len=path_len, - txt_type=txt_type, - signature=signature, - sender_key=sender_key, - sender_name=sender_name, + outgoing=outgoing, + realtime=realtime, + broadcast_fn=broadcast_fn, ) - if msg_id is None: - return None - - message = build_message_model( - message_id=msg_id, - msg_type="PRIV", - conversation_key=conversation_key, - text=text, - sender_timestamp=sender_timestamp, - received_at=received_at, - paths=build_message_paths(path, received_at, path_len), - txt_type=txt_type, - signature=signature, - sender_key=sender_key, - sender_name=sender_name, - ) - broadcast_message(message=message, broadcast_fn=broadcast_fn) - return message + return message.id if message is not None else None async def create_fallback_channel_message( diff --git a/tests/test_echo_dedup.py b/tests/test_echo_dedup.py index 57317d5..a418fe3 100644 --- a/tests/test_echo_dedup.py +++ b/tests/test_echo_dedup.py @@ -7,7 +7,7 @@ paths (packet_processor + event_handler fallback) don't double-store messages. """ import asyncio -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import MagicMock, patch import pytest @@ -410,7 +410,8 @@ class TestDualPathDedup: 1. Primary: RX_LOG_DATA → packet_processor (decrypts with private key) 2. Fallback: CONTACT_MSG_RECV → on_contact_message (MeshCore library decoded) - The fallback uses INSERT OR IGNORE to avoid double-storage when both fire. + The fallback path should reconcile against the packet path instead of creating + a second row, and should still add new path observations when available. """ @pytest.mark.asyncio @@ -457,19 +458,7 @@ class TestDualPathDedup: "sender_timestamp": SENDER_TIMESTAMP, } - # Mock contact lookup to return a contact with the right key - mock_contact = MagicMock() - mock_contact.public_key = CONTACT_PUB - mock_contact.type = 1 # Client, not repeater - mock_contact.name = "TestContact" - - with ( - patch("app.event_handlers.ContactRepository") as mock_contact_repo, - patch("app.event_handlers.broadcast_event", mock_broadcast), - ): - mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact) - mock_contact_repo.update_last_contacted = AsyncMock() - + with patch("app.event_handlers.broadcast_event", mock_broadcast): await on_contact_message(mock_event) # No additional message broadcast should have been sent @@ -538,18 +527,7 @@ class TestDualPathDedup: "sender_timestamp": SENDER_TIMESTAMP, } - mock_contact = MagicMock() - mock_contact.public_key = upper_key # Uppercase from DB - mock_contact.type = 1 - mock_contact.name = "TestContact" - - with ( - patch("app.event_handlers.ContactRepository") as mock_contact_repo, - patch("app.event_handlers.broadcast_event", mock_broadcast), - ): - mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact) - mock_contact_repo.update_last_contacted = AsyncMock() - + with patch("app.event_handlers.broadcast_event", mock_broadcast): await on_contact_message(mock_event) # Should NOT create a second message (dedup catches it thanks to .lower()) @@ -564,6 +542,146 @@ class TestDualPathDedup: ) assert len(messages) == 1 + @pytest.mark.asyncio + async def test_event_handler_duplicate_adds_path_to_existing_dm( + self, test_db, captured_broadcasts + ): + """Fallback DM duplicates should reconcile path updates onto the stored message.""" + from app.event_handlers import on_contact_message + from app.packet_processor import create_dm_message_from_decrypted + + await ContactRepository.upsert( + { + "public_key": CONTACT_PUB.lower(), + "name": "TestContact", + "type": 1, + "last_seen": SENDER_TIMESTAMP, + "last_contacted": SENDER_TIMESTAMP, + "first_seen": SENDER_TIMESTAMP, + "on_radio": False, + "out_path_hash_mode": 0, + } + ) + + pkt_id, _ = await RawPacketRepository.create(b"primary_with_no_path", SENDER_TIMESTAMP) + decrypted = DecryptedDirectMessage( + timestamp=SENDER_TIMESTAMP, + flags=0, + message="Dual path with route update", + dest_hash="fa", + src_hash="a1", + ) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + msg_id = await create_dm_message_from_decrypted( + packet_id=pkt_id, + decrypted=decrypted, + their_public_key=CONTACT_PUB, + our_public_key=OUR_PUB, + received_at=SENDER_TIMESTAMP, + outgoing=False, + ) + + assert msg_id is not None + broadcasts.clear() + + mock_event = MagicMock() + mock_event.payload = { + "public_key": CONTACT_PUB, + "text": "Dual path with route update", + "txt_type": 0, + "sender_timestamp": SENDER_TIMESTAMP, + "path": "bbcc", + "path_len": 2, + } + + with patch("app.event_handlers.broadcast_event", mock_broadcast): + await on_contact_message(mock_event) + + message_broadcasts = [b for b in broadcasts if b["type"] == "message"] + assert message_broadcasts == [] + + ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"] + assert len(ack_broadcasts) == 1 + assert ack_broadcasts[0]["data"]["message_id"] == msg_id + assert ack_broadcasts[0]["data"]["ack_count"] == 0 + assert any(p["path"] == "bbcc" for p in ack_broadcasts[0]["data"]["paths"]) + + msg = await MessageRepository.get_by_id(msg_id) + assert msg is not None + assert msg.paths is not None + assert any(p.path == "bbcc" for p in msg.paths) + + @pytest.mark.asyncio + async def test_fallback_path_duplicate_reconciles_path_without_new_row( + self, test_db, captured_broadcasts + ): + """Repeated fallback DMs should keep one row and merge path observations.""" + from app.event_handlers import on_contact_message + + await ContactRepository.upsert( + { + "public_key": CONTACT_PUB.lower(), + "name": "FallbackOnly", + "type": 1, + "last_seen": SENDER_TIMESTAMP, + "last_contacted": SENDER_TIMESTAMP, + "first_seen": SENDER_TIMESTAMP, + "on_radio": False, + "out_path_hash_mode": 0, + } + ) + + broadcasts, mock_broadcast = captured_broadcasts + + first_event = MagicMock() + first_event.payload = { + "public_key": CONTACT_PUB, + "text": "Fallback duplicate route test", + "txt_type": 0, + "sender_timestamp": SENDER_TIMESTAMP, + } + + with patch("app.event_handlers.broadcast_event", mock_broadcast): + await on_contact_message(first_event) + + messages = await MessageRepository.get_all( + msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10 + ) + assert len(messages) == 1 + msg_id = messages[0].id + + broadcasts.clear() + + second_event = MagicMock() + second_event.payload = { + "public_key": CONTACT_PUB, + "text": "Fallback duplicate route test", + "txt_type": 0, + "sender_timestamp": SENDER_TIMESTAMP, + "path": "ddee", + "path_len": 2, + } + + with patch("app.event_handlers.broadcast_event", mock_broadcast): + await on_contact_message(second_event) + + messages = await MessageRepository.get_all( + msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10 + ) + assert len(messages) == 1 + + message_broadcasts = [b for b in broadcasts if b["type"] == "message"] + assert message_broadcasts == [] + + ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"] + assert len(ack_broadcasts) == 1 + assert ack_broadcasts[0]["data"]["message_id"] == msg_id + assert ack_broadcasts[0]["data"]["ack_count"] == 0 + assert any(p["path"] == "ddee" for p in ack_broadcasts[0]["data"]["paths"]) + class TestDirectMessageDirectionDetection: """Test src_hash/dest_hash direction detection in _process_direct_message.