diff --git a/app/main.py b/app/main.py index 25c87d9..cb70d50 100644 --- a/app/main.py +++ b/app/main.py @@ -12,7 +12,10 @@ from app.database import db from app.event_handlers import register_event_handlers from app.radio import radio_manager from app.radio_sync import ( + drain_pending_messages, + start_message_polling, start_periodic_sync, + stop_message_polling, stop_periodic_sync, sync_and_offload_all, ) @@ -49,6 +52,14 @@ async def lifespan(app: FastAPI): await radio_manager.meshcore.start_auto_message_fetching() logger.info("Auto message fetching started") + + # Drain any messages that were queued before we connected + drained = await drain_pending_messages() + if drained > 0: + logger.info("Drained %d pending message(s)", drained) + + # Start periodic message polling as fallback for unreliable push events + start_message_polling() except Exception as e: logger.warning("Failed to connect to radio on startup: %s", e) @@ -59,6 +70,7 @@ async def lifespan(app: FastAPI): logger.info("Shutting down") await radio_manager.stop_connection_monitor() + stop_message_polling() stop_periodic_sync() if radio_manager.meshcore: await radio_manager.meshcore.stop_auto_message_fetching() diff --git a/app/radio_sync.py b/app/radio_sync.py index ec87d73..a4965d5 100644 --- a/app/radio_sync.py +++ b/app/radio_sync.py @@ -5,6 +5,8 @@ This module handles syncing contacts and channels from the radio to the database then removing them from the radio to free up space for new discoveries. Also handles loading recent non-repeater contacts TO the radio for DM ACK support. +Also handles periodic message polling as a fallback for platforms where push events +don't work reliably. """ import asyncio @@ -20,6 +22,12 @@ from app.repository import ChannelRepository, ContactRepository logger = logging.getLogger(__name__) +# Message poll task handle +_message_poll_task: asyncio.Task | None = None + +# Message poll interval in seconds +MESSAGE_POLL_INTERVAL = 5 + # Background task handle _sync_task: asyncio.Task | None = None @@ -187,6 +195,111 @@ async def sync_and_offload_all() -> dict: } +async def drain_pending_messages() -> int: + """ + Drain all pending messages from the radio. + + Calls get_msg() repeatedly until NO_MORE_MSGS is received. + Returns the count of messages retrieved. + """ + if not radio_manager.is_connected or radio_manager.meshcore is None: + return 0 + + mc = radio_manager.meshcore + count = 0 + max_iterations = 100 # Safety limit + + for _ in range(max_iterations): + try: + result = await mc.commands.get_msg(timeout=2.0) + + if result.type == EventType.NO_MORE_MSGS: + break + elif result.type == EventType.ERROR: + logger.debug("Error during message drain: %s", result.payload) + break + elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV): + count += 1 + + # Small delay between fetches + await asyncio.sleep(0.1) + + except asyncio.TimeoutError: + break + except Exception as e: + logger.debug("Error draining messages: %s", e) + break + + return count + + +async def poll_for_messages() -> int: + """ + Poll the radio for any pending messages (single pass). + + This is a fallback for platforms where MESSAGES_WAITING push events + don't work reliably. + + Returns the count of messages retrieved. + """ + if not radio_manager.is_connected or radio_manager.meshcore is None: + return 0 + + mc = radio_manager.meshcore + count = 0 + + try: + # Try to get one message + result = await mc.commands.get_msg(timeout=2.0) + + if result.type == EventType.NO_MORE_MSGS: + # No messages waiting + return 0 + elif result.type == EventType.ERROR: + return 0 + elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV): + count += 1 + # If we got a message, there might be more - drain them + count += await drain_pending_messages() + + except asyncio.TimeoutError: + pass + except Exception as e: + logger.debug("Message poll exception: %s", e) + + return count + + +async def _message_poll_loop(): + """Background task that periodically polls for messages.""" + while True: + try: + await asyncio.sleep(MESSAGE_POLL_INTERVAL) + + if radio_manager.is_connected: + await poll_for_messages() + + except asyncio.CancelledError: + break + except Exception as e: + logger.debug("Error in message poll loop: %s", e) + + +def start_message_polling(): + """Start the periodic message polling background task.""" + global _message_poll_task + if _message_poll_task is None or _message_poll_task.done(): + _message_poll_task = asyncio.create_task(_message_poll_loop()) + logger.info("Started periodic message polling (interval: %ds)", MESSAGE_POLL_INTERVAL) + + +def stop_message_polling(): + """Stop the periodic message polling background task.""" + global _message_poll_task + if _message_poll_task and not _message_poll_task.done(): + _message_poll_task.cancel() + + async def _periodic_sync_loop(): """Background task that periodically syncs and offloads.""" while True: