From 088dcb39d6830bbc45f5a2cabf376fc2fea68f4f Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Mon, 9 Mar 2026 17:32:43 -0700 Subject: [PATCH] extract contact reconciliation service --- app/event_handlers.py | 26 ++--- app/packet_processor.py | 34 ++---- app/radio_sync.py | 25 +--- app/routers/contacts.py | 35 ++---- app/services/contact_reconciliation.py | 115 +++++++++++++++++++ tests/test_contact_reconciliation_service.py | 72 ++++++++++++ 6 files changed, 222 insertions(+), 85 deletions(-) create mode 100644 app/services/contact_reconciliation.py create mode 100644 tests/test_contact_reconciliation_service.py diff --git a/app/event_handlers.py b/app/event_handlers.py index 526e8fb..173986b 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -8,11 +8,13 @@ from app.models import CONTACT_TYPE_REPEATER, Contact from app.packet_processor import process_raw_packet from app.repository import ( AmbiguousPublicKeyPrefixError, - ContactNameHistoryRepository, ContactRepository, - MessageRepository, ) from app.services import dm_ack_tracker +from app.services.contact_reconciliation import ( + claim_prefix_messages_for_contact, + record_contact_name_and_reconcile, +) from app.services.messages import create_fallback_direct_message, increment_ack_and_broadcast from app.websocket import broadcast_event @@ -76,7 +78,7 @@ async def on_contact_message(event: "Event") -> None: sender_pubkey = contact.public_key.lower() # Promote any prefix-stored messages to this full key - await MessageRepository.claim_prefix_messages(sender_pubkey) + await claim_prefix_messages_for_contact(public_key=sender_pubkey, log=logger) # Skip messages from repeaters - they only send CLI responses, not chat messages. # CLI responses are handled by the command endpoint and txt_type filter above. @@ -232,19 +234,13 @@ async def on_new_contact(event: "Event") -> None: } await ContactRepository.upsert(contact_data) - # Record name history if contact has a name adv_name = payload.get("adv_name") - if adv_name: - await ContactNameHistoryRepository.record_name( - public_key.lower(), adv_name, int(time.time()) - ) - backfilled = await MessageRepository.backfill_channel_sender_key(public_key, adv_name) - if backfilled > 0: - logger.info( - "Backfilled sender_key on %d channel message(s) for %s", - backfilled, - adv_name, - ) + await record_contact_name_and_reconcile( + public_key=public_key, + contact_name=adv_name, + timestamp=int(time.time()), + log=logger, + ) # Read back from DB so the broadcast includes all fields (last_contacted, # last_read_at, etc.) matching the REST Contact shape exactly. diff --git a/app/packet_processor.py b/app/packet_processor.py index d676290..d8e09b6 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -36,11 +36,10 @@ from app.models import ( from app.repository import ( ChannelRepository, ContactAdvertPathRepository, - ContactNameHistoryRepository, ContactRepository, - MessageRepository, RawPacketRepository, ) +from app.services.contact_reconciliation import record_contact_name_and_reconcile from app.services.messages import ( create_dm_message_from_decrypted as _create_dm_message_from_decrypted, ) @@ -490,14 +489,6 @@ async def _process_advertisement( hop_count=new_path_len, ) - # Record name history - if advert.name: - await ContactNameHistoryRepository.record_name( - public_key=advert.public_key.lower(), - name=advert.name, - timestamp=timestamp, - ) - contact_data = { "public_key": advert.public_key.lower(), "name": advert.name, @@ -513,23 +504,12 @@ async def _process_advertisement( } await ContactRepository.upsert(contact_data) - claimed = await MessageRepository.claim_prefix_messages(advert.public_key.lower()) - if claimed > 0: - logger.info( - "Claimed %d prefix DM message(s) for contact %s", - claimed, - advert.public_key[:12], - ) - if advert.name: - backfilled = await MessageRepository.backfill_channel_sender_key( - advert.public_key, advert.name - ) - if backfilled > 0: - logger.info( - "Backfilled sender_key on %d channel message(s) for %s", - backfilled, - advert.name, - ) + await record_contact_name_and_reconcile( + public_key=advert.public_key, + contact_name=advert.name, + timestamp=timestamp, + log=logger, + ) # Read back from DB so the broadcast includes all fields (last_contacted, # last_read_at, flags, on_radio, etc.) matching the REST Contact shape exactly. diff --git a/app/radio_sync.py b/app/radio_sync.py index 0430be8..b7b4a47 100644 --- a/app/radio_sync.py +++ b/app/radio_sync.py @@ -24,8 +24,8 @@ from app.repository import ( AppSettingsRepository, ChannelRepository, ContactRepository, - MessageRepository, ) +from app.services.contact_reconciliation import reconcile_contact_messages logger = logging.getLogger(__name__) @@ -156,24 +156,11 @@ async def sync_and_offload_contacts(mc: MeshCore) -> dict: await ContactRepository.upsert( Contact.from_radio_dict(public_key, contact_data, on_radio=False) ) - claimed = await MessageRepository.claim_prefix_messages(public_key.lower()) - if claimed > 0: - logger.info( - "Claimed %d prefix DM message(s) for contact %s", - claimed, - public_key[:12], - ) - adv_name = contact_data.get("adv_name") - if adv_name: - backfilled = await MessageRepository.backfill_channel_sender_key( - public_key, adv_name - ) - if backfilled > 0: - logger.info( - "Backfilled sender_key on %d channel message(s) for %s", - backfilled, - adv_name, - ) + await reconcile_contact_messages( + public_key=public_key, + contact_name=contact_data.get("adv_name"), + log=logger, + ) synced += 1 # Remove from radio diff --git a/app/routers/contacts.py b/app/routers/contacts.py index f145312..ad1de33 100644 --- a/app/routers/contacts.py +++ b/app/routers/contacts.py @@ -26,6 +26,7 @@ from app.repository import ( ContactRepository, MessageRepository, ) +from app.services.contact_reconciliation import reconcile_contact_messages logger = logging.getLogger(__name__) @@ -181,18 +182,11 @@ async def create_contact( await ContactRepository.upsert(contact_data) logger.info("Created contact %s", lower_key[:12]) - # Promote any prefix-stored messages to this full key - claimed = await MessageRepository.claim_prefix_messages(lower_key) - if claimed > 0: - logger.info("Claimed %d prefix messages for contact %s", claimed, lower_key[:12]) - - # Backfill sender_key on channel messages that match this contact's name - if request.name: - backfilled = await MessageRepository.backfill_channel_sender_key(lower_key, request.name) - if backfilled > 0: - logger.info( - "Backfilled sender_key on %d channel message(s) for %s", backfilled, request.name - ) + await reconcile_contact_messages( + public_key=lower_key, + contact_name=request.name, + log=logger, + ) # Trigger historical decryption if requested if request.try_historical: @@ -318,18 +312,11 @@ async def sync_contacts_from_radio() -> dict: Contact.from_radio_dict(lower_key, contact_data, on_radio=True) ) synced_keys.append(lower_key) - claimed = await MessageRepository.claim_prefix_messages(lower_key) - if claimed > 0: - logger.info("Claimed %d prefix DM message(s) for contact %s", claimed, public_key[:12]) - adv_name = contact_data.get("adv_name") - if adv_name: - backfilled = await MessageRepository.backfill_channel_sender_key(lower_key, adv_name) - if backfilled > 0: - logger.info( - "Backfilled sender_key on %d channel message(s) for %s", - backfilled, - adv_name, - ) + await reconcile_contact_messages( + public_key=lower_key, + contact_name=contact_data.get("adv_name"), + log=logger, + ) count += 1 # Clear on_radio for contacts not found on the radio diff --git a/app/services/contact_reconciliation.py b/app/services/contact_reconciliation.py new file mode 100644 index 0000000..7b71dc4 --- /dev/null +++ b/app/services/contact_reconciliation.py @@ -0,0 +1,115 @@ +"""Shared contact/message reconciliation helpers.""" + +import logging + +from app.repository import ContactNameHistoryRepository, MessageRepository + +logger = logging.getLogger(__name__) + + +async def claim_prefix_messages_for_contact( + *, + public_key: str, + message_repository=MessageRepository, + log: logging.Logger | None = None, +) -> int: + """Promote prefix-key DMs to a resolved full public key.""" + normalized_key = public_key.lower() + claimed = await message_repository.claim_prefix_messages(normalized_key) + if claimed > 0: + (log or logger).info( + "Claimed %d prefix DM message(s) for contact %s", + claimed, + normalized_key[:12], + ) + return claimed + + +async def backfill_channel_sender_for_contact( + *, + public_key: str, + contact_name: str | None, + message_repository=MessageRepository, + log: logging.Logger | None = None, +) -> int: + """Backfill channel sender attribution once a contact name is known.""" + if not contact_name: + return 0 + + normalized_key = public_key.lower() + backfilled = await message_repository.backfill_channel_sender_key( + normalized_key, + contact_name, + ) + if backfilled > 0: + (log or logger).info( + "Backfilled sender_key on %d channel message(s) for %s", + backfilled, + contact_name, + ) + return backfilled + + +async def reconcile_contact_messages( + *, + public_key: str, + contact_name: str | None, + message_repository=MessageRepository, + log: logging.Logger | None = None, +) -> tuple[int, int]: + """Apply message reconciliation once a contact's identity is resolved.""" + claimed = await claim_prefix_messages_for_contact( + public_key=public_key, + message_repository=message_repository, + log=log, + ) + backfilled = await backfill_channel_sender_for_contact( + public_key=public_key, + contact_name=contact_name, + message_repository=message_repository, + log=log, + ) + return claimed, backfilled + + +async def record_contact_name( + *, + public_key: str, + contact_name: str | None, + timestamp: int, + contact_name_history_repository=ContactNameHistoryRepository, +) -> bool: + """Record contact name history when a non-empty name is available.""" + if not contact_name: + return False + + await contact_name_history_repository.record_name( + public_key.lower(), + contact_name, + timestamp, + ) + return True + + +async def record_contact_name_and_reconcile( + *, + public_key: str, + contact_name: str | None, + timestamp: int, + message_repository=MessageRepository, + contact_name_history_repository=ContactNameHistoryRepository, + log: logging.Logger | None = None, +) -> tuple[int, int]: + """Record name history, then reconcile message identity for the contact.""" + await record_contact_name( + public_key=public_key, + contact_name=contact_name, + timestamp=timestamp, + contact_name_history_repository=contact_name_history_repository, + ) + return await reconcile_contact_messages( + public_key=public_key, + contact_name=contact_name, + message_repository=message_repository, + log=log, + ) diff --git a/tests/test_contact_reconciliation_service.py b/tests/test_contact_reconciliation_service.py new file mode 100644 index 0000000..ccf3c00 --- /dev/null +++ b/tests/test_contact_reconciliation_service.py @@ -0,0 +1,72 @@ +"""Tests for shared contact/message reconciliation helpers.""" + +import pytest + +from app.repository import ContactNameHistoryRepository, ContactRepository, MessageRepository +from app.services.contact_reconciliation import ( + claim_prefix_messages_for_contact, + record_contact_name_and_reconcile, +) + + +@pytest.mark.asyncio +async def test_claim_prefix_messages_for_contact_promotes_prefix_dm(test_db): + public_key = "aa" * 32 + await ContactRepository.upsert({"public_key": public_key, "name": "Alice", "type": 1}) + + await MessageRepository.create( + msg_type="PRIV", + text="hello", + conversation_key=public_key[:12], + sender_timestamp=1000, + received_at=1000, + ) + + claimed = await claim_prefix_messages_for_contact(public_key=public_key) + + assert claimed == 1 + messages = await MessageRepository.get_all(conversation_key=public_key) + assert len(messages) == 1 + assert messages[0].conversation_key == public_key + + +@pytest.mark.asyncio +async def test_record_contact_name_and_reconcile_records_history_and_backfills(test_db): + public_key = "bb" * 32 + channel_key = "CC" * 16 + await ContactRepository.upsert({"public_key": public_key, "name": "Alice", "type": 1}) + + await MessageRepository.create( + msg_type="PRIV", + text="dm", + conversation_key=public_key[:12], + sender_timestamp=1000, + received_at=1000, + ) + await MessageRepository.create( + msg_type="CHAN", + text="Alice: hello", + conversation_key=channel_key, + sender_timestamp=1001, + received_at=1001, + sender_name="Alice", + ) + + claimed, backfilled = await record_contact_name_and_reconcile( + public_key=public_key, + contact_name="Alice", + timestamp=1234, + ) + + assert claimed == 1 + assert backfilled == 1 + + history = await ContactNameHistoryRepository.get_history(public_key) + assert len(history) == 1 + assert history[0].name == "Alice" + assert history[0].first_seen == 1234 + assert history[0].last_seen == 1234 + + messages = await MessageRepository.get_all(msg_type="CHAN", conversation_key=channel_key) + assert len(messages) == 1 + assert messages[0].sender_key == public_key