diff --git a/app/radio_sync.py b/app/radio_sync.py index 4e831e4..cd57de8 100644 --- a/app/radio_sync.py +++ b/app/radio_sync.py @@ -29,7 +29,10 @@ from app.repository import ( ChannelRepository, ContactRepository, ) -from app.services.contact_reconciliation import reconcile_contact_messages +from app.services.contact_reconciliation import ( + promote_prefix_contacts_for_contact, + reconcile_contact_messages, +) from app.services.messages import create_fallback_channel_message from app.services.radio_runtime import radio_runtime as radio_manager from app.websocket import broadcast_error, broadcast_event @@ -63,13 +66,25 @@ async def _reconcile_contact_messages_background( public_key: str, contact_name: str | None, ) -> None: - """Run contact/message reconciliation outside the radio critical path.""" + """Run prefix promotion and contact/message reconciliation outside the radio critical path.""" try: + promoted_keys = await promote_prefix_contacts_for_contact( + public_key=public_key, + log=logger, + ) await reconcile_contact_messages( public_key=public_key, contact_name=contact_name, log=logger, ) + if promoted_keys: + contact = await ContactRepository.get_by_key(public_key.lower()) + if contact is not None: + for old_key in promoted_keys: + broadcast_event( + "contact_resolved", + {"previous_public_key": old_key, "contact": contact.model_dump()}, + ) except Exception as exc: logger.warning( "Background contact reconciliation failed for %s: %s", diff --git a/app/repository/contacts.py b/app/repository/contacts.py index ae5e310..0250725 100644 --- a/app/repository/contacts.py +++ b/app/repository/contacts.py @@ -1,3 +1,4 @@ +import logging import time from collections.abc import Mapping from typing import Any @@ -12,6 +13,8 @@ from app.models import ( ) from app.path_utils import first_hop_hex, normalize_contact_route, normalize_route_override +logger = logging.getLogger(__name__) + class AmbiguousPublicKeyPrefixError(ValueError): """Raised when a public key prefix matches multiple contacts.""" @@ -500,7 +503,13 @@ class ContactRepository: (old_key,), ) match_row = await match_cursor.fetchone() - if (match_row["match_count"] if match_row is not None else 0) != 1: + match_count = match_row["match_count"] if match_row is not None else 0 + if match_count != 1: + logger.warning( + "Skipping prefix promotion for %s: %d full-key contacts match (expected 1)", + old_key, + match_count, + ) continue await migrate_child_rows(old_key, normalized_full_key) @@ -529,7 +538,12 @@ class ContactRepository: WHEN ? < contacts.first_seen THEN ? ELSE contacts.first_seen END, - last_read_at = COALESCE(contacts.last_read_at, ?) + last_read_at = CASE + WHEN contacts.last_read_at IS NULL THEN ? + WHEN ? IS NULL THEN contacts.last_read_at + WHEN ? > contacts.last_read_at THEN ? + ELSE contacts.last_read_at + END WHERE public_key = ? """, ( @@ -546,6 +560,9 @@ class ContactRepository: row["first_seen"], row["first_seen"], row["last_read_at"], + row["last_read_at"], + row["last_read_at"], + row["last_read_at"], normalized_full_key, ), ) diff --git a/app/repository/messages.py b/app/repository/messages.py index 7c1c1aa..5070e62 100644 --- a/app/repository/messages.py +++ b/app/repository/messages.py @@ -158,7 +158,11 @@ class MessageRepository: """ lower_key = full_key.lower() cursor = await db.conn.execute( - """UPDATE messages SET conversation_key = ? + """UPDATE messages SET conversation_key = ?, + sender_key = CASE + WHEN sender_key IS NOT NULL AND length(sender_key) < 64 + AND ? LIKE sender_key || '%' + THEN ? ELSE sender_key END WHERE type = 'PRIV' AND length(conversation_key) < 64 AND ? LIKE conversation_key || '%' AND ( @@ -166,7 +170,7 @@ class MessageRepository: WHERE length(public_key) = 64 AND public_key LIKE messages.conversation_key || '%' ) = 1""", - (lower_key, lower_key), + (lower_key, lower_key, lower_key, lower_key), ) await db.conn.commit() return cursor.rowcount diff --git a/app/routers/contacts.py b/app/routers/contacts.py index ac86a8c..0ef2a97 100644 --- a/app/routers/contacts.py +++ b/app/routers/contacts.py @@ -1,6 +1,7 @@ import asyncio import logging import random +import time from contextlib import suppress from fastapi import APIRouter, BackgroundTasks, HTTPException, Query @@ -32,7 +33,7 @@ from app.repository import ( ) from app.services.contact_reconciliation import ( promote_prefix_contacts_for_contact, - reconcile_contact_messages, + record_contact_name_and_reconcile, ) from app.services.radio_runtime import radio_runtime as radio_manager @@ -278,12 +279,18 @@ async def create_contact( # Check if contact already exists existing = await ContactRepository.get_by_key(request.public_key) if existing: - # Update name if provided + # Update name if provided and record name history if request.name: await ContactRepository.upsert(existing.to_upsert(name=request.name)) refreshed = await ContactRepository.get_by_key(request.public_key) if refreshed is not None: existing = refreshed + await record_contact_name_and_reconcile( + public_key=request.public_key, + contact_name=request.name, + timestamp=int(time.time()), + log=logger, + ) promoted_keys = await promote_prefix_contacts_for_contact( public_key=request.public_key, @@ -318,9 +325,10 @@ async def create_contact( log=logger, ) - await reconcile_contact_messages( + await record_contact_name_and_reconcile( public_key=lower_key, contact_name=request.name, + timestamp=int(time.time()), log=logger, ) diff --git a/app/routers/radio.py b/app/routers/radio.py index 920697a..097c426 100644 --- a/app/routers/radio.py +++ b/app/routers/radio.py @@ -24,7 +24,10 @@ from app.models import ( from app.radio_sync import send_advertisement as do_send_advertisement from app.radio_sync import sync_radio_time from app.repository import ContactRepository -from app.services.contact_reconciliation import promote_prefix_contacts_for_contact +from app.services.contact_reconciliation import ( + promote_prefix_contacts_for_contact, + reconcile_contact_messages, +) from app.services.radio_commands import ( KeystoreRefreshError, PathHashModeUnsupportedError, @@ -214,11 +217,19 @@ async def _persist_new_discovery_contacts(results: list[RadioDiscoveryResult]) - public_key=result.public_key, log=logger, ) + await reconcile_contact_messages( + public_key=result.public_key, + contact_name=result.name, + log=logger, + ) created = await ContactRepository.get_by_key(result.public_key) if created is not None: broadcast_event("contact", created.model_dump()) - for old_key in promoted_keys: - broadcast_event("contact_deleted", {"public_key": old_key}) + for old_key in promoted_keys: + broadcast_event( + "contact_resolved", + {"previous_public_key": old_key, "contact": created.model_dump()}, + ) async def _attach_known_names(results: list[RadioDiscoveryResult]) -> None: diff --git a/tests/test_radio_router.py b/tests/test_radio_router.py index a3a9ac0..3774654 100644 --- a/tests/test_radio_router.py +++ b/tests/test_radio_router.py @@ -381,6 +381,11 @@ class TestDiscoverMesh: new_callable=AsyncMock, return_value=[], ), + patch( + "app.routers.radio.reconcile_contact_messages", + new_callable=AsyncMock, + return_value=(0, 0), + ), patch("app.routers.radio.broadcast_event"), ): response = await discover_mesh(RadioDiscoveryRequest(target="repeaters")) @@ -454,6 +459,11 @@ class TestDiscoverMesh: new_callable=AsyncMock, return_value=[], ) as mock_promote, + patch( + "app.routers.radio.reconcile_contact_messages", + new_callable=AsyncMock, + return_value=(0, 0), + ), patch("app.routers.radio.broadcast_event") as mock_broadcast, ): response = await discover_mesh(RadioDiscoveryRequest(target="repeaters")) @@ -779,6 +789,11 @@ class TestTracePath: new_callable=AsyncMock, return_value=[], ), + patch( + "app.routers.radio.reconcile_contact_messages", + new_callable=AsyncMock, + return_value=(0, 0), + ), patch("app.routers.radio.broadcast_event"), ): response = await discover_mesh(RadioDiscoveryRequest(target="all")) diff --git a/tests/test_radio_sync.py b/tests/test_radio_sync.py index 6b92809..134ebbb 100644 --- a/tests/test_radio_sync.py +++ b/tests/test_radio_sync.py @@ -884,6 +884,11 @@ class TestSyncAndOffloadContacts: return task with ( + patch( + "app.radio_sync.promote_prefix_contacts_for_contact", + new_callable=AsyncMock, + return_value=[], + ), patch("app.radio_sync.reconcile_contact_messages", side_effect=_slow_reconcile), patch("app.radio_sync.asyncio.create_task", side_effect=_capture_task), ):