Files
Remote-Terminal-for-MeshCore/app/radio_sync.py

944 lines
33 KiB
Python

"""
Radio sync and offload management.
This module handles syncing contacts and channels from the radio to the database,
then removing them from the radio to free up space for new discoveries.
Also handles loading favorites plus recently active contacts TO the radio for DM ACK support.
Also handles periodic message polling as a fallback for platforms where push events
don't work reliably.
"""
import asyncio
import logging
import math
import time
from contextlib import asynccontextmanager
from meshcore import EventType, MeshCore
from app.config import settings
from app.event_handlers import cleanup_expired_acks
from app.models import Contact, ContactUpsert
from app.radio import RadioOperationBusyError
from app.repository import (
AmbiguousPublicKeyPrefixError,
AppSettingsRepository,
ChannelRepository,
ContactRepository,
)
from app.services.contact_reconciliation import reconcile_contact_messages
from app.services.radio_runtime import radio_runtime as radio_manager
from app.websocket import broadcast_error
logger = logging.getLogger(__name__)
def _contact_sync_debug_fields(contact: Contact) -> dict[str, object]:
"""Return key contact fields for sync failure diagnostics."""
return {
"type": contact.type,
"flags": contact.flags,
"last_path": contact.last_path,
"last_path_len": contact.last_path_len,
"out_path_hash_mode": contact.out_path_hash_mode,
"route_override_path": contact.route_override_path,
"route_override_len": contact.route_override_len,
"route_override_hash_mode": contact.route_override_hash_mode,
"last_advert": contact.last_advert,
"lat": contact.lat,
"lon": contact.lon,
"on_radio": contact.on_radio,
}
async def _reconcile_contact_messages_background(
public_key: str,
contact_name: str | None,
) -> None:
"""Run contact/message reconciliation outside the radio critical path."""
try:
await reconcile_contact_messages(
public_key=public_key,
contact_name=contact_name,
log=logger,
)
except Exception as exc:
logger.warning(
"Background contact reconciliation failed for %s: %s",
public_key[:12],
exc,
exc_info=True,
)
async def upsert_channel_from_radio_slot(payload: dict, *, on_radio: bool) -> str | None:
"""Parse a radio channel-slot payload and upsert to the database.
Returns the uppercase hex key if a channel was upserted, or None if the
slot was empty/invalid.
"""
name = payload.get("channel_name", "")
secret = payload.get("channel_secret", b"")
# Skip empty channels
if not name or name == "\x00" * len(name):
return None
is_hashtag = name.startswith("#")
key_bytes = secret if isinstance(secret, bytes) else bytes(secret)
key_hex = key_bytes.hex().upper()
await ChannelRepository.upsert(
key=key_hex,
name=name,
is_hashtag=is_hashtag,
on_radio=on_radio,
)
return key_hex
# Message poll task handle
_message_poll_task: asyncio.Task | None = None
# Message poll interval in seconds when aggressive fallback is enabled.
MESSAGE_POLL_INTERVAL = 10
# Always-on audit interval when aggressive fallback is disabled.
MESSAGE_POLL_AUDIT_INTERVAL = 3600
# Periodic advertisement task handle
_advert_task: asyncio.Task | None = None
# Default check interval when periodic advertising is disabled (seconds)
# We still need to periodically check if it's been enabled
ADVERT_CHECK_INTERVAL = 60
# Minimum allowed advertisement interval (1 hour).
# Even if the database has a shorter value, we silently refuse to advertise
# more frequently than this.
MIN_ADVERT_INTERVAL = 3600
# Counter to pause polling during repeater operations (supports nested pauses)
_polling_pause_count: int = 0
def is_polling_paused() -> bool:
"""Check if polling is currently paused."""
return _polling_pause_count > 0
@asynccontextmanager
async def pause_polling():
"""Context manager to pause message polling during repeater operations.
Supports nested pauses - polling only resumes when all pause contexts have exited.
"""
global _polling_pause_count
_polling_pause_count += 1
try:
yield
finally:
_polling_pause_count -= 1
# Background task handle
_sync_task: asyncio.Task | None = None
# Periodic maintenance check interval in seconds (5 minutes)
SYNC_INTERVAL = 300
# Reload non-favorite contacts up to 80% of configured radio capacity after offload.
RADIO_CONTACT_REFILL_RATIO = 0.80
# Trigger a full offload/reload once occupancy reaches 95% of configured capacity.
RADIO_CONTACT_FULL_SYNC_RATIO = 0.95
def _compute_radio_contact_limits(max_contacts: int) -> tuple[int, int]:
"""Return (refill_target, full_sync_trigger) for the configured capacity."""
capacity = max(1, max_contacts)
refill_target = max(1, min(capacity, int((capacity * RADIO_CONTACT_REFILL_RATIO) + 0.5)))
full_sync_trigger = max(
refill_target,
min(capacity, math.ceil(capacity * RADIO_CONTACT_FULL_SYNC_RATIO)),
)
return refill_target, full_sync_trigger
async def should_run_full_periodic_sync(mc: MeshCore) -> bool:
"""Check current radio occupancy and decide whether to offload/reload."""
app_settings = await AppSettingsRepository.get()
capacity = app_settings.max_radio_contacts
refill_target, full_sync_trigger = _compute_radio_contact_limits(capacity)
result = await mc.commands.get_contacts()
if result is None or result.type == EventType.ERROR:
logger.warning("Periodic sync occupancy check failed: %s", result)
return False
current_contacts = len(result.payload or {})
if current_contacts >= full_sync_trigger:
logger.info(
"Running full radio sync: %d/%d contacts on radio (trigger=%d, refill_target=%d)",
current_contacts,
capacity,
full_sync_trigger,
refill_target,
)
return True
logger.debug(
"Skipping full radio sync: %d/%d contacts on radio (trigger=%d, refill_target=%d)",
current_contacts,
capacity,
full_sync_trigger,
refill_target,
)
return False
async def sync_and_offload_contacts(mc: MeshCore) -> dict:
"""
Sync contacts from radio to database, then remove them from radio.
Returns counts of synced and removed contacts.
"""
synced = 0
removed = 0
try:
# Get all contacts from radio
result = await mc.commands.get_contacts()
if result is None or result.type == EventType.ERROR:
logger.error(
"Failed to get contacts from radio: %s. "
"If you see this repeatedly, the radio may be visible on the "
"serial/TCP/BLE port but not responding to commands. Check for "
"another process with the serial port open (other RemoteTerm "
"instances, serial monitors, etc.), verify the firmware is "
"up-to-date and in client mode (not repeater), or try a "
"power cycle.",
result,
)
return {"synced": 0, "removed": 0, "error": str(result)}
contacts = result.payload or {}
logger.info("Found %d contacts on radio", len(contacts))
# Sync each contact to database, then remove from radio
for public_key, contact_data in contacts.items():
# Save to database
await ContactRepository.upsert(
ContactUpsert.from_radio_dict(public_key, contact_data, on_radio=False)
)
asyncio.create_task(
_reconcile_contact_messages_background(
public_key,
contact_data.get("adv_name"),
)
)
synced += 1
# Remove from radio
try:
remove_result = await mc.commands.remove_contact(contact_data)
if remove_result.type == EventType.OK:
removed += 1
# LIBRARY INTERNAL FIXUP: The MeshCore library's
# commands.remove_contact() sends the remove command over
# the wire but does NOT update the library's in-memory
# contact cache (mc._contacts). This is a gap in the
# library — there's no public API to clear a single
# contact from the cache, and the library only refreshes
# it on a full get_contacts() call.
#
# Why this matters: sync_recent_contacts_to_radio() uses
# mc.get_contact_by_key_prefix() to check whether a
# contact is already loaded on the radio. That method
# searches mc._contacts. If we don't evict the removed
# contact from the cache here, get_contact_by_key_prefix()
# will still find it and skip the add_contact() call —
# meaning contacts never get loaded back onto the radio
# after offload. The result: no DM ACKs, degraded routing
# for potentially minutes until the next periodic sync
# refreshes the cache from the (now-empty) radio.
#
# We access mc._contacts directly because the library
# exposes it as a read-only property (mc.contacts) with
# no removal API. The dict is keyed by public_key string.
mc._contacts.pop(public_key, None)
else:
logger.warning(
"Failed to remove contact %s: %s", public_key[:12], remove_result.payload
)
except Exception as e:
logger.warning("Error removing contact %s: %s", public_key[:12], e)
logger.info("Synced %d contacts, removed %d from radio", synced, removed)
except Exception as e:
logger.error("Error during contact sync: %s", e)
return {"synced": synced, "removed": removed, "error": str(e)}
return {"synced": synced, "removed": removed}
async def sync_and_offload_channels(mc: MeshCore) -> dict:
"""
Sync channels from radio to database, then clear them from radio.
Returns counts of synced and cleared channels.
"""
synced = 0
cleared = 0
try:
# Check all 40 channel slots
for idx in range(40):
result = await mc.commands.get_channel(idx)
if result.type != EventType.CHANNEL_INFO:
continue
key_hex = await upsert_channel_from_radio_slot(
result.payload,
on_radio=False, # We're about to clear it
)
if key_hex is None:
continue
synced += 1
logger.debug("Synced channel %s: %s", key_hex[:8], result.payload.get("channel_name"))
# Clear from radio (set empty name and zero key)
try:
clear_result = await mc.commands.set_channel(
channel_idx=idx,
channel_name="",
channel_secret=bytes(16),
)
if clear_result.type == EventType.OK:
cleared += 1
else:
logger.warning("Failed to clear channel %d: %s", idx, clear_result.payload)
except Exception as e:
logger.warning("Error clearing channel %d: %s", idx, e)
logger.info("Synced %d channels, cleared %d from radio", synced, cleared)
except Exception as e:
logger.error("Error during channel sync: %s", e)
return {"synced": synced, "cleared": cleared, "error": str(e)}
return {"synced": synced, "cleared": cleared}
async def ensure_default_channels() -> None:
"""
Ensure default channels exist in the database.
These will be configured on the radio when needed for sending.
The Public channel is protected - it always exists with the canonical name.
"""
# Public channel - no hashtag, specific well-known key
PUBLIC_CHANNEL_KEY_HEX = "8B3387E9C5CDEA6AC9E5EDBAA115CD72"
# Check by KEY (not name) since that's what's fixed
existing = await ChannelRepository.get_by_key(PUBLIC_CHANNEL_KEY_HEX)
if not existing or existing.name != "Public":
logger.info("Ensuring default Public channel exists with correct name")
await ChannelRepository.upsert(
key=PUBLIC_CHANNEL_KEY_HEX,
name="Public",
is_hashtag=False,
on_radio=existing.on_radio if existing else False,
)
async def sync_and_offload_all(mc: MeshCore) -> dict:
"""Sync and offload both contacts and channels, then ensure defaults exist."""
logger.info("Starting full radio sync and offload")
contacts_result = await sync_and_offload_contacts(mc)
channels_result = await sync_and_offload_channels(mc)
# Ensure default channels exist
await ensure_default_channels()
# Reload favorites plus a working-set fill back onto the radio immediately
# so they do not stay in on_radio=False limbo after offload. Pass mc directly
# since the caller already holds the radio operation lock (asyncio.Lock is not
# reentrant).
reload_result = await sync_recent_contacts_to_radio(force=True, mc=mc)
return {
"contacts": contacts_result,
"channels": channels_result,
"reloaded": reload_result,
}
async def drain_pending_messages(mc: MeshCore) -> int:
"""
Drain all pending messages from the radio.
Calls get_msg() repeatedly until NO_MORE_MSGS is received.
Returns the count of messages retrieved.
"""
count = 0
max_iterations = 100 # Safety limit
for _ in range(max_iterations):
try:
result = await mc.commands.get_msg(timeout=2.0)
if result.type == EventType.NO_MORE_MSGS:
break
elif result.type == EventType.ERROR:
logger.debug("Error during message drain: %s", result.payload)
break
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
count += 1
# Small delay between fetches
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
break
except Exception as e:
logger.warning("Error draining messages: %s", e, exc_info=True)
break
return count
async def poll_for_messages(mc: MeshCore) -> int:
"""
Poll the radio for any pending messages (single pass).
This is a fallback for platforms where MESSAGES_WAITING push events
don't work reliably.
Returns the count of messages retrieved.
"""
count = 0
try:
# Try to get one message
result = await mc.commands.get_msg(timeout=2.0)
if result.type == EventType.NO_MORE_MSGS:
# No messages waiting
return 0
elif result.type == EventType.ERROR:
return 0
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
count += 1
# If we got a message, there might be more - drain them
count += await drain_pending_messages(mc)
except asyncio.TimeoutError:
pass
except Exception as e:
logger.warning("Message poll exception: %s", e, exc_info=True)
return count
async def _message_poll_loop():
"""Background task that periodically polls for messages."""
while True:
try:
aggressive_fallback = settings.enable_message_poll_fallback
await asyncio.sleep(
MESSAGE_POLL_INTERVAL if aggressive_fallback else MESSAGE_POLL_AUDIT_INTERVAL
)
if radio_manager.is_connected and not is_polling_paused():
try:
async with radio_manager.radio_operation(
"message_poll_loop",
blocking=False,
suspend_auto_fetch=True,
) as mc:
count = await poll_for_messages(mc)
if count > 0:
if aggressive_fallback:
logger.warning(
"Poll loop caught %d message(s) missed by auto-fetch",
count,
)
else:
logger.error(
"Periodic radio audit caught %d message(s) that were not "
"surfaced via event subscription. See README and consider "
"setting MESHCORE_ENABLE_MESSAGE_POLL_FALLBACK=true to "
"enable more frequent polling.",
count,
)
broadcast_error(
"A periodic poll task has discovered radio inconsistencies.",
"Please check the logs for recommendations (search "
"'MESHCORE_ENABLE_MESSAGE_POLL_FALLBACK').",
)
except RadioOperationBusyError:
logger.debug("Skipping message poll: radio busy")
except asyncio.CancelledError:
break
except Exception as e:
logger.warning("Error in message poll loop: %s", e, exc_info=True)
def start_message_polling():
"""Start the periodic message polling background task."""
global _message_poll_task
if _message_poll_task is None or _message_poll_task.done():
_message_poll_task = asyncio.create_task(_message_poll_loop())
if settings.enable_message_poll_fallback:
logger.info(
"Started periodic message polling task (aggressive fallback, interval: %ds)",
MESSAGE_POLL_INTERVAL,
)
else:
logger.info(
"Started periodic message audit task (interval: %ds)",
MESSAGE_POLL_AUDIT_INTERVAL,
)
async def stop_message_polling():
"""Stop the periodic message polling background task."""
global _message_poll_task
if _message_poll_task and not _message_poll_task.done():
_message_poll_task.cancel()
try:
await _message_poll_task
except asyncio.CancelledError:
pass
_message_poll_task = None
logger.info("Stopped periodic message polling")
async def send_advertisement(mc: MeshCore, *, force: bool = False) -> bool:
"""Send an advertisement to announce presence on the mesh.
Respects the configured advert_interval - won't send if not enough time
has elapsed since the last advertisement, unless force=True.
Args:
mc: The MeshCore instance to use for the advertisement.
force: If True, send immediately regardless of interval.
Returns True if successful, False otherwise (including if throttled).
"""
# Check if enough time has elapsed (unless forced)
if not force:
settings = await AppSettingsRepository.get()
interval = settings.advert_interval
last_time = settings.last_advert_time
now = int(time.time())
# If interval is 0, advertising is disabled
if interval <= 0:
logger.debug("Advertisement skipped: periodic advertising is disabled")
return False
# Enforce minimum interval floor
interval = max(interval, MIN_ADVERT_INTERVAL)
# Check if enough time has passed
elapsed = now - last_time
if elapsed < interval:
remaining = interval - elapsed
logger.debug(
"Advertisement throttled: %d seconds remaining (interval=%d, elapsed=%d)",
remaining,
interval,
elapsed,
)
return False
try:
result = await mc.commands.send_advert(flood=True)
if result.type == EventType.OK:
# Update last_advert_time in database
now = int(time.time())
await AppSettingsRepository.update(last_advert_time=now)
logger.info("Advertisement sent successfully")
return True
else:
logger.warning("Failed to send advertisement: %s", result.payload)
return False
except Exception as e:
logger.warning("Error sending advertisement: %s", e, exc_info=True)
return False
async def _periodic_advert_loop():
"""Background task that periodically checks if an advertisement should be sent.
The actual throttling logic is in send_advertisement(), which checks
last_advert_time from the database. This loop just triggers the check
periodically and sleeps between attempts.
"""
while True:
try:
await asyncio.sleep(ADVERT_CHECK_INTERVAL)
# Try to send - send_advertisement() handles all checks
# (disabled, throttled, not connected)
if radio_manager.is_connected:
try:
async with radio_manager.radio_operation(
"periodic_advertisement",
blocking=False,
) as mc:
await send_advertisement(mc)
except RadioOperationBusyError:
logger.debug("Skipping periodic advertisement: radio busy")
except asyncio.CancelledError:
logger.info("Periodic advertisement task cancelled")
break
except Exception as e:
logger.error("Error in periodic advertisement loop: %s", e, exc_info=True)
await asyncio.sleep(ADVERT_CHECK_INTERVAL)
def start_periodic_advert():
"""Start the periodic advertisement background task.
The task reads interval from app_settings dynamically, so it will
adapt to configuration changes without restart.
"""
global _advert_task
if _advert_task is None or _advert_task.done():
_advert_task = asyncio.create_task(_periodic_advert_loop())
logger.info("Started periodic advertisement task (interval configured in settings)")
async def stop_periodic_advert():
"""Stop the periodic advertisement background task."""
global _advert_task
if _advert_task and not _advert_task.done():
_advert_task.cancel()
try:
await _advert_task
except asyncio.CancelledError:
pass
_advert_task = None
logger.info("Stopped periodic advertisement")
async def sync_radio_time(mc: MeshCore) -> bool:
"""Sync the radio's clock with the system time.
Returns True if successful, False otherwise.
"""
try:
now = int(time.time())
await mc.commands.set_time(now)
logger.debug("Synced radio time to %d", now)
return True
except Exception as e:
logger.warning("Failed to sync radio time: %s", e, exc_info=True)
return False
async def _periodic_sync_loop():
"""Background task that periodically syncs and offloads."""
while True:
try:
await asyncio.sleep(SYNC_INTERVAL)
cleanup_expired_acks()
if not radio_manager.is_connected:
continue
try:
async with radio_manager.radio_operation(
"periodic_sync",
blocking=False,
) as mc:
if await should_run_full_periodic_sync(mc):
await sync_and_offload_all(mc)
await sync_radio_time(mc)
except RadioOperationBusyError:
logger.debug("Skipping periodic sync: radio busy")
except asyncio.CancelledError:
logger.info("Periodic sync task cancelled")
break
except Exception as e:
logger.error("Error in periodic sync: %s", e, exc_info=True)
def start_periodic_sync():
"""Start the periodic sync background task."""
global _sync_task
if _sync_task is None or _sync_task.done():
_sync_task = asyncio.create_task(_periodic_sync_loop())
logger.info("Started periodic radio sync (interval: %ds)", SYNC_INTERVAL)
async def stop_periodic_sync():
"""Stop the periodic sync background task."""
global _sync_task
if _sync_task and not _sync_task.done():
_sync_task.cancel()
try:
await _sync_task
except asyncio.CancelledError:
pass
_sync_task = None
logger.info("Stopped periodic radio sync")
# Throttling for contact sync to radio
_last_contact_sync: float = 0.0
CONTACT_SYNC_THROTTLE_SECONDS = 30 # Don't sync more than once per 30 seconds
async def _sync_contacts_to_radio_inner(mc: MeshCore) -> dict:
"""
Core logic for loading contacts onto the radio.
Fill order is:
1. Favorite contacts
2. Most recently interacted-with non-repeaters
3. Most recently advert-heard non-repeaters without interaction history
Favorite contacts are always reloaded first, up to the configured capacity.
Additional non-favorite fill stops at the refill target (80% of capacity).
Caller must hold the radio operation lock and pass a valid MeshCore instance.
"""
app_settings = await AppSettingsRepository.get()
max_contacts = app_settings.max_radio_contacts
refill_target, _full_sync_trigger = _compute_radio_contact_limits(max_contacts)
selected_contacts: list[Contact] = []
selected_keys: set[str] = set()
favorite_contacts_loaded = 0
for favorite in app_settings.favorites:
if favorite.type != "contact":
continue
try:
contact = await ContactRepository.get_by_key_or_prefix(favorite.id)
except AmbiguousPublicKeyPrefixError:
logger.warning(
"Skipping favorite contact '%s': ambiguous key prefix; use full key",
favorite.id,
)
continue
if not contact:
continue
key = contact.public_key.lower()
if key in selected_keys:
continue
selected_keys.add(key)
selected_contacts.append(contact)
favorite_contacts_loaded += 1
if len(selected_contacts) >= max_contacts:
break
if len(selected_contacts) < refill_target:
for contact in await ContactRepository.get_recently_contacted_non_repeaters(
limit=max_contacts
):
key = contact.public_key.lower()
if key in selected_keys:
continue
selected_keys.add(key)
selected_contacts.append(contact)
if len(selected_contacts) >= refill_target:
break
if len(selected_contacts) < refill_target:
for contact in await ContactRepository.get_recently_advertised_non_repeaters(
limit=max_contacts
):
key = contact.public_key.lower()
if key in selected_keys:
continue
selected_keys.add(key)
selected_contacts.append(contact)
if len(selected_contacts) >= refill_target:
break
logger.debug(
"Selected %d contacts to sync (%d favorites, refill_target=%d, capacity=%d)",
len(selected_contacts),
favorite_contacts_loaded,
refill_target,
max_contacts,
)
return await _load_contacts_to_radio(mc, selected_contacts)
async def ensure_contact_on_radio(
public_key: str,
*,
force: bool = False,
mc: MeshCore | None = None,
) -> dict:
"""Ensure one contact is loaded on the radio for ACK/routing support."""
global _last_contact_sync
now = time.time()
if not force and (now - _last_contact_sync) < CONTACT_SYNC_THROTTLE_SECONDS:
logger.debug(
"Single-contact sync throttled (last sync %ds ago)",
int(now - _last_contact_sync),
)
return {"loaded": 0, "throttled": True}
try:
contact = await ContactRepository.get_by_key_or_prefix(public_key)
except AmbiguousPublicKeyPrefixError:
logger.warning("Cannot sync favorite contact '%s': ambiguous key prefix", public_key)
return {"loaded": 0, "error": "Ambiguous contact key prefix"}
if not contact:
logger.debug("Cannot sync favorite contact %s: not found", public_key[:12])
return {"loaded": 0, "error": "Contact not found"}
if mc is not None:
_last_contact_sync = now
return await _load_contacts_to_radio(mc, [contact])
if not radio_manager.is_connected or radio_manager.meshcore is None:
logger.debug("Cannot sync favorite contact to radio: not connected")
return {"loaded": 0, "error": "Radio not connected"}
try:
async with radio_manager.radio_operation(
"ensure_contact_on_radio",
blocking=False,
) as mc:
_last_contact_sync = now
assert mc is not None
return await _load_contacts_to_radio(mc, [contact])
except RadioOperationBusyError:
logger.debug("Skipping favorite contact sync: radio busy")
return {"loaded": 0, "busy": True}
except Exception as e:
logger.error("Error syncing favorite contact to radio: %s", e, exc_info=True)
return {"loaded": 0, "error": str(e)}
async def _load_contacts_to_radio(mc: MeshCore, contacts: list[Contact]) -> dict:
"""Load the provided contacts onto the radio."""
loaded = 0
already_on_radio = 0
failed = 0
for contact in contacts:
radio_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
if radio_contact:
already_on_radio += 1
if not contact.on_radio:
await ContactRepository.set_on_radio(contact.public_key, True)
continue
try:
radio_contact_payload = contact.to_radio_dict()
result = await mc.commands.add_contact(radio_contact_payload)
if result.type == EventType.OK:
loaded += 1
await ContactRepository.set_on_radio(contact.public_key, True)
logger.debug("Loaded contact %s to radio", contact.public_key[:12])
else:
failed += 1
reason = result.payload
hint = ""
if reason is None:
hint = (
" (no response from radio — if this repeats, check for "
"serial port contention from another process or try a "
"power cycle)"
)
logger.warning(
"Failed to load contact %s: %s%s",
contact.public_key[:12],
reason,
hint,
)
except Exception as e:
failed += 1
logger.warning(
"Error loading contact %s with fields=%s radio_payload=%s: %s",
contact.public_key[:12],
_contact_sync_debug_fields(contact),
locals().get("radio_contact_payload"),
e,
exc_info=True,
)
if loaded > 0 or failed > 0:
logger.info(
"Contact sync: loaded %d, already on radio %d, failed %d",
loaded,
already_on_radio,
failed,
)
return {
"loaded": loaded,
"already_on_radio": already_on_radio,
"failed": failed,
}
async def sync_recent_contacts_to_radio(force: bool = False, mc: MeshCore | None = None) -> dict:
"""
Load contacts to the radio for DM ACK support.
Fill order is favorites, then recently contacted non-repeaters,
then recently advert-heard non-repeaters. Favorites are always reloaded
up to the configured capacity; additional non-favorite fill stops at the
80% refill target.
Only runs at most once every CONTACT_SYNC_THROTTLE_SECONDS unless forced.
Args:
force: Skip the throttle check.
mc: Optional MeshCore instance. When provided, the caller already holds
the radio operation lock and the inner logic runs directly.
When None, this function acquires its own lock.
Returns counts of contacts loaded.
"""
global _last_contact_sync
# Throttle unless forced
now = time.time()
if not force and (now - _last_contact_sync) < CONTACT_SYNC_THROTTLE_SECONDS:
logger.debug("Contact sync throttled (last sync %ds ago)", int(now - _last_contact_sync))
return {"loaded": 0, "throttled": True}
# If caller provided a MeshCore instance, use it directly (caller holds the lock)
if mc is not None:
_last_contact_sync = now
assert mc is not None
return await _sync_contacts_to_radio_inner(mc)
if not radio_manager.is_connected or radio_manager.meshcore is None:
logger.debug("Cannot sync contacts to radio: not connected")
return {"loaded": 0, "error": "Radio not connected"}
try:
async with radio_manager.radio_operation(
"sync_recent_contacts_to_radio",
blocking=False,
) as mc:
_last_contact_sync = now
assert mc is not None
return await _sync_contacts_to_radio_inner(mc)
except RadioOperationBusyError:
logger.debug("Skipping contact sync to radio: radio busy")
return {"loaded": 0, "busy": True}
except Exception as e:
logger.error("Error syncing contacts to radio: %s", e, exc_info=True)
return {"loaded": 0, "error": str(e)}