Be better about blind loading/auto-evict logging and run-through

This commit is contained in:
Jack Kingsman
2026-04-19 00:27:53 -07:00
parent 05493d06fc
commit c098f9eeb5
2 changed files with 290 additions and 95 deletions

View File

@@ -50,17 +50,17 @@ AdvertMode = Literal["flood", "zero_hop"]
_AUTO_ADD_OVERWRITE_OLDEST = 0x01
async def _enable_autoevict_on_radio(mc: MeshCore) -> None:
async def _enable_autoevict_on_radio(mc: MeshCore) -> bool:
"""Ensure the radio's AUTO_ADD_OVERWRITE_OLDEST preference bit is set."""
try:
current = await mc.commands.get_autoadd_config()
if current is None or current.type == EventType.ERROR:
logger.warning("Could not read autoadd config from radio: %s", current)
return
return False
current_flags = current.payload.get("config", 0)
if current_flags & _AUTO_ADD_OVERWRITE_OLDEST:
logger.debug("Radio autoevict already enabled (autoadd_config=0x%02x)", current_flags)
return
return True
new_flags = current_flags | _AUTO_ADD_OVERWRITE_OLDEST
result = await mc.commands.set_autoadd_config(new_flags)
if result is not None and result.type == EventType.OK:
@@ -69,10 +69,13 @@ async def _enable_autoevict_on_radio(mc: MeshCore) -> None:
current_flags,
new_flags,
)
return True
else:
logger.warning("Failed to enable radio autoevict: %s", result)
return False
except Exception as exc:
logger.warning("Error enabling radio autoevict: %s", exc)
return False
def _contact_sync_debug_fields(contact: Contact) -> dict[str, object]:
@@ -458,10 +461,16 @@ async def ensure_default_channels() -> None:
async def sync_and_offload_all(mc: MeshCore) -> dict:
"""Run fast startup sync, then background contact reconcile."""
autoevict = settings.load_with_autoevict
autoevict_requested = settings.load_with_autoevict
autoevict = False
if autoevict:
await _enable_autoevict_on_radio(mc)
if autoevict_requested:
autoevict = await _enable_autoevict_on_radio(mc)
if not autoevict:
logger.warning(
"Autoevict requested but unavailable; falling back to snapshot-based "
"background contact reconcile"
)
# Contact on_radio is legacy/stale metadata. Clear it during the offload/reload
# cycle so old rows stop claiming radio residency we do not actively track.
@@ -475,8 +484,7 @@ async def sync_and_offload_all(mc: MeshCore) -> dict:
contact_reconcile_started = False
if "error" in contacts_result and not autoevict:
# In normal mode, we can't reconcile blind — skip and warn.
# In autoevict mode, we can load blind because adds never fail.
# Without confirmed autoevict support we cannot reconcile blindly.
logger.warning("Skipping background contact reconcile — could not enumerate radio contacts")
broadcast_error(
"Could not enumerate radio contacts",
@@ -1161,14 +1169,17 @@ async def _reconcile_radio_contacts_in_background(
"""Converge radio contacts toward the desired favorites+recents working set.
When *autoevict* is ``True`` the removal phase is skipped entirely and the
radio's ``AUTO_ADD_OVERWRITE_OLDEST`` preference is assumed to be enabled,
so ``add_contact`` never returns ``TABLE_FULL``.
desired working set is blind-refreshed. Re-adding the full desired list
refreshes each contact's recency on supported firmware, so one successful
full pass converges the radio toward the desired working set without relying
on a stale contact snapshot.
"""
radio_contacts = dict(initial_radio_contacts)
removed = 0
loaded = 0
failed = 0
table_full = False
autoevict_next_index = 0
try:
while True:
@@ -1177,10 +1188,20 @@ async def _reconcile_radio_contacts_in_background(
break
selected_contacts = await get_contacts_selected_for_radio_sync()
desired_fill_contacts = [
contact for contact in selected_contacts if len(contact.public_key) >= 64
]
if autoevict:
if not desired_fill_contacts:
logger.info(
"Background contact blind fill complete: no desired contacts selected"
)
break
if autoevict_next_index >= len(desired_fill_contacts):
autoevict_next_index = 0
desired_contacts = {
contact.public_key.lower(): contact
for contact in selected_contacts
if len(contact.public_key) >= 64
contact.public_key.lower(): contact for contact in desired_fill_contacts
}
removable_keys = (
[] if autoevict else [key for key in radio_contacts if key not in desired_contacts]
@@ -1189,7 +1210,7 @@ async def _reconcile_radio_contacts_in_background(
contact for key, contact in desired_contacts.items() if key not in radio_contacts
]
if not removable_keys and not missing_contacts:
if not autoevict and not removable_keys and not missing_contacts:
logger.info(
"Background contact reconcile complete: %d contacts on radio working set",
len(radio_contacts),
@@ -1197,6 +1218,8 @@ async def _reconcile_radio_contacts_in_background(
break
progressed = False
autoevict_pass_complete = False
autoevict_pass_failed = False
try:
async with radio_manager.radio_operation(
"background_contact_reconcile",
@@ -1210,119 +1233,213 @@ async def _reconcile_radio_contacts_in_background(
budget = CONTACT_RECONCILE_BATCH_SIZE
selected_contacts = await get_contacts_selected_for_radio_sync()
desired_fill_contacts = [
contact for contact in selected_contacts if len(contact.public_key) >= 64
]
if autoevict and autoevict_next_index >= len(desired_fill_contacts):
autoevict_next_index = 0
desired_contacts = {
contact.public_key.lower(): contact
for contact in selected_contacts
if len(contact.public_key) >= 64
contact.public_key.lower(): contact for contact in desired_fill_contacts
}
for public_key in list(radio_contacts):
if budget <= 0:
break
if public_key in desired_contacts:
continue
remove_payload = (
mc.get_contact_by_key_prefix(public_key[:12])
or radio_contacts.get(public_key)
or {"public_key": public_key}
)
try:
remove_result = await mc.commands.remove_contact(remove_payload)
except Exception as exc:
failed += 1
budget -= 1
logger.warning(
"Error removing contact %s during background reconcile: %s",
public_key[:12],
exc,
)
continue
budget -= 1
if remove_result.type == EventType.OK:
radio_contacts.pop(public_key, None)
_evict_removed_contact_from_library_cache(mc, public_key)
removed += 1
progressed = True
else:
failed += 1
logger.warning(
"Failed to remove contact %s during background reconcile: %s",
public_key[:12],
remove_result.payload,
)
if budget > 0:
for public_key, contact in desired_contacts.items():
if not autoevict:
for public_key in list(radio_contacts):
if budget <= 0:
break
if public_key in radio_contacts:
continue
if mc.get_contact_by_key_prefix(public_key[:12]):
radio_contacts[public_key] = {"public_key": public_key}
if public_key in desired_contacts:
continue
remove_payload = (
mc.get_contact_by_key_prefix(public_key[:12])
or radio_contacts.get(public_key)
or {"public_key": public_key}
)
try:
add_payload = contact.to_radio_dict()
add_result = await mc.commands.add_contact(add_payload)
remove_result = await mc.commands.remove_contact(remove_payload)
except Exception as exc:
failed += 1
budget -= 1
logger.warning(
"Error adding contact %s during background reconcile: %s",
"Error removing contact %s during background reconcile: %s",
public_key[:12],
exc,
exc_info=True,
)
continue
budget -= 1
if add_result.type == EventType.OK:
radio_contacts[public_key] = add_payload
loaded += 1
if remove_result.type == EventType.OK:
radio_contacts.pop(public_key, None)
_evict_removed_contact_from_library_cache(mc, public_key)
removed += 1
progressed = True
else:
failed += 1
reason = add_result.payload
if isinstance(reason, dict) and reason.get("error_code") == 3:
logger.warning(
"Radio contact table full — stopping "
"contact reconcile (loaded %d this cycle)",
loaded,
)
table_full = True
break
hint = ""
if reason is None:
hint = (
" (no response from radio — if this repeats, check for "
"serial port contention from another process or try a "
"power cycle)"
)
logger.warning(
"Failed to add contact %s during background reconcile: %s%s",
"Failed to remove contact %s during background reconcile: %s",
public_key[:12],
reason,
hint,
remove_result.payload,
)
if budget > 0:
if autoevict:
batch_contacts = desired_fill_contacts[
autoevict_next_index : autoevict_next_index + budget
]
processed_contacts = 0
for contact in batch_contacts:
public_key = contact.public_key.lower()
try:
add_payload = contact.to_radio_dict()
add_result = await mc.commands.add_contact(add_payload)
except Exception as exc:
failed += 1
logger.warning(
"Error blind-filling contact %s during background reconcile: %s",
public_key[:12],
exc,
exc_info=True,
)
autoevict_pass_failed = True
processed_contacts += 1
continue
if add_result.type == EventType.OK:
radio_contacts[public_key] = add_payload
loaded += 1
progressed = True
else:
failed += 1
autoevict_pass_failed = True
reason = add_result.payload
if isinstance(reason, dict) and reason.get("error_code") == 3:
logger.warning(
"Radio contact table full — stopping "
"contact reconcile (loaded %d this cycle)",
loaded,
)
table_full = True
break
hint = ""
if reason is None:
hint = (
" (no response from radio — if this repeats, check for "
"serial port contention from another process or try a "
"power cycle)"
)
logger.warning(
"Failed to blind-fill contact %s during background reconcile: %s%s",
public_key[:12],
reason,
hint,
)
processed_contacts += 1
autoevict_next_index += processed_contacts
autoevict_pass_complete = autoevict_next_index >= len(
desired_fill_contacts
)
else:
for public_key, contact in desired_contacts.items():
if budget <= 0:
break
if public_key in radio_contacts:
continue
if mc.get_contact_by_key_prefix(public_key[:12]):
radio_contacts[public_key] = {"public_key": public_key}
continue
try:
add_payload = contact.to_radio_dict()
add_result = await mc.commands.add_contact(add_payload)
except Exception as exc:
failed += 1
budget -= 1
logger.warning(
"Error adding contact %s during background reconcile: %s",
public_key[:12],
exc,
exc_info=True,
)
continue
budget -= 1
if add_result.type == EventType.OK:
radio_contacts[public_key] = add_payload
loaded += 1
progressed = True
else:
failed += 1
reason = add_result.payload
if isinstance(reason, dict) and reason.get("error_code") == 3:
logger.warning(
"Radio contact table full — stopping "
"contact reconcile (loaded %d this cycle)",
loaded,
)
table_full = True
break
hint = ""
if reason is None:
hint = (
" (no response from radio — if this repeats, check for "
"serial port contention from another process or try a "
"power cycle)"
)
logger.warning(
"Failed to add contact %s during background reconcile: %s%s",
public_key[:12],
reason,
hint,
)
except RadioOperationBusyError:
logger.debug("Background contact reconcile yielding: radio busy")
await asyncio.sleep(CONTACT_RECONCILE_BUSY_BACKOFF_SECONDS)
continue
if table_full:
broadcast_error(
"Could not load all desired contacts onto the radio for auto-DM ack",
"The radio's contact table is full. Clearing your radio contacts "
"using another client, lowering your contact fill target in "
"settings, or setting MESHCORE_LOAD_WITH_AUTOEVICT=true may "
"relieve this. See 'Contact Loading Issues' in the Advanced "
"Setup documentation.",
)
if autoevict:
logger.error(
"We're expecting the radio to be in AUTO_ADD_OVERWRITE_OLDEST mode, "
"so a full-table error means we have no idea what is going on with "
"this radio; it is misbehaving. You should consider DM auto-acking "
"to be unreliable and/or not working for this radio. Sending and "
"receiving messages are not impacted by this error unless other "
"things are broken on your radio."
)
broadcast_error(
"Could not load all desired contacts onto the radio for auto-DM ack: ",
"Despite having auto-evict enabled, we got a contact-table-full error "
"from your radio. DM auto-ack is likely unavailable.",
)
else:
normal_table_full_message = (
"The radio's contact table is full. Clearing your radio contacts "
"using another client, lowering your contact fill target in "
"settings, or setting MESHCORE_LOAD_WITH_AUTOEVICT=true may "
"relieve this. See 'Contact Loading Issues' in the Advanced "
"README.md"
)
logger.error(
"Contact reconcile hit TABLE_FULL. %s",
normal_table_full_message,
)
broadcast_error(
"Could not load all desired contacts onto the radio for auto-DM ack",
normal_table_full_message,
)
break
if autoevict and autoevict_pass_complete:
if autoevict_pass_failed:
autoevict_next_index = 0
else:
logger.info(
"Background contact blind fill complete: refreshed %d desired contacts",
len(desired_fill_contacts),
)
break
await asyncio.sleep(CONTACT_RECONCILE_YIELD_SECONDS)
if not progressed:
continue

View File

@@ -520,6 +520,37 @@ class TestSyncAndOffloadAll:
)
assert result["contact_reconcile_started"] is True
@pytest.mark.asyncio
async def test_falls_back_to_snapshot_reconcile_when_autoevict_enable_fails(self, test_db):
mock_mc = MagicMock()
radio_contacts = {KEY_A: {"public_key": KEY_A}}
with (
patch.object(radio_sync.settings, "load_with_autoevict", True),
patch(
"app.radio_sync._enable_autoevict_on_radio",
new=AsyncMock(return_value=False),
),
patch(
"app.radio_sync.sync_contacts_from_radio",
new=AsyncMock(return_value={"synced": 1, "radio_contacts": radio_contacts}),
),
patch(
"app.radio_sync.sync_and_offload_channels",
new=AsyncMock(return_value={"synced": 0, "cleared": 0}),
),
patch("app.radio_sync.ensure_default_channels", new=AsyncMock()),
patch("app.radio_sync.start_background_contact_reconciliation") as mock_start,
):
result = await sync_and_offload_all(mock_mc)
mock_start.assert_called_once_with(
initial_radio_contacts=radio_contacts,
expected_mc=mock_mc,
autoevict=False,
)
assert result["contact_reconcile_started"] is True
@pytest.mark.asyncio
async def test_advert_fill_skips_repeaters(self, test_db):
"""Recent advert fallback only considers non-repeaters."""
@@ -844,6 +875,53 @@ class TestBackgroundContactReconcile:
payload = mock_mc.commands.add_contact.call_args.args[0]
assert payload["public_key"] == KEY_B
@pytest.mark.asyncio
async def test_autoevict_blind_fill_readds_full_desired_set(self, test_db):
await _insert_contact(KEY_A, "Alice", last_contacted=2000)
await _insert_contact(KEY_B, "Bob", last_contacted=1000)
alice = await ContactRepository.get_by_key(KEY_A)
bob = await ContactRepository.get_by_key(KEY_B)
assert alice is not None
assert bob is not None
mock_mc = MagicMock()
mock_mc.is_connected = True
mock_mc.get_contact_by_key_prefix = MagicMock(return_value=None)
mock_mc.commands.remove_contact = AsyncMock(return_value=MagicMock(type=EventType.OK))
mock_mc.commands.add_contact = AsyncMock(return_value=MagicMock(type=EventType.OK))
radio_manager._meshcore = mock_mc
@asynccontextmanager
async def _radio_operation(*args, **kwargs):
del args, kwargs
yield mock_mc
with (
patch.object(
radio_sync.radio_manager,
"radio_operation",
side_effect=lambda *args, **kwargs: _radio_operation(*args, **kwargs),
),
patch("app.radio_sync.CONTACT_RECONCILE_BATCH_SIZE", 10),
patch(
"app.radio_sync.get_contacts_selected_for_radio_sync",
side_effect=[[alice, bob], [alice, bob]],
),
patch("app.radio_sync.asyncio.sleep", new=AsyncMock()),
):
await radio_sync._reconcile_radio_contacts_in_background(
initial_radio_contacts={KEY_A: {"public_key": KEY_A}},
expected_mc=mock_mc,
autoevict=True,
)
mock_mc.commands.remove_contact.assert_not_called()
assert mock_mc.commands.add_contact.await_count == 2
loaded_keys = [
call.args[0]["public_key"] for call in mock_mc.commands.add_contact.call_args_list
]
assert loaded_keys == [KEY_A, KEY_B]
@pytest.mark.asyncio
async def test_yields_radio_lock_every_two_contact_operations(self, test_db):
await _insert_contact(KEY_A, "Alice", last_contacted=3000)