Emit correct events, update sender key, and don't let discovery path skip prefix promotion; other misc. fixes

This commit is contained in:
Jack Kingsman
2026-04-01 21:56:51 -07:00
parent 80c6cc44e5
commit 456f739f51
7 changed files with 87 additions and 12 deletions

View File

@@ -29,7 +29,10 @@ from app.repository import (
ChannelRepository,
ContactRepository,
)
from app.services.contact_reconciliation import reconcile_contact_messages
from app.services.contact_reconciliation import (
promote_prefix_contacts_for_contact,
reconcile_contact_messages,
)
from app.services.messages import create_fallback_channel_message
from app.services.radio_runtime import radio_runtime as radio_manager
from app.websocket import broadcast_error, broadcast_event
@@ -63,13 +66,25 @@ async def _reconcile_contact_messages_background(
public_key: str,
contact_name: str | None,
) -> None:
"""Run contact/message reconciliation outside the radio critical path."""
"""Run prefix promotion and contact/message reconciliation outside the radio critical path."""
try:
promoted_keys = await promote_prefix_contacts_for_contact(
public_key=public_key,
log=logger,
)
await reconcile_contact_messages(
public_key=public_key,
contact_name=contact_name,
log=logger,
)
if promoted_keys:
contact = await ContactRepository.get_by_key(public_key.lower())
if contact is not None:
for old_key in promoted_keys:
broadcast_event(
"contact_resolved",
{"previous_public_key": old_key, "contact": contact.model_dump()},
)
except Exception as exc:
logger.warning(
"Background contact reconciliation failed for %s: %s",

View File

@@ -1,3 +1,4 @@
import logging
import time
from collections.abc import Mapping
from typing import Any
@@ -12,6 +13,8 @@ from app.models import (
)
from app.path_utils import first_hop_hex, normalize_contact_route, normalize_route_override
logger = logging.getLogger(__name__)
class AmbiguousPublicKeyPrefixError(ValueError):
"""Raised when a public key prefix matches multiple contacts."""
@@ -500,7 +503,13 @@ class ContactRepository:
(old_key,),
)
match_row = await match_cursor.fetchone()
if (match_row["match_count"] if match_row is not None else 0) != 1:
match_count = match_row["match_count"] if match_row is not None else 0
if match_count != 1:
logger.warning(
"Skipping prefix promotion for %s: %d full-key contacts match (expected 1)",
old_key,
match_count,
)
continue
await migrate_child_rows(old_key, normalized_full_key)
@@ -529,7 +538,12 @@ class ContactRepository:
WHEN ? < contacts.first_seen THEN ?
ELSE contacts.first_seen
END,
last_read_at = COALESCE(contacts.last_read_at, ?)
last_read_at = CASE
WHEN contacts.last_read_at IS NULL THEN ?
WHEN ? IS NULL THEN contacts.last_read_at
WHEN ? > contacts.last_read_at THEN ?
ELSE contacts.last_read_at
END
WHERE public_key = ?
""",
(
@@ -546,6 +560,9 @@ class ContactRepository:
row["first_seen"],
row["first_seen"],
row["last_read_at"],
row["last_read_at"],
row["last_read_at"],
row["last_read_at"],
normalized_full_key,
),
)

View File

@@ -158,7 +158,11 @@ class MessageRepository:
"""
lower_key = full_key.lower()
cursor = await db.conn.execute(
"""UPDATE messages SET conversation_key = ?
"""UPDATE messages SET conversation_key = ?,
sender_key = CASE
WHEN sender_key IS NOT NULL AND length(sender_key) < 64
AND ? LIKE sender_key || '%'
THEN ? ELSE sender_key END
WHERE type = 'PRIV' AND length(conversation_key) < 64
AND ? LIKE conversation_key || '%'
AND (
@@ -166,7 +170,7 @@ class MessageRepository:
WHERE length(public_key) = 64
AND public_key LIKE messages.conversation_key || '%'
) = 1""",
(lower_key, lower_key),
(lower_key, lower_key, lower_key, lower_key),
)
await db.conn.commit()
return cursor.rowcount

View File

@@ -1,6 +1,7 @@
import asyncio
import logging
import random
import time
from contextlib import suppress
from fastapi import APIRouter, BackgroundTasks, HTTPException, Query
@@ -32,7 +33,7 @@ from app.repository import (
)
from app.services.contact_reconciliation import (
promote_prefix_contacts_for_contact,
reconcile_contact_messages,
record_contact_name_and_reconcile,
)
from app.services.radio_runtime import radio_runtime as radio_manager
@@ -278,12 +279,18 @@ async def create_contact(
# Check if contact already exists
existing = await ContactRepository.get_by_key(request.public_key)
if existing:
# Update name if provided
# Update name if provided and record name history
if request.name:
await ContactRepository.upsert(existing.to_upsert(name=request.name))
refreshed = await ContactRepository.get_by_key(request.public_key)
if refreshed is not None:
existing = refreshed
await record_contact_name_and_reconcile(
public_key=request.public_key,
contact_name=request.name,
timestamp=int(time.time()),
log=logger,
)
promoted_keys = await promote_prefix_contacts_for_contact(
public_key=request.public_key,
@@ -318,9 +325,10 @@ async def create_contact(
log=logger,
)
await reconcile_contact_messages(
await record_contact_name_and_reconcile(
public_key=lower_key,
contact_name=request.name,
timestamp=int(time.time()),
log=logger,
)

View File

@@ -24,7 +24,10 @@ from app.models import (
from app.radio_sync import send_advertisement as do_send_advertisement
from app.radio_sync import sync_radio_time
from app.repository import ContactRepository
from app.services.contact_reconciliation import promote_prefix_contacts_for_contact
from app.services.contact_reconciliation import (
promote_prefix_contacts_for_contact,
reconcile_contact_messages,
)
from app.services.radio_commands import (
KeystoreRefreshError,
PathHashModeUnsupportedError,
@@ -214,11 +217,19 @@ async def _persist_new_discovery_contacts(results: list[RadioDiscoveryResult]) -
public_key=result.public_key,
log=logger,
)
await reconcile_contact_messages(
public_key=result.public_key,
contact_name=result.name,
log=logger,
)
created = await ContactRepository.get_by_key(result.public_key)
if created is not None:
broadcast_event("contact", created.model_dump())
for old_key in promoted_keys:
broadcast_event("contact_deleted", {"public_key": old_key})
for old_key in promoted_keys:
broadcast_event(
"contact_resolved",
{"previous_public_key": old_key, "contact": created.model_dump()},
)
async def _attach_known_names(results: list[RadioDiscoveryResult]) -> None:

View File

@@ -381,6 +381,11 @@ class TestDiscoverMesh:
new_callable=AsyncMock,
return_value=[],
),
patch(
"app.routers.radio.reconcile_contact_messages",
new_callable=AsyncMock,
return_value=(0, 0),
),
patch("app.routers.radio.broadcast_event"),
):
response = await discover_mesh(RadioDiscoveryRequest(target="repeaters"))
@@ -454,6 +459,11 @@ class TestDiscoverMesh:
new_callable=AsyncMock,
return_value=[],
) as mock_promote,
patch(
"app.routers.radio.reconcile_contact_messages",
new_callable=AsyncMock,
return_value=(0, 0),
),
patch("app.routers.radio.broadcast_event") as mock_broadcast,
):
response = await discover_mesh(RadioDiscoveryRequest(target="repeaters"))
@@ -779,6 +789,11 @@ class TestTracePath:
new_callable=AsyncMock,
return_value=[],
),
patch(
"app.routers.radio.reconcile_contact_messages",
new_callable=AsyncMock,
return_value=(0, 0),
),
patch("app.routers.radio.broadcast_event"),
):
response = await discover_mesh(RadioDiscoveryRequest(target="all"))

View File

@@ -884,6 +884,11 @@ class TestSyncAndOffloadContacts:
return task
with (
patch(
"app.radio_sync.promote_prefix_contacts_for_contact",
new_callable=AsyncMock,
return_value=[],
),
patch("app.radio_sync.reconcile_contact_messages", side_effect=_slow_reconcile),
patch("app.radio_sync.asyncio.create_task", side_effect=_capture_task),
):