From 9797e084213d82fe52fdd6e4fc20bfa05491a22a Mon Sep 17 00:00:00 2001 From: Lloyd Date: Tue, 21 Apr 2026 10:07:15 +0100 Subject: [PATCH] feat: implement background scheduling for deferred network publishing tasks, tidy shutdown process --- .../data_acquisition/storage_collector.py | 128 ++++++++++-------- repeater/main.py | 31 +++++ 2 files changed, 105 insertions(+), 54 deletions(-) diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index f6b1510..c980854 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -103,6 +103,18 @@ class StorageCollector: task.add_done_callback(on_done) + def _schedule_background(self, coro_factory, *args, sync_fallback=None): + """Schedule a coroutine if a loop exists; otherwise run sync fallback.""" + try: + loop = asyncio.get_running_loop() + except RuntimeError: + if sync_fallback is not None: + sync_fallback(*args) + return + + task = loop.create_task(coro_factory(*args)) + self._track_task(task) + def _get_live_stats(self) -> dict: """Get live stats from RepeaterHandler""" if not self.repeater_handler: @@ -167,49 +179,48 @@ class StorageCollector: # DEFERRED: Publish to network sinks and WebSocket in background tasks # This prevents network latency from blocking packet processing - task = asyncio.create_task( - self._deferred_publish( - packet_record, skip_letsmesh_if_invalid, cumulative_counts - ) + self._schedule_background( + self._deferred_publish, + packet_record, + skip_letsmesh_if_invalid, + sync_fallback=self._publish_packet_sync, ) - self._track_task(task) - async def _deferred_publish(self, packet_record: dict, skip_letsmesh: bool, cumulative_counts: dict): + async def _deferred_publish(self, packet_record: dict, skip_letsmesh: bool): """Deferred background task for all network publishing operations.""" try: - # Publish to local MQTT - self.mqtt_handler.publish(packet_record, "packet") - self._publish_to_glass(packet_record, "packet") - - # Broadcast to WebSocket clients with stats - if self.websocket_available: - try: - self.websocket_broadcast_packet(packet_record) - packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24) - uptime_seconds = ( - time.time() - self.repeater_handler.start_time - if self.repeater_handler - else 0 - ) - self.websocket_broadcast_stats( - { - "packet_stats": packet_stats_24h, - "system_stats": {"uptime_seconds": uptime_seconds}, - } - ) - except Exception as e: - logger.debug(f"WebSocket broadcast failed: {e}") - - # Publish to LetsMesh if enabled - if skip_letsmesh and packet_record.get("drop_reason"): - logger.debug( - f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}" - ) - else: - self._publish_to_letsmesh(packet_record) + self._publish_packet_sync(packet_record, skip_letsmesh) except Exception as e: logger.error(f"Deferred publish failed: {e}", exc_info=True) + def _publish_packet_sync(self, packet_record: dict, skip_letsmesh: bool): + """Publish packet updates synchronously (used when no asyncio loop is active).""" + self.mqtt_handler.publish(packet_record, "packet") + self._publish_to_glass(packet_record, "packet") + + if self.websocket_available: + try: + self.websocket_broadcast_packet(packet_record) + packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24) + uptime_seconds = ( + time.time() - self.repeater_handler.start_time if self.repeater_handler else 0 + ) + self.websocket_broadcast_stats( + { + "packet_stats": packet_stats_24h, + "system_stats": {"uptime_seconds": uptime_seconds}, + } + ) + except Exception as e: + logger.debug(f"WebSocket broadcast failed: {e}") + + if skip_letsmesh and packet_record.get("drop_reason"): + logger.debug( + f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}" + ) + else: + self._publish_to_letsmesh(packet_record) + def _publish_to_letsmesh(self, packet_record: dict): """Publish packet to LetsMesh broker if enabled and allowed""" if not self.letsmesh_handler: @@ -242,56 +253,65 @@ class StorageCollector: def record_advert(self, advert_record: dict): """Record advert to storage and defer network publishing to background tasks.""" self.sqlite_handler.store_advert(advert_record) - # Defer MQTT and Glass publishing to background task - task = asyncio.create_task( - self._deferred_publish_advert(advert_record) + self._schedule_background( + self._deferred_publish_advert, + advert_record, + sync_fallback=self._publish_advert_sync, ) - self._track_task(task) async def _deferred_publish_advert(self, advert_record: dict): """Deferred background task for advert publishing.""" try: - self.mqtt_handler.publish(advert_record, "advert") - self._publish_to_glass(advert_record, "advert") + self._publish_advert_sync(advert_record) except Exception as e: logger.error(f"Deferred advert publish failed: {e}", exc_info=True) + def _publish_advert_sync(self, advert_record: dict): + self.mqtt_handler.publish(advert_record, "advert") + self._publish_to_glass(advert_record, "advert") + def record_noise_floor(self, noise_floor_dbm: float): """Record noise floor to storage and defer network publishing to background tasks.""" noise_record = {"timestamp": time.time(), "noise_floor_dbm": noise_floor_dbm} self.sqlite_handler.store_noise_floor(noise_record) - # Defer MQTT and Glass publishing to background task - task = asyncio.create_task( - self._deferred_publish_noise_floor(noise_record) + self._schedule_background( + self._deferred_publish_noise_floor, + noise_record, + sync_fallback=self._publish_noise_floor_sync, ) - self._track_task(task) async def _deferred_publish_noise_floor(self, noise_record: dict): """Deferred background task for noise floor publishing.""" try: - self.mqtt_handler.publish(noise_record, "noise_floor") - self._publish_to_glass(noise_record, "noise_floor") + self._publish_noise_floor_sync(noise_record) except Exception as e: logger.error(f"Deferred noise floor publish failed: {e}", exc_info=True) + def _publish_noise_floor_sync(self, noise_record: dict): + self.mqtt_handler.publish(noise_record, "noise_floor") + self._publish_to_glass(noise_record, "noise_floor") + def record_crc_errors(self, count: int): """Record a batch of CRC errors detected since last poll and defer publishing.""" crc_record = {"timestamp": time.time(), "count": count} self.sqlite_handler.store_crc_errors(crc_record) - # Defer MQTT and Glass publishing to background task - task = asyncio.create_task( - self._deferred_publish_crc_errors(crc_record) + self._schedule_background( + self._deferred_publish_crc_errors, + crc_record, + sync_fallback=self._publish_crc_errors_sync, ) - self._track_task(task) async def _deferred_publish_crc_errors(self, crc_record: dict): """Deferred background task for CRC error publishing.""" try: - self.mqtt_handler.publish(crc_record, "crc_errors") - self._publish_to_glass(crc_record, "crc_errors") + self._publish_crc_errors_sync(crc_record) except Exception as e: logger.error(f"Deferred CRC errors publish failed: {e}", exc_info=True) + def _publish_crc_errors_sync(self, crc_record: dict): + self.mqtt_handler.publish(crc_record, "crc_errors") + self._publish_to_glass(crc_record, "crc_errors") + def get_crc_error_count(self, hours: int = 24) -> int: return self.sqlite_handler.get_crc_error_count(hours) diff --git a/repeater/main.py b/repeater/main.py index bb8b4c8..2309425 100644 --- a/repeater/main.py +++ b/repeater/main.py @@ -53,6 +53,7 @@ class RepeaterDaemon: self.router = None self.companion_bridges: dict[int, object] = {} self.companion_frame_servers: list = [] + self._shutdown_started = False log_level = config.get("logging", {}).get("level", "INFO") logging.basicConfig( @@ -1020,11 +1021,34 @@ class RepeaterDaemon: def _signal_shutdown(self, sig, loop): """Handle SIGTERM/SIGINT by scheduling async shutdown.""" + if self._shutdown_started: + logger.info(f"Received signal {sig.name}, shutdown already in progress") + return logger.info(f"Received signal {sig.name}, shutting down...") loop.create_task(self._shutdown()) async def _shutdown(self): """Best-effort shutdown: stop background services and release hardware.""" + if self._shutdown_started: + return + self._shutdown_started = True + + # Stop companion frame servers first to close client sockets and child workers. + for frame_server in getattr(self, "companion_frame_servers", []): + try: + await frame_server.stop() + except Exception as e: + logger.warning(f"Companion frame server stop error: {e}") + + # Stop companion bridges to flush/persist state. + if hasattr(self, "companion_bridges"): + for bridge in self.companion_bridges.values(): + if hasattr(bridge, "stop"): + try: + await bridge.stop() + except Exception as e: + logger.warning(f"Companion bridge stop error: {e}") + # Stop router if self.router: try: @@ -1046,6 +1070,13 @@ class RepeaterDaemon: except Exception as e: logger.warning(f"Error stopping Glass handler: {e}") + # Close storage publishers (MQTT/LetsMesh) to stop their worker threads. + try: + if self.repeater_handler and self.repeater_handler.storage: + self.repeater_handler.storage.close() + except Exception as e: + logger.warning(f"Error closing storage: {e}") + # Release radio resources if self.radio and hasattr(self.radio, "cleanup"): try: