Merge pull request #2 from jkingsman/troubleshoot-linux

Add background polling task for fallback if radio subscriptions are lagging
This commit is contained in:
Jack Kingsman
2026-01-10 16:02:47 -05:00
committed by GitHub
2 changed files with 125 additions and 0 deletions

View File

@@ -12,7 +12,10 @@ from app.database import db
from app.event_handlers import register_event_handlers
from app.radio import radio_manager
from app.radio_sync import (
drain_pending_messages,
start_message_polling,
start_periodic_sync,
stop_message_polling,
stop_periodic_sync,
sync_and_offload_all,
)
@@ -49,6 +52,14 @@ async def lifespan(app: FastAPI):
await radio_manager.meshcore.start_auto_message_fetching()
logger.info("Auto message fetching started")
# Drain any messages that were queued before we connected
drained = await drain_pending_messages()
if drained > 0:
logger.info("Drained %d pending message(s)", drained)
# Start periodic message polling as fallback for unreliable push events
start_message_polling()
except Exception as e:
logger.warning("Failed to connect to radio on startup: %s", e)
@@ -59,6 +70,7 @@ async def lifespan(app: FastAPI):
logger.info("Shutting down")
await radio_manager.stop_connection_monitor()
stop_message_polling()
stop_periodic_sync()
if radio_manager.meshcore:
await radio_manager.meshcore.stop_auto_message_fetching()

View File

@@ -5,6 +5,8 @@ This module handles syncing contacts and channels from the radio to the database
then removing them from the radio to free up space for new discoveries.
Also handles loading recent non-repeater contacts TO the radio for DM ACK support.
Also handles periodic message polling as a fallback for platforms where push events
don't work reliably.
"""
import asyncio
@@ -20,6 +22,12 @@ from app.repository import ChannelRepository, ContactRepository
logger = logging.getLogger(__name__)
# Message poll task handle
_message_poll_task: asyncio.Task | None = None
# Message poll interval in seconds
MESSAGE_POLL_INTERVAL = 5
# Background task handle
_sync_task: asyncio.Task | None = None
@@ -187,6 +195,111 @@ async def sync_and_offload_all() -> dict:
}
async def drain_pending_messages() -> int:
"""
Drain all pending messages from the radio.
Calls get_msg() repeatedly until NO_MORE_MSGS is received.
Returns the count of messages retrieved.
"""
if not radio_manager.is_connected or radio_manager.meshcore is None:
return 0
mc = radio_manager.meshcore
count = 0
max_iterations = 100 # Safety limit
for _ in range(max_iterations):
try:
result = await mc.commands.get_msg(timeout=2.0)
if result.type == EventType.NO_MORE_MSGS:
break
elif result.type == EventType.ERROR:
logger.debug("Error during message drain: %s", result.payload)
break
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
count += 1
# Small delay between fetches
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
break
except Exception as e:
logger.debug("Error draining messages: %s", e)
break
return count
async def poll_for_messages() -> int:
"""
Poll the radio for any pending messages (single pass).
This is a fallback for platforms where MESSAGES_WAITING push events
don't work reliably.
Returns the count of messages retrieved.
"""
if not radio_manager.is_connected or radio_manager.meshcore is None:
return 0
mc = radio_manager.meshcore
count = 0
try:
# Try to get one message
result = await mc.commands.get_msg(timeout=2.0)
if result.type == EventType.NO_MORE_MSGS:
# No messages waiting
return 0
elif result.type == EventType.ERROR:
return 0
elif result.type in (EventType.CONTACT_MSG_RECV, EventType.CHANNEL_MSG_RECV):
count += 1
# If we got a message, there might be more - drain them
count += await drain_pending_messages()
except asyncio.TimeoutError:
pass
except Exception as e:
logger.debug("Message poll exception: %s", e)
return count
async def _message_poll_loop():
"""Background task that periodically polls for messages."""
while True:
try:
await asyncio.sleep(MESSAGE_POLL_INTERVAL)
if radio_manager.is_connected:
await poll_for_messages()
except asyncio.CancelledError:
break
except Exception as e:
logger.debug("Error in message poll loop: %s", e)
def start_message_polling():
"""Start the periodic message polling background task."""
global _message_poll_task
if _message_poll_task is None or _message_poll_task.done():
_message_poll_task = asyncio.create_task(_message_poll_loop())
logger.info("Started periodic message polling (interval: %ds)", MESSAGE_POLL_INTERVAL)
def stop_message_polling():
"""Stop the periodic message polling background task."""
global _message_poll_task
if _message_poll_task and not _message_poll_task.done():
_message_poll_task.cancel()
async def _periodic_sync_loop():
"""Background task that periodically syncs and offloads."""
while True: