From 6e5256acce9b55772805c8177365d52b56fd2fa6 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Fri, 27 Mar 2026 12:49:01 -0700 Subject: [PATCH] Be more flexible about radio offload. Closes #118. --- app/main.py | 2 + app/radio.py | 3 + app/radio_sync.py | 306 +++++++++++++++++++++++++++++++++++---- tests/test_radio_sync.py | 135 ++++++++++++++++- 4 files changed, 408 insertions(+), 38 deletions(-) diff --git a/app/main.py b/app/main.py index 911635b..ddbe0bc 100644 --- a/app/main.py +++ b/app/main.py @@ -17,6 +17,7 @@ from app.frontend_static import ( ) from app.radio import RadioDisconnectedError from app.radio_sync import ( + stop_background_contact_reconciliation, stop_message_polling, stop_periodic_advert, stop_periodic_sync, @@ -95,6 +96,7 @@ async def lifespan(app: FastAPI): pass await fanout_manager.stop_all() await radio_manager.stop_connection_monitor() + await stop_background_contact_reconciliation() await stop_message_polling() await stop_periodic_advert() await stop_periodic_sync() diff --git a/app/radio.py b/app/radio.py index a9e8c22..6f12d1f 100644 --- a/app/radio.py +++ b/app/radio.py @@ -548,11 +548,14 @@ class RadioManager: async def disconnect(self) -> None: """Disconnect from the radio.""" + from app.radio_sync import stop_background_contact_reconciliation + clear_keys() self._reset_reconnect_error_broadcasts() if self._meshcore is None: return + await stop_background_contact_reconciliation() await self._acquire_operation_lock("disconnect", blocking=True) try: mc = self._meshcore diff --git a/app/radio_sync.py b/app/radio_sync.py index 9ad6c1f..eb488c7 100644 --- a/app/radio_sync.py +++ b/app/radio_sync.py @@ -166,6 +166,9 @@ async def pause_polling(): # Background task handle _sync_task: asyncio.Task | None = None +# Startup/background contact reconciliation task handle +_contact_reconcile_task: asyncio.Task | None = None + # Periodic maintenance check interval in seconds (5 minutes) SYNC_INTERVAL = 300 @@ -266,30 +269,7 @@ async def sync_and_offload_contacts(mc: MeshCore) -> dict: remove_result = await mc.commands.remove_contact(contact_data) if remove_result.type == EventType.OK: removed += 1 - - # LIBRARY INTERNAL FIXUP: The MeshCore library's - # commands.remove_contact() sends the remove command over - # the wire but does NOT update the library's in-memory - # contact cache (mc._contacts). This is a gap in the - # library — there's no public API to clear a single - # contact from the cache, and the library only refreshes - # it on a full get_contacts() call. - # - # Why this matters: sync_recent_contacts_to_radio() uses - # mc.get_contact_by_key_prefix() to check whether a - # contact is already loaded on the radio. That method - # searches mc._contacts. If we don't evict the removed - # contact from the cache here, get_contact_by_key_prefix() - # will still find it and skip the add_contact() call — - # meaning contacts never get loaded back onto the radio - # after offload. The result: no DM ACKs, degraded routing - # for potentially minutes until the next periodic sync - # refreshes the cache from the (now-empty) radio. - # - # We access mc._contacts directly because the library - # exposes it as a read-only property (mc.contacts) with - # no removal API. The dict is keyed by public_key string. - mc._contacts.pop(public_key, None) + _evict_removed_contact_from_library_cache(mc, public_key) else: logger.warning( "Failed to remove contact %s: %s", public_key[:12], remove_result.payload @@ -461,28 +441,28 @@ async def ensure_default_channels() -> None: async def sync_and_offload_all(mc: MeshCore) -> dict: - """Sync and offload both contacts and channels, then ensure defaults exist.""" + """Run fast startup sync, then background contact reconcile.""" logger.info("Starting full radio sync and offload") # Contact on_radio is legacy/stale metadata. Clear it during the offload/reload # cycle so old rows stop claiming radio residency we do not actively track. await ContactRepository.clear_on_radio_except([]) - contacts_result = await sync_and_offload_contacts(mc) + contacts_result = await sync_contacts_from_radio(mc) channels_result = await sync_and_offload_channels(mc) # Ensure default channels exist await ensure_default_channels() - # Reload favorites plus a working-set fill back onto the radio immediately. - # Pass mc directly since the caller already holds the radio operation lock - # (asyncio.Lock is not reentrant). - reload_result = await sync_recent_contacts_to_radio(force=True, mc=mc) + start_background_contact_reconciliation( + initial_radio_contacts=contacts_result.get("radio_contacts", {}), + expected_mc=mc, + ) return { "contacts": contacts_result, "channels": channels_result, - "reloaded": reload_result, + "contact_reconcile_started": True, } @@ -1036,6 +1016,270 @@ async def stop_periodic_sync(): # Throttling for contact sync to radio _last_contact_sync: float = 0.0 CONTACT_SYNC_THROTTLE_SECONDS = 30 # Don't sync more than once per 30 seconds +CONTACT_RECONCILE_BATCH_SIZE = 2 +CONTACT_RECONCILE_YIELD_SECONDS = 0.05 + + +def _evict_removed_contact_from_library_cache(mc: MeshCore, public_key: str) -> None: + """Keep the library's contact cache consistent after a successful removal.""" + # LIBRARY INTERNAL FIXUP: The MeshCore library's remove_contact() sends the + # remove command over the wire but does NOT update the library's in-memory + # contact cache (mc._contacts). This is a gap in the library — there's no + # public API to clear a single contact from the cache, and the library only + # refreshes it on a full get_contacts() call. + # + # Why this matters: contact sync and targeted ensure/load paths use + # mc.get_contact_by_key_prefix() to check whether a contact is already + # loaded on the radio. That method searches mc._contacts. If we don't evict + # the removed contact from the cache here, later syncs will still find it + # and skip add_contact() calls, leaving the radio without the contact even + # though the app thinks it is resident. + mc._contacts.pop(public_key, None) + + +def _normalize_radio_contacts_payload(contacts: dict | None) -> dict[str, dict]: + """Return radio contacts keyed by normalized lowercase full public key.""" + normalized: dict[str, dict] = {} + for public_key, contact_data in (contacts or {}).items(): + normalized[str(public_key).lower()] = contact_data + return normalized + + +async def sync_contacts_from_radio(mc: MeshCore) -> dict: + """Pull contacts from the radio and persist them to the database without removing them.""" + synced = 0 + + try: + result = await mc.commands.get_contacts() + + if result is None or result.type == EventType.ERROR: + logger.error( + "Failed to get contacts from radio: %s. " + "If you see this repeatedly, the radio may be visible on the " + "serial/TCP/BLE port but not responding to commands. Check for " + "another process with the serial port open (other RemoteTerm " + "instances, serial monitors, etc.), verify the firmware is " + "up-to-date and in client mode (not repeater), or try a " + "power cycle.", + result, + ) + return {"synced": 0, "radio_contacts": {}, "error": str(result)} + + contacts = _normalize_radio_contacts_payload(result.payload) + logger.info("Found %d contacts on radio", len(contacts)) + + for public_key, contact_data in contacts.items(): + await ContactRepository.upsert( + ContactUpsert.from_radio_dict(public_key, contact_data, on_radio=False) + ) + asyncio.create_task( + _reconcile_contact_messages_background( + public_key, + contact_data.get("adv_name"), + ) + ) + synced += 1 + + logger.info("Synced %d contacts from radio snapshot", synced) + return {"synced": synced, "radio_contacts": contacts} + except Exception as e: + logger.error("Error during contact snapshot sync: %s", e) + return {"synced": synced, "radio_contacts": {}, "error": str(e)} + + +async def _reconcile_radio_contacts_in_background( + *, + initial_radio_contacts: dict[str, dict], + expected_mc: MeshCore, +) -> None: + """Converge radio contacts toward the desired favorites+recents working set.""" + radio_contacts = dict(initial_radio_contacts) + removed = 0 + loaded = 0 + failed = 0 + + try: + while True: + if not radio_manager.is_connected or radio_manager.meshcore is not expected_mc: + logger.info("Stopping background contact reconcile: radio transport changed") + break + + selected_contacts = await get_contacts_selected_for_radio_sync() + desired_contacts = { + contact.public_key.lower(): contact + for contact in selected_contacts + if len(contact.public_key) >= 64 + } + removable_keys = [key for key in radio_contacts if key not in desired_contacts] + missing_contacts = [ + contact for key, contact in desired_contacts.items() if key not in radio_contacts + ] + + if not removable_keys and not missing_contacts: + logger.info( + "Background contact reconcile complete: %d contacts on radio working set", + len(radio_contacts), + ) + break + + progressed = False + try: + async with radio_manager.radio_operation( + "background_contact_reconcile", + blocking=False, + ) as mc: + if mc is not expected_mc: + logger.info( + "Stopping background contact reconcile: radio transport changed" + ) + break + + budget = CONTACT_RECONCILE_BATCH_SIZE + selected_contacts = await get_contacts_selected_for_radio_sync() + desired_contacts = { + contact.public_key.lower(): contact + for contact in selected_contacts + if len(contact.public_key) >= 64 + } + + for public_key in list(radio_contacts): + if budget <= 0: + break + if public_key in desired_contacts: + continue + + remove_payload = ( + mc.get_contact_by_key_prefix(public_key[:12]) + or radio_contacts.get(public_key) + or {"public_key": public_key} + ) + try: + remove_result = await mc.commands.remove_contact(remove_payload) + except Exception as exc: + failed += 1 + budget -= 1 + logger.warning( + "Error removing contact %s during background reconcile: %s", + public_key[:12], + exc, + ) + continue + + budget -= 1 + if remove_result.type == EventType.OK: + radio_contacts.pop(public_key, None) + _evict_removed_contact_from_library_cache(mc, public_key) + removed += 1 + progressed = True + else: + failed += 1 + logger.warning( + "Failed to remove contact %s during background reconcile: %s", + public_key[:12], + remove_result.payload, + ) + + if budget > 0: + for public_key, contact in desired_contacts.items(): + if budget <= 0: + break + if public_key in radio_contacts: + continue + + if mc.get_contact_by_key_prefix(public_key[:12]): + radio_contacts[public_key] = {"public_key": public_key} + continue + + try: + add_payload = contact.to_radio_dict() + add_result = await mc.commands.add_contact(add_payload) + except Exception as exc: + failed += 1 + budget -= 1 + logger.warning( + "Error adding contact %s during background reconcile: %s", + public_key[:12], + exc, + exc_info=True, + ) + continue + + budget -= 1 + if add_result.type == EventType.OK: + radio_contacts[public_key] = add_payload + loaded += 1 + progressed = True + else: + failed += 1 + reason = add_result.payload + hint = "" + if reason is None: + hint = ( + " (no response from radio — if this repeats, check for " + "serial port contention from another process or try a " + "power cycle)" + ) + logger.warning( + "Failed to add contact %s during background reconcile: %s%s", + public_key[:12], + reason, + hint, + ) + except RadioOperationBusyError: + logger.debug("Background contact reconcile yielding: radio busy") + + await asyncio.sleep(CONTACT_RECONCILE_YIELD_SECONDS) + if not progressed: + continue + except asyncio.CancelledError: + logger.info("Background contact reconcile task cancelled") + raise + except Exception as exc: + logger.error("Background contact reconcile failed: %s", exc, exc_info=True) + finally: + if removed > 0 or loaded > 0 or failed > 0: + logger.info( + "Background contact reconcile summary: removed %d, loaded %d, failed %d", + removed, + loaded, + failed, + ) + + +def start_background_contact_reconciliation( + *, + initial_radio_contacts: dict[str, dict], + expected_mc: MeshCore, +) -> None: + """Start or replace the background contact reconcile task for the current radio.""" + global _contact_reconcile_task + + if _contact_reconcile_task is not None and not _contact_reconcile_task.done(): + _contact_reconcile_task.cancel() + + _contact_reconcile_task = asyncio.create_task( + _reconcile_radio_contacts_in_background( + initial_radio_contacts=initial_radio_contacts, + expected_mc=expected_mc, + ) + ) + logger.info( + "Started background contact reconcile for %d radio contact(s)", + len(initial_radio_contacts), + ) + + +async def stop_background_contact_reconciliation() -> None: + """Stop the background contact reconcile task.""" + global _contact_reconcile_task + + if _contact_reconcile_task and not _contact_reconcile_task.done(): + _contact_reconcile_task.cancel() + try: + await _contact_reconcile_task + except asyncio.CancelledError: + pass + _contact_reconcile_task = None async def get_contacts_selected_for_radio_sync() -> list[Contact]: diff --git a/tests/test_radio_sync.py b/tests/test_radio_sync.py index 3f10009..e9c5a1f 100644 --- a/tests/test_radio_sync.py +++ b/tests/test_radio_sync.py @@ -5,12 +5,14 @@ contact/channel sync operations, and default channel management. """ import asyncio +from contextlib import asynccontextmanager from unittest.mock import AsyncMock, MagicMock, call, patch import pytest from meshcore import EventType from meshcore.events import Event +import app.radio_sync as radio_sync from app.models import Favorite from app.radio import RadioManager, radio_manager from app.radio_sync import ( @@ -36,8 +38,6 @@ from app.repository import ( @pytest.fixture(autouse=True) def reset_sync_state(): """Reset polling pause state, sync timestamp, and radio_manager before/after each test.""" - import app.radio_sync as radio_sync - prev_mc = radio_manager._meshcore prev_lock = radio_manager._operation_lock prev_max_channels = radio_manager.max_channels @@ -45,12 +45,20 @@ def reset_sync_state(): prev_slot_by_key = radio_manager._channel_slot_by_key.copy() prev_key_by_slot = radio_manager._channel_key_by_slot.copy() prev_pending_channel_key_by_slot = radio_manager._pending_message_channel_key_by_slot.copy() + prev_contact_reconcile_task = radio_sync._contact_reconcile_task radio_sync._polling_pause_count = 0 radio_sync._last_contact_sync = 0.0 yield + if ( + radio_sync._contact_reconcile_task is not None + and radio_sync._contact_reconcile_task is not prev_contact_reconcile_task + and not radio_sync._contact_reconcile_task.done() + ): + radio_sync._contact_reconcile_task.cancel() radio_sync._polling_pause_count = 0 radio_sync._last_contact_sync = 0.0 + radio_sync._contact_reconcile_task = prev_contact_reconcile_task radio_manager._meshcore = prev_mc radio_manager._operation_lock = prev_lock radio_manager.max_channels = prev_max_channels @@ -433,7 +441,7 @@ class TestSyncAndOffloadAll: """Test session-local contact radio residency reset behavior.""" @pytest.mark.asyncio - async def test_clears_stale_contact_on_radio_flags_before_reload(self, test_db): + async def test_clears_stale_contact_on_radio_flags_before_background_reconcile(self, test_db): await _insert_contact(KEY_A, "Alice", on_radio=True) await _insert_contact(KEY_B, "Bob", on_radio=True) @@ -441,8 +449,8 @@ class TestSyncAndOffloadAll: with ( patch( - "app.radio_sync.sync_and_offload_contacts", - new=AsyncMock(return_value={"synced": 0, "removed": 0}), + "app.radio_sync.sync_contacts_from_radio", + new=AsyncMock(return_value={"synced": 0, "radio_contacts": {}}), ), patch( "app.radio_sync.sync_and_offload_channels", @@ -450,8 +458,7 @@ class TestSyncAndOffloadAll: ), patch("app.radio_sync.ensure_default_channels", new=AsyncMock()), patch( - "app.radio_sync.sync_recent_contacts_to_radio", - new=AsyncMock(return_value={"loaded": 0, "already_on_radio": 0, "failed": 0}), + "app.radio_sync.start_background_contact_reconciliation", ), ): await sync_and_offload_all(mock_mc) @@ -461,6 +468,28 @@ class TestSyncAndOffloadAll: assert alice is not None and alice.on_radio is False assert bob is not None and bob.on_radio is False + @pytest.mark.asyncio + async def test_starts_background_contact_reconcile_with_radio_snapshot(self, test_db): + mock_mc = MagicMock() + radio_contacts = {KEY_A: {"public_key": KEY_A}} + + with ( + patch( + "app.radio_sync.sync_contacts_from_radio", + new=AsyncMock(return_value={"synced": 1, "radio_contacts": radio_contacts}), + ), + patch( + "app.radio_sync.sync_and_offload_channels", + new=AsyncMock(return_value={"synced": 0, "cleared": 0}), + ), + patch("app.radio_sync.ensure_default_channels", new=AsyncMock()), + patch("app.radio_sync.start_background_contact_reconciliation") as mock_start, + ): + result = await sync_and_offload_all(mock_mc) + + mock_start.assert_called_once_with(initial_radio_contacts=radio_contacts, expected_mc=mock_mc) + assert result["contact_reconcile_started"] is True + @pytest.mark.asyncio async def test_advert_fill_skips_repeaters(self, test_db): """Recent advert fallback only considers non-repeaters.""" @@ -1036,6 +1065,98 @@ class TestSyncAndOffloadContacts: assert KEY_A in mock_mc._contacts +class TestBackgroundContactReconcile: + """Test the yielding background contact reconcile loop.""" + + @pytest.mark.asyncio + async def test_rechecks_desired_set_before_deleting_contact(self, test_db): + await _insert_contact(KEY_A, "Alice", last_contacted=2000) + await _insert_contact(KEY_B, "Bob", last_contacted=1000) + alice = await ContactRepository.get_by_key(KEY_A) + bob = await ContactRepository.get_by_key(KEY_B) + assert alice is not None + assert bob is not None + + mock_mc = MagicMock() + mock_mc.is_connected = True + mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None) + mock_mc.commands.remove_contact = AsyncMock(return_value=MagicMock(type=EventType.OK)) + mock_mc.commands.add_contact = AsyncMock(return_value=MagicMock(type=EventType.OK)) + radio_manager._meshcore = mock_mc + + @asynccontextmanager + async def _radio_operation(*args, **kwargs): + del args, kwargs + yield mock_mc + + with ( + patch.object( + radio_sync.radio_manager, + "radio_operation", + side_effect=lambda *args, **kwargs: _radio_operation(*args, **kwargs), + ), + patch( + "app.radio_sync.get_contacts_selected_for_radio_sync", + side_effect=[[bob], [alice, bob], [alice, bob]], + ), + patch("app.radio_sync.asyncio.sleep", new=AsyncMock()), + ): + await radio_sync._reconcile_radio_contacts_in_background( + initial_radio_contacts={KEY_A: {"public_key": KEY_A}}, + expected_mc=mock_mc, + ) + + mock_mc.commands.remove_contact.assert_not_called() + mock_mc.commands.add_contact.assert_awaited_once() + payload = mock_mc.commands.add_contact.call_args.args[0] + assert payload["public_key"] == KEY_B + + @pytest.mark.asyncio + async def test_yields_radio_lock_every_two_contact_operations(self, test_db): + await _insert_contact(KEY_A, "Alice", last_contacted=3000) + await _insert_contact(KEY_B, "Bob", last_contacted=2000) + extra_key = "cc" * 32 + await _insert_contact(extra_key, "Carol", last_contacted=1000) + + mock_mc = MagicMock() + mock_mc.is_connected = True + mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None) + mock_mc.commands.remove_contact = AsyncMock(return_value=MagicMock(type=EventType.OK)) + mock_mc.commands.add_contact = AsyncMock() + radio_manager._meshcore = mock_mc + + acquire_count = 0 + + @asynccontextmanager + async def _radio_operation(*args, **kwargs): + del args, kwargs + nonlocal acquire_count + acquire_count += 1 + yield mock_mc + + with ( + patch.object( + radio_sync.radio_manager, + "radio_operation", + side_effect=lambda *args, **kwargs: _radio_operation(*args, **kwargs), + ), + patch("app.radio_sync.get_contacts_selected_for_radio_sync", return_value=[]), + patch("app.radio_sync.asyncio.sleep", new=AsyncMock()), + ): + await radio_sync._reconcile_radio_contacts_in_background( + initial_radio_contacts={ + KEY_A: {"public_key": KEY_A}, + KEY_B: {"public_key": KEY_B}, + extra_key: {"public_key": extra_key}, + }, + expected_mc=mock_mc, + ) + + assert acquire_count == 2 + assert mock_mc.commands.remove_contact.await_count == 3 + mock_mc.commands.add_contact.assert_not_called() + + class TestSyncAndOffloadChannels: """Test sync_and_offload_channels: pull channels from radio, save to DB, clear from radio."""