diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index 32e7506..6d54b2c 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -1,6 +1,7 @@ import asyncio import concurrent.futures import logging +import threading import time from typing import Optional @@ -67,8 +68,9 @@ class StorageCollector: # Initialize WebSocket handler for real-time updates self.websocket_available = False self.websocket_has_connected_clients = lambda: False - self._last_ws_stats_broadcast: float = 0.0 self._ws_stats_broadcast_interval_sec: float = 5.0 + self._stats_stop_event = threading.Event() + self._stats_thread: Optional[threading.Thread] = None try: from .websocket_handler import ( broadcast_packet, @@ -81,6 +83,19 @@ class StorageCollector: self.websocket_has_connected_clients = has_connected_clients self.websocket_available = True logger.info("WebSocket handler initialized for real-time updates") + + # Broadcast aggregate stats on a fixed cadence rather than inline on the + # per-packet write path. get_packet_stats(24h) is a multi-second aggregate; + # running it inside _record_packet_blocking made the storage writer thread + # spend ~1-2s of every 5s on it, competing with packet inserts. A dedicated + # tick keeps the writer doing only fast writes and only runs the aggregate + # when a dashboard client is actually connected. + self._stats_thread = threading.Thread( + target=self._stats_broadcast_loop, + name="stats-broadcast", + daemon=True, + ) + self._stats_thread.start() except ImportError: logger.debug("WebSocket handler not available") @@ -200,36 +215,49 @@ class StorageCollector: self._publish_packet_sync(packet_record, skip_mqtt) def _publish_packet_sync(self, packet_record: dict, skip_mqtt: bool): - """Publish packet updates synchronously (used when no asyncio loop is active).""" + """Publish a single packet (glass, per-packet WebSocket event, MQTT). + + Only fast, per-packet work runs here. The aggregate stats broadcast is + driven separately by _stats_broadcast_loop so the writer thread is not + held by the multi-second get_packet_stats(24h) query. + """ self._publish_to_glass(packet_record, "packet") if self.websocket_available: try: self.websocket_broadcast_packet(packet_record) - if self.websocket_has_connected_clients(): - now_mono = time.monotonic() - if ( - now_mono - self._last_ws_stats_broadcast - >= self._ws_stats_broadcast_interval_sec - ): - self._last_ws_stats_broadcast = now_mono - packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24) - uptime_seconds = ( - time.time() - self.repeater_handler.start_time - if self.repeater_handler - else 0 - ) - self.websocket_broadcast_stats( - { - "packet_stats": packet_stats_24h, - "system_stats": {"uptime_seconds": uptime_seconds}, - } - ) except Exception as e: logger.debug(f"WebSocket broadcast failed: {e}") self._publish_packet_to_mqtt(packet_record) + def _broadcast_stats_once(self) -> None: + """Compute the 24h aggregate and broadcast it to WebSocket clients.""" + packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24) + uptime_seconds = ( + time.time() - self.repeater_handler.start_time if self.repeater_handler else 0 + ) + self.websocket_broadcast_stats( + { + "packet_stats": packet_stats_24h, + "system_stats": {"uptime_seconds": uptime_seconds}, + } + ) + + def _stats_broadcast_loop(self) -> None: + """Broadcast aggregate stats every interval while clients are connected. + + Runs on its own thread (off the event loop and off the storage writer) so + the heavy get_packet_stats(24h) aggregate never sits in the packet write + path. Skips the query entirely when no dashboard client is connected. + """ + while not self._stats_stop_event.wait(self._ws_stats_broadcast_interval_sec): + try: + if self.websocket_has_connected_clients(): + self._broadcast_stats_once() + except Exception as e: + logger.debug(f"Stats broadcast failed: {e}") + def _publish_packet_to_mqtt(self, packet_record: dict): """Publish packet to mqtt broker if enabled and allowed. @@ -443,6 +471,11 @@ class StorageCollector: return self.sqlite_handler.get_noise_floor_stats(hours) def close(self): + # Stop the stats broadcast thread. + self._stats_stop_event.set() + if self._stats_thread is not None: + self._stats_thread.join(timeout=2) + # Drain and stop the storage writer thread first so pending writes and # publishes complete before MQTT and the DB connections are torn down. self._db_executor.shutdown(wait=True) diff --git a/tests/test_storage_collector_ws_stats_throttle.py b/tests/test_storage_collector_ws_stats_throttle.py index fb3f7f0..1650904 100644 --- a/tests/test_storage_collector_ws_stats_throttle.py +++ b/tests/test_storage_collector_ws_stats_throttle.py @@ -1,4 +1,5 @@ import sys +import threading import types from types import SimpleNamespace from unittest.mock import MagicMock, patch @@ -30,6 +31,14 @@ def _make_collector() -> StorageCollector: ): collector = StorageCollector(config={"storage": {"storage_dir": "/tmp/pymc_repeater_test"}}) + # Stop any real stats-broadcast thread started during construction so the tests + # drive the loop deterministically. + collector._stats_stop_event.set() + if collector._stats_thread is not None: + collector._stats_thread.join(timeout=1) + collector._stats_stop_event = threading.Event() + collector._stats_thread = None + collector.sqlite_handler = MagicMock() collector.sqlite_handler.get_packet_stats.return_value = {"total_packets": 1} collector.websocket_available = True @@ -40,40 +49,50 @@ def _make_collector() -> StorageCollector: return collector -def test_publish_packet_sync_first_call_broadcasts_stats_immediately(): +def test_publish_packet_sync_broadcasts_packet_event_not_stats(): + # The per-packet path must stay fast: it broadcasts the packet event but never + # runs the heavy aggregate or the stats broadcast (those moved to the loop). collector = _make_collector() - with patch("repeater.data_acquisition.storage_collector.time.monotonic", return_value=1000.0): - collector._publish_packet_sync({"type": 1, "transmitted": True}, skip_mqtt=False) + collector._publish_packet_sync({"type": 1, "transmitted": True}, skip_mqtt=False) - assert collector.sqlite_handler.get_packet_stats.call_count == 1 - assert collector.websocket_broadcast_stats.call_count == 1 assert collector.websocket_broadcast_packet.call_count == 1 - - -def test_publish_packet_sync_throttles_stats_to_interval(): - collector = _make_collector() - call_times = [1000.0 + (i * 0.1) for i in range(10)] + [1005.1] - - with patch( - "repeater.data_acquisition.storage_collector.time.monotonic", - side_effect=call_times, - ): - for _ in call_times: - collector._publish_packet_sync({"type": 1, "transmitted": True}, skip_mqtt=False) - - assert collector.sqlite_handler.get_packet_stats.call_count == 2 - assert collector.websocket_broadcast_stats.call_count == 2 - assert collector.websocket_broadcast_packet.call_count == len(call_times) - - -def test_publish_packet_sync_always_broadcasts_packet_event_even_without_clients(): - collector = _make_collector() - collector.websocket_has_connected_clients.return_value = False - - for _ in range(5): - collector._publish_packet_sync({"type": 1, "transmitted": True}, skip_mqtt=False) - - assert collector.websocket_broadcast_packet.call_count == 5 assert collector.sqlite_handler.get_packet_stats.call_count == 0 assert collector.websocket_broadcast_stats.call_count == 0 + + +def test_broadcast_stats_once_queries_and_broadcasts(): + collector = _make_collector() + + collector._broadcast_stats_once() + + collector.sqlite_handler.get_packet_stats.assert_called_once_with(hours=24) + assert collector.websocket_broadcast_stats.call_count == 1 + payload = collector.websocket_broadcast_stats.call_args.args[0] + assert payload["packet_stats"] == {"total_packets": 1} + assert "uptime_seconds" in payload["system_stats"] + + +def test_stats_loop_broadcasts_when_clients_connected(): + collector = _make_collector() + collector._broadcast_stats_once = MagicMock() + collector.websocket_has_connected_clients = MagicMock(return_value=True) + # Drive exactly one iteration: wait() returns False (proceed) then True (exit). + collector._stats_stop_event = MagicMock() + collector._stats_stop_event.wait.side_effect = [False, True] + + collector._stats_broadcast_loop() + + assert collector._broadcast_stats_once.call_count == 1 + + +def test_stats_loop_skips_when_no_clients(): + collector = _make_collector() + collector._broadcast_stats_once = MagicMock() + collector.websocket_has_connected_clients = MagicMock(return_value=False) + collector._stats_stop_event = MagicMock() + collector._stats_stop_event.wait.side_effect = [False, True] + + collector._stats_broadcast_loop() + + assert collector._broadcast_stats_once.call_count == 0