feat: implement background scheduling for deferred network publishing tasks, tidy shutdown process

This commit is contained in:
Lloyd
2026-04-21 10:07:15 +01:00
parent 3df4b03fd9
commit 9797e08421
2 changed files with 105 additions and 54 deletions

View File

@@ -103,6 +103,18 @@ class StorageCollector:
task.add_done_callback(on_done)
def _schedule_background(self, coro_factory, *args, sync_fallback=None):
"""Schedule a coroutine if a loop exists; otherwise run sync fallback."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
if sync_fallback is not None:
sync_fallback(*args)
return
task = loop.create_task(coro_factory(*args))
self._track_task(task)
def _get_live_stats(self) -> dict:
"""Get live stats from RepeaterHandler"""
if not self.repeater_handler:
@@ -167,49 +179,48 @@ class StorageCollector:
# DEFERRED: Publish to network sinks and WebSocket in background tasks
# This prevents network latency from blocking packet processing
task = asyncio.create_task(
self._deferred_publish(
packet_record, skip_letsmesh_if_invalid, cumulative_counts
)
self._schedule_background(
self._deferred_publish,
packet_record,
skip_letsmesh_if_invalid,
sync_fallback=self._publish_packet_sync,
)
self._track_task(task)
async def _deferred_publish(self, packet_record: dict, skip_letsmesh: bool, cumulative_counts: dict):
async def _deferred_publish(self, packet_record: dict, skip_letsmesh: bool):
"""Deferred background task for all network publishing operations."""
try:
# Publish to local MQTT
self.mqtt_handler.publish(packet_record, "packet")
self._publish_to_glass(packet_record, "packet")
# Broadcast to WebSocket clients with stats
if self.websocket_available:
try:
self.websocket_broadcast_packet(packet_record)
packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24)
uptime_seconds = (
time.time() - self.repeater_handler.start_time
if self.repeater_handler
else 0
)
self.websocket_broadcast_stats(
{
"packet_stats": packet_stats_24h,
"system_stats": {"uptime_seconds": uptime_seconds},
}
)
except Exception as e:
logger.debug(f"WebSocket broadcast failed: {e}")
# Publish to LetsMesh if enabled
if skip_letsmesh and packet_record.get("drop_reason"):
logger.debug(
f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}"
)
else:
self._publish_to_letsmesh(packet_record)
self._publish_packet_sync(packet_record, skip_letsmesh)
except Exception as e:
logger.error(f"Deferred publish failed: {e}", exc_info=True)
def _publish_packet_sync(self, packet_record: dict, skip_letsmesh: bool):
"""Publish packet updates synchronously (used when no asyncio loop is active)."""
self.mqtt_handler.publish(packet_record, "packet")
self._publish_to_glass(packet_record, "packet")
if self.websocket_available:
try:
self.websocket_broadcast_packet(packet_record)
packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24)
uptime_seconds = (
time.time() - self.repeater_handler.start_time if self.repeater_handler else 0
)
self.websocket_broadcast_stats(
{
"packet_stats": packet_stats_24h,
"system_stats": {"uptime_seconds": uptime_seconds},
}
)
except Exception as e:
logger.debug(f"WebSocket broadcast failed: {e}")
if skip_letsmesh and packet_record.get("drop_reason"):
logger.debug(
f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}"
)
else:
self._publish_to_letsmesh(packet_record)
def _publish_to_letsmesh(self, packet_record: dict):
"""Publish packet to LetsMesh broker if enabled and allowed"""
if not self.letsmesh_handler:
@@ -242,56 +253,65 @@ class StorageCollector:
def record_advert(self, advert_record: dict):
"""Record advert to storage and defer network publishing to background tasks."""
self.sqlite_handler.store_advert(advert_record)
# Defer MQTT and Glass publishing to background task
task = asyncio.create_task(
self._deferred_publish_advert(advert_record)
self._schedule_background(
self._deferred_publish_advert,
advert_record,
sync_fallback=self._publish_advert_sync,
)
self._track_task(task)
async def _deferred_publish_advert(self, advert_record: dict):
"""Deferred background task for advert publishing."""
try:
self.mqtt_handler.publish(advert_record, "advert")
self._publish_to_glass(advert_record, "advert")
self._publish_advert_sync(advert_record)
except Exception as e:
logger.error(f"Deferred advert publish failed: {e}", exc_info=True)
def _publish_advert_sync(self, advert_record: dict):
self.mqtt_handler.publish(advert_record, "advert")
self._publish_to_glass(advert_record, "advert")
def record_noise_floor(self, noise_floor_dbm: float):
"""Record noise floor to storage and defer network publishing to background tasks."""
noise_record = {"timestamp": time.time(), "noise_floor_dbm": noise_floor_dbm}
self.sqlite_handler.store_noise_floor(noise_record)
# Defer MQTT and Glass publishing to background task
task = asyncio.create_task(
self._deferred_publish_noise_floor(noise_record)
self._schedule_background(
self._deferred_publish_noise_floor,
noise_record,
sync_fallback=self._publish_noise_floor_sync,
)
self._track_task(task)
async def _deferred_publish_noise_floor(self, noise_record: dict):
"""Deferred background task for noise floor publishing."""
try:
self.mqtt_handler.publish(noise_record, "noise_floor")
self._publish_to_glass(noise_record, "noise_floor")
self._publish_noise_floor_sync(noise_record)
except Exception as e:
logger.error(f"Deferred noise floor publish failed: {e}", exc_info=True)
def _publish_noise_floor_sync(self, noise_record: dict):
self.mqtt_handler.publish(noise_record, "noise_floor")
self._publish_to_glass(noise_record, "noise_floor")
def record_crc_errors(self, count: int):
"""Record a batch of CRC errors detected since last poll and defer publishing."""
crc_record = {"timestamp": time.time(), "count": count}
self.sqlite_handler.store_crc_errors(crc_record)
# Defer MQTT and Glass publishing to background task
task = asyncio.create_task(
self._deferred_publish_crc_errors(crc_record)
self._schedule_background(
self._deferred_publish_crc_errors,
crc_record,
sync_fallback=self._publish_crc_errors_sync,
)
self._track_task(task)
async def _deferred_publish_crc_errors(self, crc_record: dict):
"""Deferred background task for CRC error publishing."""
try:
self.mqtt_handler.publish(crc_record, "crc_errors")
self._publish_to_glass(crc_record, "crc_errors")
self._publish_crc_errors_sync(crc_record)
except Exception as e:
logger.error(f"Deferred CRC errors publish failed: {e}", exc_info=True)
def _publish_crc_errors_sync(self, crc_record: dict):
self.mqtt_handler.publish(crc_record, "crc_errors")
self._publish_to_glass(crc_record, "crc_errors")
def get_crc_error_count(self, hours: int = 24) -> int:
return self.sqlite_handler.get_crc_error_count(hours)

View File

@@ -53,6 +53,7 @@ class RepeaterDaemon:
self.router = None
self.companion_bridges: dict[int, object] = {}
self.companion_frame_servers: list = []
self._shutdown_started = False
log_level = config.get("logging", {}).get("level", "INFO")
logging.basicConfig(
@@ -1020,11 +1021,34 @@ class RepeaterDaemon:
def _signal_shutdown(self, sig, loop):
"""Handle SIGTERM/SIGINT by scheduling async shutdown."""
if self._shutdown_started:
logger.info(f"Received signal {sig.name}, shutdown already in progress")
return
logger.info(f"Received signal {sig.name}, shutting down...")
loop.create_task(self._shutdown())
async def _shutdown(self):
"""Best-effort shutdown: stop background services and release hardware."""
if self._shutdown_started:
return
self._shutdown_started = True
# Stop companion frame servers first to close client sockets and child workers.
for frame_server in getattr(self, "companion_frame_servers", []):
try:
await frame_server.stop()
except Exception as e:
logger.warning(f"Companion frame server stop error: {e}")
# Stop companion bridges to flush/persist state.
if hasattr(self, "companion_bridges"):
for bridge in self.companion_bridges.values():
if hasattr(bridge, "stop"):
try:
await bridge.stop()
except Exception as e:
logger.warning(f"Companion bridge stop error: {e}")
# Stop router
if self.router:
try:
@@ -1046,6 +1070,13 @@ class RepeaterDaemon:
except Exception as e:
logger.warning(f"Error stopping Glass handler: {e}")
# Close storage publishers (MQTT/LetsMesh) to stop their worker threads.
try:
if self.repeater_handler and self.repeater_handler.storage:
self.repeater_handler.storage.close()
except Exception as e:
logger.warning(f"Error closing storage: {e}")
# Release radio resources
if self.radio and hasattr(self.radio, "cleanup"):
try: