Unify our DM ingest

This commit is contained in:
Jack Kingsman
2026-03-16 16:36:11 -07:00
parent dbb8dd4c43
commit d373585527
4 changed files with 512 additions and 245 deletions

View File

@@ -4,19 +4,21 @@ from typing import TYPE_CHECKING
from meshcore import EventType
from app.models import CONTACT_TYPE_REPEATER, Contact, ContactUpsert
from app.models import Contact, ContactUpsert
from app.packet_processor import process_raw_packet
from app.repository import (
AmbiguousPublicKeyPrefixError,
ContactRepository,
)
from app.services import dm_ack_tracker
from app.services.contact_reconciliation import (
claim_prefix_messages_for_contact,
promote_prefix_contacts_for_contact,
record_contact_name_and_reconcile,
)
from app.services.messages import create_fallback_direct_message, increment_ack_and_broadcast
from app.services.dm_ingest import (
ingest_fallback_direct_message,
resolve_fallback_direct_message_context,
)
from app.services.messages import increment_ack_and_broadcast
from app.websocket import broadcast_event
if TYPE_CHECKING:
@@ -51,8 +53,8 @@ async def on_contact_message(event: "Event") -> None:
2. The packet processor couldn't match the sender to a known contact
The packet processor handles: decryption, storage, broadcast, bot trigger.
This handler only stores if the packet processor didn't already handle it
(detected via INSERT OR IGNORE returning None for duplicates).
This handler adapts CONTACT_MSG_RECV payloads into the shared DM ingest
workflow, which reconciles duplicates against the packet pipeline when possible.
"""
payload = event.payload
@@ -66,54 +68,27 @@ async def on_contact_message(event: "Event") -> None:
sender_pubkey = payload.get("public_key") or payload.get("pubkey_prefix", "")
received_at = int(time.time())
# Look up contact from database - use prefix lookup only if needed
# (get_by_key_or_prefix does exact match first, then prefix fallback)
try:
contact = await ContactRepository.get_by_key_or_prefix(sender_pubkey)
except AmbiguousPublicKeyPrefixError:
logger.warning(
"DM sender prefix '%s' is ambiguous; storing under prefix until full key is known",
sender_pubkey,
context = await resolve_fallback_direct_message_context(
sender_public_key=sender_pubkey,
received_at=received_at,
broadcast_fn=broadcast_event,
contact_repository=ContactRepository,
log=logger,
)
if context.skip_storage:
logger.debug(
"Skipping message from repeater %s (not stored in chat history)",
context.conversation_key[:12],
)
contact = None
if contact:
sender_pubkey = contact.public_key.lower()
return
# Promote any prefix-stored messages to this full key
await claim_prefix_messages_for_contact(public_key=sender_pubkey, log=logger)
# Skip messages from repeaters - they only send CLI responses, not chat messages.
# CLI responses are handled by the command endpoint and txt_type filter above.
if contact.type == CONTACT_TYPE_REPEATER:
logger.debug(
"Skipping message from repeater %s (not stored in chat history)",
sender_pubkey[:12],
)
return
elif sender_pubkey:
placeholder_upsert = ContactUpsert(
public_key=sender_pubkey.lower(),
type=0,
last_seen=received_at,
last_contacted=received_at,
first_seen=received_at,
on_radio=False,
out_path_hash_mode=-1,
)
await ContactRepository.upsert(placeholder_upsert)
contact = await ContactRepository.get_by_key(sender_pubkey.lower())
if contact:
broadcast_event("contact", contact.model_dump())
# Try to create message - INSERT OR IGNORE handles duplicates atomically
# If the packet processor already stored this message, this returns None
# Try to create or reconcile the message via the shared DM ingest service.
ts = payload.get("sender_timestamp")
sender_timestamp = ts if ts is not None else received_at
sender_name = contact.name if contact else None
path = payload.get("path")
path_len = payload.get("path_len")
message = await create_fallback_direct_message(
conversation_key=sender_pubkey,
message = await ingest_fallback_direct_message(
conversation_key=context.conversation_key,
text=payload.get("text", ""),
sender_timestamp=sender_timestamp,
received_at=received_at,
@@ -121,23 +96,24 @@ async def on_contact_message(event: "Event") -> None:
path_len=path_len,
txt_type=txt_type,
signature=payload.get("signature"),
sender_name=sender_name,
sender_key=sender_pubkey,
sender_name=context.sender_name,
sender_key=context.sender_key,
broadcast_fn=broadcast_event,
update_last_contacted_key=context.contact.public_key.lower() if context.contact else None,
)
if message is None:
# Already handled by packet processor (or exact duplicate) - nothing more to do
logger.debug("DM from %s already processed by packet processor", sender_pubkey[:12])
logger.debug(
"DM from %s already processed by packet processor", context.conversation_key[:12]
)
return
# If we get here, the packet processor didn't handle this message
# (likely because private key export is not available)
logger.debug("DM from %s handled by event handler (fallback path)", sender_pubkey[:12])
# Update contact last_contacted (contact was already fetched above)
if contact:
await ContactRepository.update_last_contacted(sender_pubkey, received_at)
logger.debug(
"DM from %s handled by event handler (fallback path)", context.conversation_key[:12]
)
async def on_rx_log_data(event: "Event") -> None:

320
app/services/dm_ingest.py Normal file
View File

@@ -0,0 +1,320 @@
import asyncio
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from app.models import CONTACT_TYPE_REPEATER, Contact, ContactUpsert, Message
from app.repository import (
AmbiguousPublicKeyPrefixError,
ContactRepository,
MessageRepository,
RawPacketRepository,
)
from app.services.contact_reconciliation import claim_prefix_messages_for_contact
from app.services.messages import (
broadcast_message,
build_message_model,
build_message_paths,
format_contact_log_target,
handle_duplicate_message,
reconcile_duplicate_message,
truncate_for_log,
)
if TYPE_CHECKING:
from app.decoder import DecryptedDirectMessage
logger = logging.getLogger(__name__)
BroadcastFn = Callable[..., Any]
_decrypted_dm_store_lock = asyncio.Lock()
@dataclass(frozen=True)
class FallbackDirectMessageContext:
conversation_key: str
contact: Contact | None
sender_name: str | None
sender_key: str | None
skip_storage: bool = False
async def _prepare_resolved_contact(
contact: Contact,
*,
log: logging.Logger | None = None,
) -> tuple[str, bool]:
conversation_key = contact.public_key.lower()
await claim_prefix_messages_for_contact(public_key=conversation_key, log=log or logger)
if contact.type == CONTACT_TYPE_REPEATER:
return conversation_key, True
return conversation_key, False
async def resolve_fallback_direct_message_context(
*,
sender_public_key: str,
received_at: int,
broadcast_fn: BroadcastFn,
contact_repository=ContactRepository,
log: logging.Logger | None = None,
) -> FallbackDirectMessageContext:
normalized_sender = sender_public_key.lower()
try:
contact = await contact_repository.get_by_key_or_prefix(normalized_sender)
except AmbiguousPublicKeyPrefixError:
(log or logger).warning(
"DM sender prefix '%s' is ambiguous; storing under prefix until full key is known",
sender_public_key,
)
contact = None
if contact is not None:
conversation_key, skip_storage = await _prepare_resolved_contact(contact, log=log)
return FallbackDirectMessageContext(
conversation_key=conversation_key,
contact=contact,
sender_name=contact.name,
sender_key=conversation_key,
skip_storage=skip_storage,
)
if normalized_sender:
placeholder_upsert = ContactUpsert(
public_key=normalized_sender,
type=0,
last_seen=received_at,
last_contacted=received_at,
first_seen=received_at,
on_radio=False,
out_path_hash_mode=-1,
)
await contact_repository.upsert(placeholder_upsert)
contact = await contact_repository.get_by_key(normalized_sender)
if contact is not None:
broadcast_fn("contact", contact.model_dump())
return FallbackDirectMessageContext(
conversation_key=normalized_sender,
contact=contact,
sender_name=contact.name if contact else None,
sender_key=normalized_sender or None,
)
async def _store_direct_message(
*,
packet_id: int | None,
conversation_key: str,
text: str,
sender_timestamp: int,
received_at: int,
path: str | None,
path_len: int | None,
outgoing: bool,
txt_type: int,
signature: str | None,
sender_name: str | None,
sender_key: str | None,
realtime: bool,
broadcast_fn: BroadcastFn,
update_last_contacted_key: str | None,
best_effort_content_dedup: bool,
linked_packet_dedup: bool,
message_repository=MessageRepository,
contact_repository=ContactRepository,
raw_packet_repository=RawPacketRepository,
) -> Message | None:
async def store() -> Message | None:
if linked_packet_dedup and packet_id is not None:
linked_message_id = await raw_packet_repository.get_linked_message_id(packet_id)
if linked_message_id is not None:
existing_msg = await message_repository.get_by_id(linked_message_id)
if existing_msg is not None:
await reconcile_duplicate_message(
existing_msg=existing_msg,
packet_id=packet_id,
path=path,
received_at=received_at,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
if best_effort_content_dedup:
existing_msg = await message_repository.get_by_content(
msg_type="PRIV",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
)
if existing_msg is not None:
await reconcile_duplicate_message(
existing_msg=existing_msg,
packet_id=packet_id,
path=path,
received_at=received_at,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
msg_id = await message_repository.create(
msg_type="PRIV",
text=text,
conversation_key=conversation_key,
sender_timestamp=sender_timestamp,
received_at=received_at,
path=path,
path_len=path_len,
txt_type=txt_type,
signature=signature,
outgoing=outgoing,
sender_key=sender_key,
sender_name=sender_name,
)
if msg_id is None:
await handle_duplicate_message(
packet_id=packet_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
path=path,
received_at=received_at,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
if packet_id is not None:
await raw_packet_repository.mark_decrypted(packet_id, msg_id)
message = build_message_model(
message_id=msg_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
paths=build_message_paths(path, received_at, path_len),
txt_type=txt_type,
signature=signature,
sender_key=sender_key,
outgoing=outgoing,
sender_name=sender_name,
)
broadcast_message(message=message, broadcast_fn=broadcast_fn, realtime=realtime)
if update_last_contacted_key:
await contact_repository.update_last_contacted(update_last_contacted_key, received_at)
return message
if linked_packet_dedup:
async with _decrypted_dm_store_lock:
return await store()
return await store()
async def ingest_decrypted_direct_message(
*,
packet_id: int,
decrypted: "DecryptedDirectMessage",
their_public_key: str,
received_at: int | None = None,
path: str | None = None,
path_len: int | None = None,
outgoing: bool = False,
realtime: bool = True,
broadcast_fn: BroadcastFn,
contact_repository=ContactRepository,
) -> Message | None:
conversation_key = their_public_key.lower()
contact = await contact_repository.get_by_key(conversation_key)
sender_name: str | None = None
if contact is not None:
conversation_key, skip_storage = await _prepare_resolved_contact(contact, log=logger)
if skip_storage:
logger.debug(
"Skipping message from repeater %s (CLI responses not stored): %s",
conversation_key[:12],
(decrypted.message or "")[:50],
)
return None
if not outgoing:
sender_name = contact.name
received = received_at or int(time.time())
message = await _store_direct_message(
packet_id=packet_id,
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
received_at=received,
path=path,
path_len=path_len,
outgoing=outgoing,
txt_type=0,
signature=None,
sender_name=sender_name,
sender_key=conversation_key if not outgoing else None,
realtime=realtime,
broadcast_fn=broadcast_fn,
update_last_contacted_key=conversation_key,
best_effort_content_dedup=outgoing,
linked_packet_dedup=True,
)
if message is None:
return None
logger.info(
'Stored direct message "%s" for %r (msg ID %d in contact ID %s, outgoing=%s)',
truncate_for_log(decrypted.message),
format_contact_log_target(contact.name if contact else None, conversation_key),
message.id,
conversation_key,
outgoing,
)
return message
async def ingest_fallback_direct_message(
*,
conversation_key: str,
text: str,
sender_timestamp: int,
received_at: int,
path: str | None,
path_len: int | None,
txt_type: int,
signature: str | None,
sender_name: str | None,
sender_key: str | None,
broadcast_fn: BroadcastFn,
update_last_contacted_key: str | None = None,
) -> Message | None:
return await _store_direct_message(
packet_id=None,
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
path=path,
path_len=path_len,
outgoing=False,
txt_type=txt_type,
signature=signature,
sender_name=sender_name,
sender_key=sender_key,
realtime=True,
broadcast_fn=broadcast_fn,
update_last_contacted_key=update_last_contacted_key,
best_effort_content_dedup=True,
linked_packet_dedup=False,
)

View File

@@ -1,10 +1,9 @@
import asyncio
import logging
import time
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from app.models import CONTACT_TYPE_REPEATER, Message, MessagePath
from app.models import Message, MessagePath
from app.repository import ContactRepository, MessageRepository, RawPacketRepository
if TYPE_CHECKING:
@@ -14,10 +13,9 @@ logger = logging.getLogger(__name__)
BroadcastFn = Callable[..., Any]
LOG_MESSAGE_PREVIEW_LEN = 32
_decrypted_dm_store_lock = asyncio.Lock()
def _truncate_for_log(text: str, max_chars: int = LOG_MESSAGE_PREVIEW_LEN) -> str:
def truncate_for_log(text: str, max_chars: int = LOG_MESSAGE_PREVIEW_LEN) -> str:
"""Return a compact single-line message preview for log output."""
normalized = " ".join(text.split())
if len(normalized) <= max_chars:
@@ -30,7 +28,7 @@ def _format_channel_log_target(channel_name: str | None, channel_key: str) -> st
return channel_name or channel_key
def _format_contact_log_target(contact_name: str | None, public_key: str) -> str:
def format_contact_log_target(contact_name: str | None, public_key: str) -> str:
"""Return a human-friendly DM target label for logs."""
return contact_name or public_key[:12]
@@ -127,7 +125,7 @@ async def increment_ack_and_broadcast(
return ack_count
async def _reconcile_duplicate_message(
async def reconcile_duplicate_message(
*,
existing_msg: Message,
packet_id: int | None,
@@ -194,7 +192,7 @@ async def handle_duplicate_message(
)
return
await _reconcile_duplicate_message(
await reconcile_duplicate_message(
existing_msg=existing_msg,
packet_id=packet_id,
path=path,
@@ -257,7 +255,7 @@ async def create_message_from_decrypted(
logger.info(
'Stored channel message "%s" for %r (msg ID %d in chan ID %s)',
_truncate_for_log(text),
truncate_for_log(text),
_format_channel_log_target(channel_name, channel_key_normalized),
msg_id,
channel_key_normalized,
@@ -298,165 +296,20 @@ async def create_dm_message_from_decrypted(
broadcast_fn: BroadcastFn,
) -> int | None:
"""Store and broadcast a decrypted direct message."""
contact = await ContactRepository.get_by_key(their_public_key)
if contact and contact.type == CONTACT_TYPE_REPEATER:
logger.debug(
"Skipping message from repeater %s (CLI responses not stored): %s",
their_public_key[:12],
(decrypted.message or "")[:50],
)
return None
from app.services.dm_ingest import ingest_decrypted_direct_message
received = received_at or int(time.time())
conversation_key = their_public_key.lower()
sender_name = contact.name if contact and not outgoing else None
async with _decrypted_dm_store_lock:
linked_message_id = await RawPacketRepository.get_linked_message_id(packet_id)
if linked_message_id is not None:
existing_msg = await MessageRepository.get_by_id(linked_message_id)
if existing_msg is not None:
await _reconcile_duplicate_message(
existing_msg=existing_msg,
packet_id=packet_id,
path=path,
received_at=received,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
if outgoing:
existing_msg = await MessageRepository.get_by_content(
msg_type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
)
if existing_msg is not None:
await _reconcile_duplicate_message(
existing_msg=existing_msg,
packet_id=packet_id,
path=path,
received_at=received,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
msg_id = await MessageRepository.create(
msg_type="PRIV",
text=decrypted.message,
conversation_key=conversation_key,
sender_timestamp=decrypted.timestamp,
received_at=received,
path=path,
path_len=path_len,
outgoing=outgoing,
sender_key=conversation_key if not outgoing else None,
sender_name=sender_name,
)
if msg_id is None:
await handle_duplicate_message(
packet_id=packet_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
path=path,
received_at=received,
path_len=path_len,
broadcast_fn=broadcast_fn,
)
return None
logger.info(
'Stored direct message "%s" for %r (msg ID %d in contact ID %s, outgoing=%s)',
_truncate_for_log(decrypted.message),
_format_contact_log_target(contact.name if contact else None, conversation_key),
msg_id,
conversation_key,
outgoing,
)
await RawPacketRepository.mark_decrypted(packet_id, msg_id)
broadcast_message(
message=build_message_model(
message_id=msg_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=decrypted.message,
sender_timestamp=decrypted.timestamp,
received_at=received,
paths=build_message_paths(path, received, path_len),
outgoing=outgoing,
sender_name=sender_name,
sender_key=conversation_key if not outgoing else None,
),
broadcast_fn=broadcast_fn,
realtime=realtime,
)
await ContactRepository.update_last_contacted(conversation_key, received)
return msg_id
async def create_fallback_direct_message(
*,
conversation_key: str,
text: str,
sender_timestamp: int,
received_at: int,
path: str | None,
path_len: int | None,
txt_type: int,
signature: str | None,
sender_name: str | None,
sender_key: str | None,
broadcast_fn: BroadcastFn,
message_repository=MessageRepository,
) -> Message | None:
"""Store and broadcast a CONTACT_MSG_RECV fallback direct message."""
existing = await message_repository.get_by_content(
msg_type="PRIV",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
)
if existing is not None:
return None
msg_id = await message_repository.create(
msg_type="PRIV",
text=text,
conversation_key=conversation_key,
sender_timestamp=sender_timestamp,
message = await ingest_decrypted_direct_message(
packet_id=packet_id,
decrypted=decrypted,
their_public_key=their_public_key,
received_at=received_at,
path=path,
path_len=path_len,
txt_type=txt_type,
signature=signature,
sender_key=sender_key,
sender_name=sender_name,
outgoing=outgoing,
realtime=realtime,
broadcast_fn=broadcast_fn,
)
if msg_id is None:
return None
message = build_message_model(
message_id=msg_id,
msg_type="PRIV",
conversation_key=conversation_key,
text=text,
sender_timestamp=sender_timestamp,
received_at=received_at,
paths=build_message_paths(path, received_at, path_len),
txt_type=txt_type,
signature=signature,
sender_key=sender_key,
sender_name=sender_name,
)
broadcast_message(message=message, broadcast_fn=broadcast_fn)
return message
return message.id if message is not None else None
async def create_fallback_channel_message(

View File

@@ -7,7 +7,7 @@ paths (packet_processor + event_handler fallback) don't double-store messages.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import MagicMock, patch
import pytest
@@ -410,7 +410,8 @@ class TestDualPathDedup:
1. Primary: RX_LOG_DATA → packet_processor (decrypts with private key)
2. Fallback: CONTACT_MSG_RECV → on_contact_message (MeshCore library decoded)
The fallback uses INSERT OR IGNORE to avoid double-storage when both fire.
The fallback path should reconcile against the packet path instead of creating
a second row, and should still add new path observations when available.
"""
@pytest.mark.asyncio
@@ -457,19 +458,7 @@ class TestDualPathDedup:
"sender_timestamp": SENDER_TIMESTAMP,
}
# Mock contact lookup to return a contact with the right key
mock_contact = MagicMock()
mock_contact.public_key = CONTACT_PUB
mock_contact.type = 1 # Client, not repeater
mock_contact.name = "TestContact"
with (
patch("app.event_handlers.ContactRepository") as mock_contact_repo,
patch("app.event_handlers.broadcast_event", mock_broadcast),
):
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact)
mock_contact_repo.update_last_contacted = AsyncMock()
with patch("app.event_handlers.broadcast_event", mock_broadcast):
await on_contact_message(mock_event)
# No additional message broadcast should have been sent
@@ -538,18 +527,7 @@ class TestDualPathDedup:
"sender_timestamp": SENDER_TIMESTAMP,
}
mock_contact = MagicMock()
mock_contact.public_key = upper_key # Uppercase from DB
mock_contact.type = 1
mock_contact.name = "TestContact"
with (
patch("app.event_handlers.ContactRepository") as mock_contact_repo,
patch("app.event_handlers.broadcast_event", mock_broadcast),
):
mock_contact_repo.get_by_key_or_prefix = AsyncMock(return_value=mock_contact)
mock_contact_repo.update_last_contacted = AsyncMock()
with patch("app.event_handlers.broadcast_event", mock_broadcast):
await on_contact_message(mock_event)
# Should NOT create a second message (dedup catches it thanks to .lower())
@@ -564,6 +542,146 @@ class TestDualPathDedup:
)
assert len(messages) == 1
@pytest.mark.asyncio
async def test_event_handler_duplicate_adds_path_to_existing_dm(
self, test_db, captured_broadcasts
):
"""Fallback DM duplicates should reconcile path updates onto the stored message."""
from app.event_handlers import on_contact_message
from app.packet_processor import create_dm_message_from_decrypted
await ContactRepository.upsert(
{
"public_key": CONTACT_PUB.lower(),
"name": "TestContact",
"type": 1,
"last_seen": SENDER_TIMESTAMP,
"last_contacted": SENDER_TIMESTAMP,
"first_seen": SENDER_TIMESTAMP,
"on_radio": False,
"out_path_hash_mode": 0,
}
)
pkt_id, _ = await RawPacketRepository.create(b"primary_with_no_path", SENDER_TIMESTAMP)
decrypted = DecryptedDirectMessage(
timestamp=SENDER_TIMESTAMP,
flags=0,
message="Dual path with route update",
dest_hash="fa",
src_hash="a1",
)
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
msg_id = await create_dm_message_from_decrypted(
packet_id=pkt_id,
decrypted=decrypted,
their_public_key=CONTACT_PUB,
our_public_key=OUR_PUB,
received_at=SENDER_TIMESTAMP,
outgoing=False,
)
assert msg_id is not None
broadcasts.clear()
mock_event = MagicMock()
mock_event.payload = {
"public_key": CONTACT_PUB,
"text": "Dual path with route update",
"txt_type": 0,
"sender_timestamp": SENDER_TIMESTAMP,
"path": "bbcc",
"path_len": 2,
}
with patch("app.event_handlers.broadcast_event", mock_broadcast):
await on_contact_message(mock_event)
message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
assert message_broadcasts == []
ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"]
assert len(ack_broadcasts) == 1
assert ack_broadcasts[0]["data"]["message_id"] == msg_id
assert ack_broadcasts[0]["data"]["ack_count"] == 0
assert any(p["path"] == "bbcc" for p in ack_broadcasts[0]["data"]["paths"])
msg = await MessageRepository.get_by_id(msg_id)
assert msg is not None
assert msg.paths is not None
assert any(p.path == "bbcc" for p in msg.paths)
@pytest.mark.asyncio
async def test_fallback_path_duplicate_reconciles_path_without_new_row(
self, test_db, captured_broadcasts
):
"""Repeated fallback DMs should keep one row and merge path observations."""
from app.event_handlers import on_contact_message
await ContactRepository.upsert(
{
"public_key": CONTACT_PUB.lower(),
"name": "FallbackOnly",
"type": 1,
"last_seen": SENDER_TIMESTAMP,
"last_contacted": SENDER_TIMESTAMP,
"first_seen": SENDER_TIMESTAMP,
"on_radio": False,
"out_path_hash_mode": 0,
}
)
broadcasts, mock_broadcast = captured_broadcasts
first_event = MagicMock()
first_event.payload = {
"public_key": CONTACT_PUB,
"text": "Fallback duplicate route test",
"txt_type": 0,
"sender_timestamp": SENDER_TIMESTAMP,
}
with patch("app.event_handlers.broadcast_event", mock_broadcast):
await on_contact_message(first_event)
messages = await MessageRepository.get_all(
msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10
)
assert len(messages) == 1
msg_id = messages[0].id
broadcasts.clear()
second_event = MagicMock()
second_event.payload = {
"public_key": CONTACT_PUB,
"text": "Fallback duplicate route test",
"txt_type": 0,
"sender_timestamp": SENDER_TIMESTAMP,
"path": "ddee",
"path_len": 2,
}
with patch("app.event_handlers.broadcast_event", mock_broadcast):
await on_contact_message(second_event)
messages = await MessageRepository.get_all(
msg_type="PRIV", conversation_key=CONTACT_PUB.lower(), limit=10
)
assert len(messages) == 1
message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
assert message_broadcasts == []
ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"]
assert len(ack_broadcasts) == 1
assert ack_broadcasts[0]["data"]["message_id"] == msg_id
assert ack_broadcasts[0]["data"]["ack_count"] == 0
assert any(p["path"] == "ddee" for p in ack_broadcasts[0]["data"]["paths"])
class TestDirectMessageDirectionDetection:
"""Test src_hash/dest_hash direction detection in _process_direct_message.