Files
Remote-Terminal-for-MeshCore/app/radio_sync.py

718 lines
25 KiB
Python

"""
Radio sync and offload management.
This module handles syncing contacts and channels from the radio to the database,
then removing them from the radio to free up space for new discoveries.
Also handles loading recent non-repeater contacts TO the radio for DM ACK support.
Also handles periodic message polling as a fallback for platforms where push events
don't work reliably.
"""
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from meshcore import EventType, MeshCore
from app.event_handlers import cleanup_expired_acks
from app.models import Contact
from app.radio import RadioOperationBusyError, radio_manager
from app.repository import (
AmbiguousPublicKeyPrefixError,
AppSettingsRepository,
ChannelRepository,
ContactRepository,
MessageRepository,
)
logger = logging.getLogger(__name__)
async def upsert_channel_from_radio_slot(payload: dict, *, on_radio: bool) -> str | None:
"""Parse a radio channel-slot payload and upsert to the database.
Returns the uppercase hex key if a channel was upserted, or None if the
slot was empty/invalid.
"""
name = payload.get("channel_name", "")
secret = payload.get("channel_secret", b"")
# Skip empty channels
if not name or name == "\x00" * len(name):
return None
is_hashtag = name.startswith("#")
key_bytes = secret if isinstance(secret, bytes) else bytes(secret)
key_hex = key_bytes.hex().upper()
await ChannelRepository.upsert(
key=key_hex,
name=name,
is_hashtag=is_hashtag,
on_radio=on_radio,
)
return key_hex
# Message poll task handle
_message_poll_task: asyncio.Task | None = None
# Message poll interval in seconds (10s gives DM ACKs plenty of time to arrive)
MESSAGE_POLL_INTERVAL = 10
# Periodic advertisement task handle
_advert_task: asyncio.Task | None = None
# Default check interval when periodic advertising is disabled (seconds)
# We still need to periodically check if it's been enabled
ADVERT_CHECK_INTERVAL = 60
# Minimum allowed advertisement interval (1 hour).
# Even if the database has a shorter value, we silently refuse to advertise
# more frequently than this.
MIN_ADVERT_INTERVAL = 3600
# Counter to pause polling during repeater operations (supports nested pauses)
_polling_pause_count: int = 0
def is_polling_paused() -> bool:
"""Check if polling is currently paused."""
return _polling_pause_count > 0
@asynccontextmanager
async def pause_polling():
"""Context manager to pause message polling during repeater operations.
Supports nested pauses - polling only resumes when all pause contexts have exited.
"""
global _polling_pause_count
_polling_pause_count += 1
try:
yield
finally:
_polling_pause_count -= 1
# Background task handle
_sync_task: asyncio.Task | None = None
# Sync interval in seconds (5 minutes)
SYNC_INTERVAL = 300
async def sync_and_offload_contacts(mc: MeshCore) -> dict:
"""
Sync contacts from radio to database, then remove them from radio.
Returns counts of synced and removed contacts.
"""
synced = 0
removed = 0
try:
# Get all contacts from radio
result = await mc.commands.get_contacts()
if result is None or result.type == EventType.ERROR:
logger.error("Failed to get contacts from radio: %s", result)
return {"synced": 0, "removed": 0, "error": str(result)}
contacts = result.payload or {}
logger.info("Found %d contacts on radio", len(contacts))
# Sync each contact to database, then remove from radio
for public_key, contact_data in contacts.items():
# Save to database
await ContactRepository.upsert(
Contact.from_radio_dict(public_key, contact_data, on_radio=False)
)
claimed = await MessageRepository.claim_prefix_messages(public_key.lower())
if claimed > 0:
logger.info(
"Claimed %d prefix DM message(s) for contact %s",
claimed,
public_key[:12],
)
synced += 1
# Remove from radio
try:
remove_result = await mc.commands.remove_contact(contact_data)
if remove_result.type == EventType.OK:
removed += 1
# LIBRARY INTERNAL FIXUP: The MeshCore library's
# commands.remove_contact() sends the remove command over
# the wire but does NOT update the library's in-memory
# contact cache (mc._contacts). This is a gap in the
# library — there's no public API to clear a single
# contact from the cache, and the library only refreshes
# it on a full get_contacts() call.
#
# Why this matters: sync_recent_contacts_to_radio() uses
# mc.get_contact_by_key_prefix() to check whether a
# contact is already loaded on the radio. That method
# searches mc._contacts. If we don't evict the removed
# contact from the cache here, get_contact_by_key_prefix()
# will still find it and skip the add_contact() call —
# meaning contacts never get loaded back onto the radio
# after offload. The result: no DM ACKs, degraded routing
# for potentially minutes until the next periodic sync
# refreshes the cache from the (now-empty) radio.
#
# We access mc._contacts directly because the library
# exposes it as a read-only property (mc.contacts) with
# no removal API. The dict is keyed by public_key string.
mc._contacts.pop(public_key, None)
else:
logger.warning(
"Failed to remove contact %s: %s", public_key[:12], remove_result.payload
)
except Exception as e:
logger.warning("Error removing contact %s: %s", public_key[:12], e)
logger.info("Synced %d contacts, removed %d from radio", synced, removed)
except Exception as e:
logger.error("Error during contact sync: %s", e)
return {"synced": synced, "removed": removed, "error": str(e)}
return {"synced": synced, "removed": removed}
async def sync_and_offload_channels(mc: MeshCore) -> dict:
"""
Sync channels from radio to database, then clear them from radio.
Returns counts of synced and cleared channels.
"""
synced = 0
cleared = 0
try:
# Check all 40 channel slots
for idx in range(40):
result = await mc.commands.get_channel(idx)
if result.type != EventType.CHANNEL_INFO:
continue
key_hex = await upsert_channel_from_radio_slot(
result.payload,
on_radio=False, # We're about to clear it
)
if key_hex is None:
continue
synced += 1
logger.debug("Synced channel %s: %s", key_hex[:8], result.payload.get("channel_name"))
# Clear from radio (set empty name and zero key)
try:
clear_result = await mc.commands.set_channel(
channel_idx=idx,
channel_name="",
channel_secret=bytes(16),
)
if clear_result.type == EventType.OK:
cleared += 1
else:
logger.warning("Failed to clear channel %d: %s", idx, clear_result.payload)
except Exception as e:
logger.warning("Error clearing channel %d: %s", idx, e)
logger.info("Synced %d channels, cleared %d from radio", synced, cleared)
except Exception as e:
logger.error("Error during channel sync: %s", e)
return {"synced": synced, "cleared": cleared, "error": str(e)}
return {"synced": synced, "cleared": cleared}
async def ensure_default_channels() -> None:
"""
Ensure default channels exist in the database.
These will be configured on the radio when needed for sending.
The Public channel is protected - it always exists with the canonical name.
"""
# Public channel - no hashtag, specific well-known key
PUBLIC_CHANNEL_KEY_HEX = "8B3387E9C5CDEA6AC9E5EDBAA115CD72"
# Check by KEY (not name) since that's what's fixed
existing = await ChannelRepository.get_by_key(PUBLIC_CHANNEL_KEY_HEX)
if not existing or existing.name != "Public":
logger.info("Ensuring default Public channel exists with correct name")
await ChannelRepository.upsert(
key=PUBLIC_CHANNEL_KEY_HEX,
name="Public",
is_hashtag=False,
on_radio=existing.on_radio if existing else False,
)
async def sync_and_offload_all(mc: MeshCore) -> dict:
"""Sync and offload both contacts and channels, then ensure defaults exist."""
logger.info("Starting full radio sync and offload")
contacts_result = await sync_and_offload_contacts(mc)
channels_result = await sync_and_offload_channels(mc)
# Ensure default channels exist
await ensure_default_channels()
# Reload favorites and recent contacts back onto the radio immediately
# so favorited contacts don't stay in the on_radio=False limbo until the
# next advertisement arrives. Pass mc directly since the caller already
# holds the radio operation lock (asyncio.Lock is not reentrant).
reload_result = await sync_recent_contacts_to_radio(force=True, mc=mc)
return {
"contacts": contacts_result,
"channels": channels_result,
"reloaded": reload_result,
}
async def drain_pending_messages(mc: MeshCore) -> int:
"""
Drain all pending messages from the radio.
Calls get_msg() repeatedly until NO_MORE_MSGS is received.
Returns the count of messages retrieved.
"""
count = 0
max_iterations = 100 # Safety limit
for _ in range(max_iterations):
try:
result = await mc.commands.get_msg(timeout=2.0)
if result.type == EventType.NO_MORE_MSGS:
break
elif result.type == EventType.ERROR:
logger.debug("Error during message drain: %s", result.payload)
break
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
count += 1
# Small delay between fetches
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
break
except Exception as e:
logger.debug("Error draining messages: %s", e)
break
return count
async def poll_for_messages(mc: MeshCore) -> int:
"""
Poll the radio for any pending messages (single pass).
This is a fallback for platforms where MESSAGES_WAITING push events
don't work reliably.
Returns the count of messages retrieved.
"""
count = 0
try:
# Try to get one message
result = await mc.commands.get_msg(timeout=2.0)
if result.type == EventType.NO_MORE_MSGS:
# No messages waiting
return 0
elif result.type == EventType.ERROR:
return 0
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
count += 1
# If we got a message, there might be more - drain them
count += await drain_pending_messages(mc)
except asyncio.TimeoutError:
pass
except Exception as e:
logger.debug("Message poll exception: %s", e)
return count
async def _message_poll_loop():
"""Background task that periodically polls for messages."""
while True:
try:
await asyncio.sleep(MESSAGE_POLL_INTERVAL)
# Clean up expired pending ACKs every poll cycle so they don't
# accumulate when no ACKs arrive (e.g. all recipients out of range).
cleanup_expired_acks()
if radio_manager.is_connected and not is_polling_paused():
try:
async with radio_manager.radio_operation(
"message_poll_loop",
blocking=False,
suspend_auto_fetch=True,
) as mc:
await poll_for_messages(mc)
except RadioOperationBusyError:
logger.debug("Skipping message poll: radio busy")
except asyncio.CancelledError:
break
except Exception as e:
logger.debug("Error in message poll loop: %s", e)
def start_message_polling():
"""Start the periodic message polling background task."""
global _message_poll_task
if _message_poll_task is None or _message_poll_task.done():
_message_poll_task = asyncio.create_task(_message_poll_loop())
logger.info("Started periodic message polling (interval: %ds)", MESSAGE_POLL_INTERVAL)
async def stop_message_polling():
"""Stop the periodic message polling background task."""
global _message_poll_task
if _message_poll_task and not _message_poll_task.done():
_message_poll_task.cancel()
try:
await _message_poll_task
except asyncio.CancelledError:
pass
_message_poll_task = None
logger.info("Stopped periodic message polling")
async def send_advertisement(mc: MeshCore, *, force: bool = False) -> bool:
"""Send an advertisement to announce presence on the mesh.
Respects the configured advert_interval - won't send if not enough time
has elapsed since the last advertisement, unless force=True.
Args:
mc: The MeshCore instance to use for the advertisement.
force: If True, send immediately regardless of interval.
Returns True if successful, False otherwise (including if throttled).
"""
# Check if enough time has elapsed (unless forced)
if not force:
settings = await AppSettingsRepository.get()
interval = settings.advert_interval
last_time = settings.last_advert_time
now = int(time.time())
# If interval is 0, advertising is disabled
if interval <= 0:
logger.debug("Advertisement skipped: periodic advertising is disabled")
return False
# Enforce minimum interval floor
interval = max(interval, MIN_ADVERT_INTERVAL)
# Check if enough time has passed
elapsed = now - last_time
if elapsed < interval:
remaining = interval - elapsed
logger.debug(
"Advertisement throttled: %d seconds remaining (interval=%d, elapsed=%d)",
remaining,
interval,
elapsed,
)
return False
try:
result = await mc.commands.send_advert(flood=True)
if result.type == EventType.OK:
# Update last_advert_time in database
now = int(time.time())
await AppSettingsRepository.update(last_advert_time=now)
logger.info("Advertisement sent successfully")
return True
else:
logger.warning("Failed to send advertisement: %s", result.payload)
return False
except Exception as e:
logger.warning("Error sending advertisement: %s", e)
return False
async def _periodic_advert_loop():
"""Background task that periodically checks if an advertisement should be sent.
The actual throttling logic is in send_advertisement(), which checks
last_advert_time from the database. This loop just triggers the check
periodically and sleeps between attempts.
"""
while True:
try:
await asyncio.sleep(ADVERT_CHECK_INTERVAL)
# Try to send - send_advertisement() handles all checks
# (disabled, throttled, not connected)
if radio_manager.is_connected:
try:
async with radio_manager.radio_operation(
"periodic_advertisement",
blocking=False,
) as mc:
await send_advertisement(mc)
except RadioOperationBusyError:
logger.debug("Skipping periodic advertisement: radio busy")
except asyncio.CancelledError:
logger.info("Periodic advertisement task cancelled")
break
except Exception as e:
logger.error("Error in periodic advertisement loop: %s", e)
await asyncio.sleep(ADVERT_CHECK_INTERVAL)
def start_periodic_advert():
"""Start the periodic advertisement background task.
The task reads interval from app_settings dynamically, so it will
adapt to configuration changes without restart.
"""
global _advert_task
if _advert_task is None or _advert_task.done():
_advert_task = asyncio.create_task(_periodic_advert_loop())
logger.info("Started periodic advertisement task (interval configured in settings)")
async def stop_periodic_advert():
"""Stop the periodic advertisement background task."""
global _advert_task
if _advert_task and not _advert_task.done():
_advert_task.cancel()
try:
await _advert_task
except asyncio.CancelledError:
pass
_advert_task = None
logger.info("Stopped periodic advertisement")
async def sync_radio_time(mc: MeshCore) -> bool:
"""Sync the radio's clock with the system time.
Returns True if successful, False otherwise.
"""
try:
now = int(time.time())
await mc.commands.set_time(now)
logger.debug("Synced radio time to %d", now)
return True
except Exception as e:
logger.warning("Failed to sync radio time: %s", e)
return False
async def _periodic_sync_loop():
"""Background task that periodically syncs and offloads."""
while True:
try:
await asyncio.sleep(SYNC_INTERVAL)
if not radio_manager.is_connected:
continue
try:
async with radio_manager.radio_operation(
"periodic_sync",
blocking=False,
) as mc:
logger.debug("Running periodic radio sync")
await sync_and_offload_all(mc)
await sync_radio_time(mc)
except RadioOperationBusyError:
logger.debug("Skipping periodic sync: radio busy")
except asyncio.CancelledError:
logger.info("Periodic sync task cancelled")
break
except Exception as e:
logger.error("Error in periodic sync: %s", e)
def start_periodic_sync():
"""Start the periodic sync background task."""
global _sync_task
if _sync_task is None or _sync_task.done():
_sync_task = asyncio.create_task(_periodic_sync_loop())
logger.info("Started periodic radio sync (interval: %ds)", SYNC_INTERVAL)
async def stop_periodic_sync():
"""Stop the periodic sync background task."""
global _sync_task
if _sync_task and not _sync_task.done():
_sync_task.cancel()
try:
await _sync_task
except asyncio.CancelledError:
pass
_sync_task = None
logger.info("Stopped periodic radio sync")
# Throttling for contact sync to radio
_last_contact_sync: float = 0.0
CONTACT_SYNC_THROTTLE_SECONDS = 30 # Don't sync more than once per 30 seconds
async def _sync_contacts_to_radio_inner(mc: MeshCore) -> dict:
"""
Core logic for loading contacts onto the radio.
Favorite contacts are prioritized first, then recent non-repeater contacts
fill remaining slots up to max_radio_contacts.
Caller must hold the radio operation lock and pass a valid MeshCore instance.
"""
app_settings = await AppSettingsRepository.get()
max_contacts = app_settings.max_radio_contacts
selected_contacts: list[Contact] = []
selected_keys: set[str] = set()
favorite_contacts_loaded = 0
for favorite in app_settings.favorites:
if favorite.type != "contact":
continue
try:
contact = await ContactRepository.get_by_key_or_prefix(favorite.id)
except AmbiguousPublicKeyPrefixError:
logger.warning(
"Skipping favorite contact '%s': ambiguous key prefix; use full key",
favorite.id,
)
continue
if not contact:
continue
key = contact.public_key.lower()
if key in selected_keys:
continue
selected_keys.add(key)
selected_contacts.append(contact)
favorite_contacts_loaded += 1
if len(selected_contacts) >= max_contacts:
break
if len(selected_contacts) < max_contacts:
recent_contacts = await ContactRepository.get_recent_non_repeaters(limit=max_contacts)
for contact in recent_contacts:
key = contact.public_key.lower()
if key in selected_keys:
continue
selected_keys.add(key)
selected_contacts.append(contact)
if len(selected_contacts) >= max_contacts:
break
logger.debug(
"Selected %d contacts to sync (%d favorite contacts first, limit=%d)",
len(selected_contacts),
favorite_contacts_loaded,
max_contacts,
)
loaded = 0
already_on_radio = 0
failed = 0
for contact in selected_contacts:
# Check if already on radio
radio_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
if radio_contact:
already_on_radio += 1
# Update DB if not marked as on_radio
if not contact.on_radio:
await ContactRepository.set_on_radio(contact.public_key, True)
continue
try:
result = await mc.commands.add_contact(contact.to_radio_dict())
if result.type == EventType.OK:
loaded += 1
await ContactRepository.set_on_radio(contact.public_key, True)
logger.debug("Loaded contact %s to radio", contact.public_key[:12])
else:
failed += 1
logger.warning(
"Failed to load contact %s: %s", contact.public_key[:12], result.payload
)
except Exception as e:
failed += 1
logger.warning("Error loading contact %s: %s", contact.public_key[:12], e)
if loaded > 0 or failed > 0:
logger.info(
"Contact sync: loaded %d, already on radio %d, failed %d",
loaded,
already_on_radio,
failed,
)
return {
"loaded": loaded,
"already_on_radio": already_on_radio,
"failed": failed,
}
async def sync_recent_contacts_to_radio(force: bool = False, mc: MeshCore | None = None) -> dict:
"""
Load contacts to the radio for DM ACK support.
Favorite contacts are prioritized first, then recent non-repeater contacts
fill remaining slots up to max_radio_contacts.
Only runs at most once every CONTACT_SYNC_THROTTLE_SECONDS unless forced.
Args:
force: Skip the throttle check.
mc: Optional MeshCore instance. When provided, the caller already holds
the radio operation lock and the inner logic runs directly.
When None, this function acquires its own lock.
Returns counts of contacts loaded.
"""
global _last_contact_sync
# Throttle unless forced
now = time.time()
if not force and (now - _last_contact_sync) < CONTACT_SYNC_THROTTLE_SECONDS:
logger.debug("Contact sync throttled (last sync %ds ago)", int(now - _last_contact_sync))
return {"loaded": 0, "throttled": True}
# If caller provided a MeshCore instance, use it directly (caller holds the lock)
if mc is not None:
_last_contact_sync = now
return await _sync_contacts_to_radio_inner(mc)
if not radio_manager.is_connected or radio_manager.meshcore is None:
logger.debug("Cannot sync contacts to radio: not connected")
return {"loaded": 0, "error": "Radio not connected"}
try:
async with radio_manager.radio_operation(
"sync_recent_contacts_to_radio",
blocking=False,
) as mc:
_last_contact_sync = now
return await _sync_contacts_to_radio_inner(mc)
except RadioOperationBusyError:
logger.debug("Skipping contact sync to radio: radio busy")
return {"loaded": 0, "busy": True}
except Exception as e:
logger.error("Error syncing contacts to radio: %s", e)
return {"loaded": 0, "error": str(e)}