Files
Remote-Terminal-for-MeshCore/app/radio_sync.py
2026-01-14 20:08:41 -08:00

469 lines
15 KiB
Python

"""
Radio sync and offload management.
This module handles syncing contacts and channels from the radio to the database,
then removing them from the radio to free up space for new discoveries.
Also handles loading recent non-repeater contacts TO the radio for DM ACK support.
Also handles periodic message polling as a fallback for platforms where push events
don't work reliably.
"""
import asyncio
import logging
import time
from contextlib import asynccontextmanager
from meshcore import EventType
from app.config import settings
from app.models import Contact
from app.radio import radio_manager
from app.repository import ChannelRepository, ContactRepository
logger = logging.getLogger(__name__)
# Message poll task handle
_message_poll_task: asyncio.Task | None = None
# Message poll interval in seconds
MESSAGE_POLL_INTERVAL = 5
# Counter to pause polling during repeater operations (supports nested pauses)
_polling_pause_count: int = 0
def is_polling_paused() -> bool:
"""Check if polling is currently paused."""
return _polling_pause_count > 0
@asynccontextmanager
async def pause_polling():
"""Context manager to pause message polling during repeater operations.
Supports nested pauses - polling only resumes when all pause contexts have exited.
"""
global _polling_pause_count
_polling_pause_count += 1
try:
yield
finally:
_polling_pause_count -= 1
# Background task handle
_sync_task: asyncio.Task | None = None
# Sync interval in seconds (5 minutes)
SYNC_INTERVAL = 300
async def sync_and_offload_contacts() -> dict:
"""
Sync contacts from radio to database, then remove them from radio.
Returns counts of synced and removed contacts.
"""
if not radio_manager.is_connected or radio_manager.meshcore is None:
logger.warning("Cannot sync contacts: radio not connected")
return {"synced": 0, "removed": 0, "error": "Radio not connected"}
mc = radio_manager.meshcore
synced = 0
removed = 0
try:
# Get all contacts from radio
result = await mc.commands.get_contacts()
if result is None or result.type == EventType.ERROR:
logger.error("Failed to get contacts from radio: %s", result)
return {"synced": 0, "removed": 0, "error": str(result)}
contacts = result.payload or {}
logger.info("Found %d contacts on radio", len(contacts))
# Sync each contact to database, then remove from radio
for public_key, contact_data in contacts.items():
# Save to database
await ContactRepository.upsert(
Contact.from_radio_dict(public_key, contact_data, on_radio=False)
)
synced += 1
# Remove from radio
try:
remove_result = await mc.commands.remove_contact(contact_data)
if remove_result.type == EventType.OK:
removed += 1
else:
logger.warning(
"Failed to remove contact %s: %s", public_key[:12], remove_result.payload
)
except Exception as e:
logger.warning("Error removing contact %s: %s", public_key[:12], e)
logger.info("Synced %d contacts, removed %d from radio", synced, removed)
except Exception as e:
logger.error("Error during contact sync: %s", e)
return {"synced": synced, "removed": removed, "error": str(e)}
return {"synced": synced, "removed": removed}
async def sync_and_offload_channels() -> dict:
"""
Sync channels from radio to database, then clear them from radio.
Returns counts of synced and cleared channels.
"""
if not radio_manager.is_connected or radio_manager.meshcore is None:
logger.warning("Cannot sync channels: radio not connected")
return {"synced": 0, "cleared": 0, "error": "Radio not connected"}
mc = radio_manager.meshcore
synced = 0
cleared = 0
try:
# Check all 40 channel slots
for idx in range(40):
result = await mc.commands.get_channel(idx)
if result.type != EventType.CHANNEL_INFO:
continue
payload = result.payload
name = payload.get("channel_name", "")
secret = payload.get("channel_secret", b"")
# Skip empty channels
if not name or name == "\x00" * len(name) or all(b == 0 for b in secret):
continue
is_hashtag = name.startswith("#")
# Convert key bytes to hex string
key_bytes = secret if isinstance(secret, bytes) else bytes(secret)
key_hex = key_bytes.hex().upper()
# Save to database
await ChannelRepository.upsert(
key=key_hex,
name=name,
is_hashtag=is_hashtag,
on_radio=False, # We're about to clear it
)
synced += 1
logger.debug("Synced channel %s: %s", key_hex[:8], name)
# Clear from radio (set empty name and zero key)
try:
clear_result = await mc.commands.set_channel(
channel_idx=idx,
channel_name="",
channel_secret=bytes(16),
)
if clear_result.type == EventType.OK:
cleared += 1
else:
logger.warning("Failed to clear channel %d: %s", idx, clear_result.payload)
except Exception as e:
logger.warning("Error clearing channel %d: %s", idx, e)
logger.info("Synced %d channels, cleared %d from radio", synced, cleared)
except Exception as e:
logger.error("Error during channel sync: %s", e)
return {"synced": synced, "cleared": cleared, "error": str(e)}
return {"synced": synced, "cleared": cleared}
async def ensure_default_channels() -> None:
"""
Ensure default channels exist in the database.
These will be configured on the radio when needed for sending.
The Public channel is protected - it always exists with the canonical name.
"""
# Public channel - no hashtag, specific well-known key
PUBLIC_CHANNEL_KEY_HEX = "8B3387E9C5CDEA6AC9E5EDBAA115CD72"
# Check by KEY (not name) since that's what's fixed
existing = await ChannelRepository.get_by_key(PUBLIC_CHANNEL_KEY_HEX)
if not existing or existing.name != "Public":
logger.info("Ensuring default Public channel exists with correct name")
await ChannelRepository.upsert(
key=PUBLIC_CHANNEL_KEY_HEX,
name="Public",
is_hashtag=False,
on_radio=existing.on_radio if existing else False,
)
async def sync_and_offload_all() -> dict:
"""Sync and offload both contacts and channels, then ensure defaults exist."""
logger.info("Starting full radio sync and offload")
contacts_result = await sync_and_offload_contacts()
channels_result = await sync_and_offload_channels()
# Ensure default channels exist
await ensure_default_channels()
return {
"contacts": contacts_result,
"channels": channels_result,
}
async def drain_pending_messages() -> int:
"""
Drain all pending messages from the radio.
Calls get_msg() repeatedly until NO_MORE_MSGS is received.
Returns the count of messages retrieved.
"""
if not radio_manager.is_connected or radio_manager.meshcore is None:
return 0
mc = radio_manager.meshcore
count = 0
max_iterations = 100 # Safety limit
for _ in range(max_iterations):
try:
result = await mc.commands.get_msg(timeout=2.0)
if result.type == EventType.NO_MORE_MSGS:
break
elif result.type == EventType.ERROR:
logger.debug("Error during message drain: %s", result.payload)
break
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
count += 1
# Small delay between fetches
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
break
except Exception as e:
logger.debug("Error draining messages: %s", e)
break
return count
async def poll_for_messages() -> int:
"""
Poll the radio for any pending messages (single pass).
This is a fallback for platforms where MESSAGES_WAITING push events
don't work reliably.
Returns the count of messages retrieved.
"""
if not radio_manager.is_connected or radio_manager.meshcore is None:
return 0
mc = radio_manager.meshcore
count = 0
try:
# Try to get one message
result = await mc.commands.get_msg(timeout=2.0)
if result.type == EventType.NO_MORE_MSGS:
# No messages waiting
return 0
elif result.type == EventType.ERROR:
return 0
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
count += 1
# If we got a message, there might be more - drain them
count += await drain_pending_messages()
except asyncio.TimeoutError:
pass
except Exception as e:
logger.debug("Message poll exception: %s", e)
return count
async def _message_poll_loop():
"""Background task that periodically polls for messages."""
while True:
try:
await asyncio.sleep(MESSAGE_POLL_INTERVAL)
if radio_manager.is_connected and not is_polling_paused():
await poll_for_messages()
except asyncio.CancelledError:
break
except Exception as e:
logger.debug("Error in message poll loop: %s", e)
def start_message_polling():
"""Start the periodic message polling background task."""
global _message_poll_task
if _message_poll_task is None or _message_poll_task.done():
_message_poll_task = asyncio.create_task(_message_poll_loop())
logger.info("Started periodic message polling (interval: %ds)", MESSAGE_POLL_INTERVAL)
async def stop_message_polling():
"""Stop the periodic message polling background task."""
global _message_poll_task
if _message_poll_task and not _message_poll_task.done():
_message_poll_task.cancel()
try:
await _message_poll_task
except asyncio.CancelledError:
pass
_message_poll_task = None
logger.info("Stopped periodic message polling")
async def sync_radio_time() -> bool:
"""Sync the radio's clock with the system time.
Returns True if successful, False otherwise.
"""
mc = radio_manager.meshcore
if not mc:
logger.debug("Cannot sync time: radio not connected")
return False
try:
now = int(time.time())
await mc.commands.set_time(now)
logger.debug("Synced radio time to %d", now)
return True
except Exception as e:
logger.warning("Failed to sync radio time: %s", e)
return False
async def _periodic_sync_loop():
"""Background task that periodically syncs and offloads."""
while True:
try:
await asyncio.sleep(SYNC_INTERVAL)
logger.debug("Running periodic radio sync")
await sync_and_offload_all()
await sync_radio_time()
except asyncio.CancelledError:
logger.info("Periodic sync task cancelled")
break
except Exception as e:
logger.error("Error in periodic sync: %s", e)
def start_periodic_sync():
"""Start the periodic sync background task."""
global _sync_task
if _sync_task is None or _sync_task.done():
_sync_task = asyncio.create_task(_periodic_sync_loop())
logger.info("Started periodic radio sync (interval: %ds)", SYNC_INTERVAL)
async def stop_periodic_sync():
"""Stop the periodic sync background task."""
global _sync_task
if _sync_task and not _sync_task.done():
_sync_task.cancel()
try:
await _sync_task
except asyncio.CancelledError:
pass
_sync_task = None
logger.info("Stopped periodic radio sync")
# Throttling for contact sync to radio
_last_contact_sync: float = 0.0
CONTACT_SYNC_THROTTLE_SECONDS = 30 # Don't sync more than once per 30 seconds
async def sync_recent_contacts_to_radio(force: bool = False) -> dict:
"""
Load recent non-repeater contacts to the radio for DM ACK support.
This ensures the radio can auto-ACK incoming DMs from recent contacts.
Only runs at most once every CONTACT_SYNC_THROTTLE_SECONDS unless forced.
Returns counts of contacts loaded.
"""
global _last_contact_sync
# Throttle unless forced
now = time.time()
if not force and (now - _last_contact_sync) < CONTACT_SYNC_THROTTLE_SECONDS:
logger.debug("Contact sync throttled (last sync %ds ago)", int(now - _last_contact_sync))
return {"loaded": 0, "throttled": True}
if not radio_manager.is_connected or radio_manager.meshcore is None:
logger.debug("Cannot sync contacts to radio: not connected")
return {"loaded": 0, "error": "Radio not connected"}
mc = radio_manager.meshcore
_last_contact_sync = now
try:
# Get recent non-repeater contacts from database
max_contacts = settings.max_radio_contacts
contacts = await ContactRepository.get_recent_non_repeaters(limit=max_contacts)
logger.debug("Found %d recent non-repeater contacts to sync", len(contacts))
loaded = 0
already_on_radio = 0
failed = 0
for contact in contacts:
# Check if already on radio
radio_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
if radio_contact:
already_on_radio += 1
# Update DB if not marked as on_radio
if not contact.on_radio:
await ContactRepository.set_on_radio(contact.public_key, True)
continue
try:
result = await mc.commands.add_contact(contact.to_radio_dict())
if result.type == EventType.OK:
loaded += 1
await ContactRepository.set_on_radio(contact.public_key, True)
logger.debug("Loaded contact %s to radio", contact.public_key[:12])
else:
failed += 1
logger.warning(
"Failed to load contact %s: %s", contact.public_key[:12], result.payload
)
except Exception as e:
failed += 1
logger.warning("Error loading contact %s: %s", contact.public_key[:12], e)
if loaded > 0 or failed > 0:
logger.info(
"Contact sync: loaded %d, already on radio %d, failed %d",
loaded,
already_on_radio,
failed,
)
return {
"loaded": loaded,
"already_on_radio": already_on_radio,
"failed": failed,
}
except Exception as e:
logger.error("Error syncing contacts to radio: %s", e)
return {"loaded": 0, "error": str(e)}