Be more flexible about radio offload. Closes #118.

This commit is contained in:
Jack Kingsman
2026-03-27 12:49:01 -07:00
parent 7d27567ae9
commit 6e5256acce
4 changed files with 408 additions and 38 deletions
+2
View File
@@ -17,6 +17,7 @@ from app.frontend_static import (
)
from app.radio import RadioDisconnectedError
from app.radio_sync import (
stop_background_contact_reconciliation,
stop_message_polling,
stop_periodic_advert,
stop_periodic_sync,
@@ -95,6 +96,7 @@ async def lifespan(app: FastAPI):
pass
await fanout_manager.stop_all()
await radio_manager.stop_connection_monitor()
await stop_background_contact_reconciliation()
await stop_message_polling()
await stop_periodic_advert()
await stop_periodic_sync()
+3
View File
@@ -548,11 +548,14 @@ class RadioManager:
async def disconnect(self) -> None:
"""Disconnect from the radio."""
from app.radio_sync import stop_background_contact_reconciliation
clear_keys()
self._reset_reconnect_error_broadcasts()
if self._meshcore is None:
return
await stop_background_contact_reconciliation()
await self._acquire_operation_lock("disconnect", blocking=True)
try:
mc = self._meshcore
+275 -31
View File
@@ -166,6 +166,9 @@ async def pause_polling():
# Background task handle
_sync_task: asyncio.Task | None = None
# Startup/background contact reconciliation task handle
_contact_reconcile_task: asyncio.Task | None = None
# Periodic maintenance check interval in seconds (5 minutes)
SYNC_INTERVAL = 300
@@ -266,30 +269,7 @@ async def sync_and_offload_contacts(mc: MeshCore) -> dict:
remove_result = await mc.commands.remove_contact(contact_data)
if remove_result.type == EventType.OK:
removed += 1
# LIBRARY INTERNAL FIXUP: The MeshCore library's
# commands.remove_contact() sends the remove command over
# the wire but does NOT update the library's in-memory
# contact cache (mc._contacts). This is a gap in the
# library — there's no public API to clear a single
# contact from the cache, and the library only refreshes
# it on a full get_contacts() call.
#
# Why this matters: sync_recent_contacts_to_radio() uses
# mc.get_contact_by_key_prefix() to check whether a
# contact is already loaded on the radio. That method
# searches mc._contacts. If we don't evict the removed
# contact from the cache here, get_contact_by_key_prefix()
# will still find it and skip the add_contact() call —
# meaning contacts never get loaded back onto the radio
# after offload. The result: no DM ACKs, degraded routing
# for potentially minutes until the next periodic sync
# refreshes the cache from the (now-empty) radio.
#
# We access mc._contacts directly because the library
# exposes it as a read-only property (mc.contacts) with
# no removal API. The dict is keyed by public_key string.
mc._contacts.pop(public_key, None)
_evict_removed_contact_from_library_cache(mc, public_key)
else:
logger.warning(
"Failed to remove contact %s: %s", public_key[:12], remove_result.payload
@@ -461,28 +441,28 @@ async def ensure_default_channels() -> None:
async def sync_and_offload_all(mc: MeshCore) -> dict:
"""Sync and offload both contacts and channels, then ensure defaults exist."""
"""Run fast startup sync, then background contact reconcile."""
logger.info("Starting full radio sync and offload")
# Contact on_radio is legacy/stale metadata. Clear it during the offload/reload
# cycle so old rows stop claiming radio residency we do not actively track.
await ContactRepository.clear_on_radio_except([])
contacts_result = await sync_and_offload_contacts(mc)
contacts_result = await sync_contacts_from_radio(mc)
channels_result = await sync_and_offload_channels(mc)
# Ensure default channels exist
await ensure_default_channels()
# Reload favorites plus a working-set fill back onto the radio immediately.
# Pass mc directly since the caller already holds the radio operation lock
# (asyncio.Lock is not reentrant).
reload_result = await sync_recent_contacts_to_radio(force=True, mc=mc)
start_background_contact_reconciliation(
initial_radio_contacts=contacts_result.get("radio_contacts", {}),
expected_mc=mc,
)
return {
"contacts": contacts_result,
"channels": channels_result,
"reloaded": reload_result,
"contact_reconcile_started": True,
}
@@ -1036,6 +1016,270 @@ async def stop_periodic_sync():
# Throttling for contact sync to radio
_last_contact_sync: float = 0.0
CONTACT_SYNC_THROTTLE_SECONDS = 30 # Don't sync more than once per 30 seconds
CONTACT_RECONCILE_BATCH_SIZE = 2
CONTACT_RECONCILE_YIELD_SECONDS = 0.05
def _evict_removed_contact_from_library_cache(mc: MeshCore, public_key: str) -> None:
"""Keep the library's contact cache consistent after a successful removal."""
# LIBRARY INTERNAL FIXUP: The MeshCore library's remove_contact() sends the
# remove command over the wire but does NOT update the library's in-memory
# contact cache (mc._contacts). This is a gap in the library — there's no
# public API to clear a single contact from the cache, and the library only
# refreshes it on a full get_contacts() call.
#
# Why this matters: contact sync and targeted ensure/load paths use
# mc.get_contact_by_key_prefix() to check whether a contact is already
# loaded on the radio. That method searches mc._contacts. If we don't evict
# the removed contact from the cache here, later syncs will still find it
# and skip add_contact() calls, leaving the radio without the contact even
# though the app thinks it is resident.
mc._contacts.pop(public_key, None)
def _normalize_radio_contacts_payload(contacts: dict | None) -> dict[str, dict]:
"""Return radio contacts keyed by normalized lowercase full public key."""
normalized: dict[str, dict] = {}
for public_key, contact_data in (contacts or {}).items():
normalized[str(public_key).lower()] = contact_data
return normalized
async def sync_contacts_from_radio(mc: MeshCore) -> dict:
"""Pull contacts from the radio and persist them to the database without removing them."""
synced = 0
try:
result = await mc.commands.get_contacts()
if result is None or result.type == EventType.ERROR:
logger.error(
"Failed to get contacts from radio: %s. "
"If you see this repeatedly, the radio may be visible on the "
"serial/TCP/BLE port but not responding to commands. Check for "
"another process with the serial port open (other RemoteTerm "
"instances, serial monitors, etc.), verify the firmware is "
"up-to-date and in client mode (not repeater), or try a "
"power cycle.",
result,
)
return {"synced": 0, "radio_contacts": {}, "error": str(result)}
contacts = _normalize_radio_contacts_payload(result.payload)
logger.info("Found %d contacts on radio", len(contacts))
for public_key, contact_data in contacts.items():
await ContactRepository.upsert(
ContactUpsert.from_radio_dict(public_key, contact_data, on_radio=False)
)
asyncio.create_task(
_reconcile_contact_messages_background(
public_key,
contact_data.get("adv_name"),
)
)
synced += 1
logger.info("Synced %d contacts from radio snapshot", synced)
return {"synced": synced, "radio_contacts": contacts}
except Exception as e:
logger.error("Error during contact snapshot sync: %s", e)
return {"synced": synced, "radio_contacts": {}, "error": str(e)}
async def _reconcile_radio_contacts_in_background(
*,
initial_radio_contacts: dict[str, dict],
expected_mc: MeshCore,
) -> None:
"""Converge radio contacts toward the desired favorites+recents working set."""
radio_contacts = dict(initial_radio_contacts)
removed = 0
loaded = 0
failed = 0
try:
while True:
if not radio_manager.is_connected or radio_manager.meshcore is not expected_mc:
logger.info("Stopping background contact reconcile: radio transport changed")
break
selected_contacts = await get_contacts_selected_for_radio_sync()
desired_contacts = {
contact.public_key.lower(): contact
for contact in selected_contacts
if len(contact.public_key) >= 64
}
removable_keys = [key for key in radio_contacts if key not in desired_contacts]
missing_contacts = [
contact for key, contact in desired_contacts.items() if key not in radio_contacts
]
if not removable_keys and not missing_contacts:
logger.info(
"Background contact reconcile complete: %d contacts on radio working set",
len(radio_contacts),
)
break
progressed = False
try:
async with radio_manager.radio_operation(
"background_contact_reconcile",
blocking=False,
) as mc:
if mc is not expected_mc:
logger.info(
"Stopping background contact reconcile: radio transport changed"
)
break
budget = CONTACT_RECONCILE_BATCH_SIZE
selected_contacts = await get_contacts_selected_for_radio_sync()
desired_contacts = {
contact.public_key.lower(): contact
for contact in selected_contacts
if len(contact.public_key) >= 64
}
for public_key in list(radio_contacts):
if budget <= 0:
break
if public_key in desired_contacts:
continue
remove_payload = (
mc.get_contact_by_key_prefix(public_key[:12])
or radio_contacts.get(public_key)
or {"public_key": public_key}
)
try:
remove_result = await mc.commands.remove_contact(remove_payload)
except Exception as exc:
failed += 1
budget -= 1
logger.warning(
"Error removing contact %s during background reconcile: %s",
public_key[:12],
exc,
)
continue
budget -= 1
if remove_result.type == EventType.OK:
radio_contacts.pop(public_key, None)
_evict_removed_contact_from_library_cache(mc, public_key)
removed += 1
progressed = True
else:
failed += 1
logger.warning(
"Failed to remove contact %s during background reconcile: %s",
public_key[:12],
remove_result.payload,
)
if budget > 0:
for public_key, contact in desired_contacts.items():
if budget <= 0:
break
if public_key in radio_contacts:
continue
if mc.get_contact_by_key_prefix(public_key[:12]):
radio_contacts[public_key] = {"public_key": public_key}
continue
try:
add_payload = contact.to_radio_dict()
add_result = await mc.commands.add_contact(add_payload)
except Exception as exc:
failed += 1
budget -= 1
logger.warning(
"Error adding contact %s during background reconcile: %s",
public_key[:12],
exc,
exc_info=True,
)
continue
budget -= 1
if add_result.type == EventType.OK:
radio_contacts[public_key] = add_payload
loaded += 1
progressed = True
else:
failed += 1
reason = add_result.payload
hint = ""
if reason is None:
hint = (
" (no response from radio — if this repeats, check for "
"serial port contention from another process or try a "
"power cycle)"
)
logger.warning(
"Failed to add contact %s during background reconcile: %s%s",
public_key[:12],
reason,
hint,
)
except RadioOperationBusyError:
logger.debug("Background contact reconcile yielding: radio busy")
await asyncio.sleep(CONTACT_RECONCILE_YIELD_SECONDS)
if not progressed:
continue
except asyncio.CancelledError:
logger.info("Background contact reconcile task cancelled")
raise
except Exception as exc:
logger.error("Background contact reconcile failed: %s", exc, exc_info=True)
finally:
if removed > 0 or loaded > 0 or failed > 0:
logger.info(
"Background contact reconcile summary: removed %d, loaded %d, failed %d",
removed,
loaded,
failed,
)
def start_background_contact_reconciliation(
*,
initial_radio_contacts: dict[str, dict],
expected_mc: MeshCore,
) -> None:
"""Start or replace the background contact reconcile task for the current radio."""
global _contact_reconcile_task
if _contact_reconcile_task is not None and not _contact_reconcile_task.done():
_contact_reconcile_task.cancel()
_contact_reconcile_task = asyncio.create_task(
_reconcile_radio_contacts_in_background(
initial_radio_contacts=initial_radio_contacts,
expected_mc=expected_mc,
)
)
logger.info(
"Started background contact reconcile for %d radio contact(s)",
len(initial_radio_contacts),
)
async def stop_background_contact_reconciliation() -> None:
"""Stop the background contact reconcile task."""
global _contact_reconcile_task
if _contact_reconcile_task and not _contact_reconcile_task.done():
_contact_reconcile_task.cancel()
try:
await _contact_reconcile_task
except asyncio.CancelledError:
pass
_contact_reconcile_task = None
async def get_contacts_selected_for_radio_sync() -> list[Contact]:
+128 -7
View File
@@ -5,12 +5,14 @@ contact/channel sync operations, and default channel management.
"""
import asyncio
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, call, patch
import pytest
from meshcore import EventType
from meshcore.events import Event
import app.radio_sync as radio_sync
from app.models import Favorite
from app.radio import RadioManager, radio_manager
from app.radio_sync import (
@@ -36,8 +38,6 @@ from app.repository import (
@pytest.fixture(autouse=True)
def reset_sync_state():
"""Reset polling pause state, sync timestamp, and radio_manager before/after each test."""
import app.radio_sync as radio_sync
prev_mc = radio_manager._meshcore
prev_lock = radio_manager._operation_lock
prev_max_channels = radio_manager.max_channels
@@ -45,12 +45,20 @@ def reset_sync_state():
prev_slot_by_key = radio_manager._channel_slot_by_key.copy()
prev_key_by_slot = radio_manager._channel_key_by_slot.copy()
prev_pending_channel_key_by_slot = radio_manager._pending_message_channel_key_by_slot.copy()
prev_contact_reconcile_task = radio_sync._contact_reconcile_task
radio_sync._polling_pause_count = 0
radio_sync._last_contact_sync = 0.0
yield
if (
radio_sync._contact_reconcile_task is not None
and radio_sync._contact_reconcile_task is not prev_contact_reconcile_task
and not radio_sync._contact_reconcile_task.done()
):
radio_sync._contact_reconcile_task.cancel()
radio_sync._polling_pause_count = 0
radio_sync._last_contact_sync = 0.0
radio_sync._contact_reconcile_task = prev_contact_reconcile_task
radio_manager._meshcore = prev_mc
radio_manager._operation_lock = prev_lock
radio_manager.max_channels = prev_max_channels
@@ -433,7 +441,7 @@ class TestSyncAndOffloadAll:
"""Test session-local contact radio residency reset behavior."""
@pytest.mark.asyncio
async def test_clears_stale_contact_on_radio_flags_before_reload(self, test_db):
async def test_clears_stale_contact_on_radio_flags_before_background_reconcile(self, test_db):
await _insert_contact(KEY_A, "Alice", on_radio=True)
await _insert_contact(KEY_B, "Bob", on_radio=True)
@@ -441,8 +449,8 @@ class TestSyncAndOffloadAll:
with (
patch(
"app.radio_sync.sync_and_offload_contacts",
new=AsyncMock(return_value={"synced": 0, "removed": 0}),
"app.radio_sync.sync_contacts_from_radio",
new=AsyncMock(return_value={"synced": 0, "radio_contacts": {}}),
),
patch(
"app.radio_sync.sync_and_offload_channels",
@@ -450,8 +458,7 @@ class TestSyncAndOffloadAll:
),
patch("app.radio_sync.ensure_default_channels", new=AsyncMock()),
patch(
"app.radio_sync.sync_recent_contacts_to_radio",
new=AsyncMock(return_value={"loaded": 0, "already_on_radio": 0, "failed": 0}),
"app.radio_sync.start_background_contact_reconciliation",
),
):
await sync_and_offload_all(mock_mc)
@@ -461,6 +468,28 @@ class TestSyncAndOffloadAll:
assert alice is not None and alice.on_radio is False
assert bob is not None and bob.on_radio is False
@pytest.mark.asyncio
async def test_starts_background_contact_reconcile_with_radio_snapshot(self, test_db):
mock_mc = MagicMock()
radio_contacts = {KEY_A: {"public_key": KEY_A}}
with (
patch(
"app.radio_sync.sync_contacts_from_radio",
new=AsyncMock(return_value={"synced": 1, "radio_contacts": radio_contacts}),
),
patch(
"app.radio_sync.sync_and_offload_channels",
new=AsyncMock(return_value={"synced": 0, "cleared": 0}),
),
patch("app.radio_sync.ensure_default_channels", new=AsyncMock()),
patch("app.radio_sync.start_background_contact_reconciliation") as mock_start,
):
result = await sync_and_offload_all(mock_mc)
mock_start.assert_called_once_with(initial_radio_contacts=radio_contacts, expected_mc=mock_mc)
assert result["contact_reconcile_started"] is True
@pytest.mark.asyncio
async def test_advert_fill_skips_repeaters(self, test_db):
"""Recent advert fallback only considers non-repeaters."""
@@ -1036,6 +1065,98 @@ class TestSyncAndOffloadContacts:
assert KEY_A in mock_mc._contacts
class TestBackgroundContactReconcile:
"""Test the yielding background contact reconcile loop."""
@pytest.mark.asyncio
async def test_rechecks_desired_set_before_deleting_contact(self, test_db):
await _insert_contact(KEY_A, "Alice", last_contacted=2000)
await _insert_contact(KEY_B, "Bob", last_contacted=1000)
alice = await ContactRepository.get_by_key(KEY_A)
bob = await ContactRepository.get_by_key(KEY_B)
assert alice is not None
assert bob is not None
mock_mc = MagicMock()
mock_mc.is_connected = True
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
mock_mc.commands.remove_contact = AsyncMock(return_value=MagicMock(type=EventType.OK))
mock_mc.commands.add_contact = AsyncMock(return_value=MagicMock(type=EventType.OK))
radio_manager._meshcore = mock_mc
@asynccontextmanager
async def _radio_operation(*args, **kwargs):
del args, kwargs
yield mock_mc
with (
patch.object(
radio_sync.radio_manager,
"radio_operation",
side_effect=lambda *args, **kwargs: _radio_operation(*args, **kwargs),
),
patch(
"app.radio_sync.get_contacts_selected_for_radio_sync",
side_effect=[[bob], [alice, bob], [alice, bob]],
),
patch("app.radio_sync.asyncio.sleep", new=AsyncMock()),
):
await radio_sync._reconcile_radio_contacts_in_background(
initial_radio_contacts={KEY_A: {"public_key": KEY_A}},
expected_mc=mock_mc,
)
mock_mc.commands.remove_contact.assert_not_called()
mock_mc.commands.add_contact.assert_awaited_once()
payload = mock_mc.commands.add_contact.call_args.args[0]
assert payload["public_key"] == KEY_B
@pytest.mark.asyncio
async def test_yields_radio_lock_every_two_contact_operations(self, test_db):
await _insert_contact(KEY_A, "Alice", last_contacted=3000)
await _insert_contact(KEY_B, "Bob", last_contacted=2000)
extra_key = "cc" * 32
await _insert_contact(extra_key, "Carol", last_contacted=1000)
mock_mc = MagicMock()
mock_mc.is_connected = True
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
mock_mc.commands.remove_contact = AsyncMock(return_value=MagicMock(type=EventType.OK))
mock_mc.commands.add_contact = AsyncMock()
radio_manager._meshcore = mock_mc
acquire_count = 0
@asynccontextmanager
async def _radio_operation(*args, **kwargs):
del args, kwargs
nonlocal acquire_count
acquire_count += 1
yield mock_mc
with (
patch.object(
radio_sync.radio_manager,
"radio_operation",
side_effect=lambda *args, **kwargs: _radio_operation(*args, **kwargs),
),
patch("app.radio_sync.get_contacts_selected_for_radio_sync", return_value=[]),
patch("app.radio_sync.asyncio.sleep", new=AsyncMock()),
):
await radio_sync._reconcile_radio_contacts_in_background(
initial_radio_contacts={
KEY_A: {"public_key": KEY_A},
KEY_B: {"public_key": KEY_B},
extra_key: {"public_key": extra_key},
},
expected_mc=mock_mc,
)
assert acquire_count == 2
assert mock_mc.commands.remove_contact.await_count == 3
mock_mc.commands.add_contact.assert_not_called()
class TestSyncAndOffloadChannels:
"""Test sync_and_offload_channels: pull channels from radio, save to DB, clear from radio."""