feat: implement threaded stats broadcasting in StorageCollector

- Introduced a dedicated thread for broadcasting aggregate statistics to WebSocket clients, improving performance by offloading heavy queries from the packet write path.
- Updated the _publish_packet_sync method to ensure only fast, per-packet operations are executed, while the stats are handled separately.
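- Added `_broadcast_stats_once` and `_stats_broadcast_loop` to broadcast stats on a fixed interval (`_ws_stats_broadcast_interval_sec`, 5 s by default) while a client is connected, and made `close()` signal and join the stats thread.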
- Updated unit tests: the per-packet publish path is now asserted to broadcast only the packet event, and new tests cover `_broadcast_stats_once` and the loop's behavior with and without connected clients.
This commit is contained in:
agessaman
2026-06-18 21:38:20 -07:00
parent 2435757197
commit cd763e2cac
2 changed files with 104 additions and 52 deletions
+54 -21
View File
@@ -1,6 +1,7 @@
import asyncio
import concurrent.futures
import logging
import threading
import time
from typing import Optional
@@ -67,8 +68,9 @@ class StorageCollector:
# Initialize WebSocket handler for real-time updates
self.websocket_available = False
self.websocket_has_connected_clients = lambda: False
self._last_ws_stats_broadcast: float = 0.0
self._ws_stats_broadcast_interval_sec: float = 5.0
self._stats_stop_event = threading.Event()
self._stats_thread: Optional[threading.Thread] = None
try:
from .websocket_handler import (
broadcast_packet,
@@ -81,6 +83,19 @@ class StorageCollector:
self.websocket_has_connected_clients = has_connected_clients
self.websocket_available = True
logger.info("WebSocket handler initialized for real-time updates")
# Broadcast aggregate stats on a fixed cadence rather than inline on the
# per-packet write path. get_packet_stats(24h) is a multi-second aggregate;
# running it inside _record_packet_blocking made the storage writer thread
# spend ~1-2s of every 5s on it, competing with packet inserts. A dedicated
# tick keeps the writer doing only fast writes and only runs the aggregate
# when a dashboard client is actually connected.
self._stats_thread = threading.Thread(
target=self._stats_broadcast_loop,
name="stats-broadcast",
daemon=True,
)
self._stats_thread.start()
except ImportError:
logger.debug("WebSocket handler not available")
@@ -200,36 +215,49 @@ class StorageCollector:
self._publish_packet_sync(packet_record, skip_mqtt)
def _publish_packet_sync(self, packet_record: dict, skip_mqtt: bool):
"""Publish packet updates synchronously (used when no asyncio loop is active)."""
"""Publish a single packet (glass, per-packet WebSocket event, MQTT).
Only fast, per-packet work runs here. The aggregate stats broadcast is
driven separately by _stats_broadcast_loop so the writer thread is not
held by the multi-second get_packet_stats(24h) query.
"""
self._publish_to_glass(packet_record, "packet")
if self.websocket_available:
try:
self.websocket_broadcast_packet(packet_record)
if self.websocket_has_connected_clients():
now_mono = time.monotonic()
if (
now_mono - self._last_ws_stats_broadcast
>= self._ws_stats_broadcast_interval_sec
):
self._last_ws_stats_broadcast = now_mono
packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24)
uptime_seconds = (
time.time() - self.repeater_handler.start_time
if self.repeater_handler
else 0
)
self.websocket_broadcast_stats(
{
"packet_stats": packet_stats_24h,
"system_stats": {"uptime_seconds": uptime_seconds},
}
)
except Exception as e:
logger.debug(f"WebSocket broadcast failed: {e}")
self._publish_packet_to_mqtt(packet_record)
def _broadcast_stats_once(self) -> None:
    """Push one aggregate-stats snapshot to WebSocket clients.

    Queries the SQLite handler for the 24-hour packet statistics, derives the
    uptime from ``repeater_handler.start_time`` (0 when no repeater handler is
    set), and passes both to ``websocket_broadcast_stats`` as one payload.
    """
    # Run the aggregate query first; the uptime is sampled afterwards.
    stats_24h = self.sqlite_handler.get_packet_stats(hours=24)

    handler = self.repeater_handler
    if handler:
        uptime = time.time() - handler.start_time
    else:
        uptime = 0

    payload = {
        "packet_stats": stats_24h,
        "system_stats": {"uptime_seconds": uptime},
    }
    self.websocket_broadcast_stats(payload)
def _stats_broadcast_loop(self) -> None:
    """Thread target: periodically push aggregate stats to WebSocket clients.

    Each pass waits on the stop event for ``_ws_stats_broadcast_interval_sec``
    seconds. A wait that returns true ends the loop. Otherwise one snapshot is
    broadcast, but only when ``websocket_has_connected_clients()`` reports a
    client, so the get_packet_stats(24h) aggregate is skipped entirely while
    nobody is connected. An exception raised during a pass is logged at debug
    level and the loop carries on with the next wait.
    """
    while True:
        # The wait doubles as the pacing sleep and the shutdown check.
        if self._stats_stop_event.wait(self._ws_stats_broadcast_interval_sec):
            return
        try:
            if not self.websocket_has_connected_clients():
                continue
            self._broadcast_stats_once()
        except Exception as e:
            logger.debug(f"Stats broadcast failed: {e}")
def _publish_packet_to_mqtt(self, packet_record: dict):
"""Publish packet to mqtt broker if enabled and allowed.
@@ -443,6 +471,11 @@ class StorageCollector:
return self.sqlite_handler.get_noise_floor_stats(hours)
def close(self):
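# Stop the stats broadcast thread. The join below is bounded at 2s, so a stats
# query that is still in flight may outlive it.
# NOTE(review): confirm the teardown that follows tolerates that.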
self._stats_stop_event.set()
if self._stats_thread is not None:
self._stats_thread.join(timeout=2)
# Drain and stop the storage writer thread first so pending writes and
# publishes complete before MQTT and the DB connections are torn down.
self._db_executor.shutdown(wait=True)
@@ -1,4 +1,5 @@
import sys
import threading
import types
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
@@ -30,6 +31,14 @@ def _make_collector() -> StorageCollector:
):
collector = StorageCollector(config={"storage": {"storage_dir": "/tmp/pymc_repeater_test"}})
# Stop any real stats-broadcast thread started during construction so the tests
# drive the loop deterministically.
collector._stats_stop_event.set()
if collector._stats_thread is not None:
collector._stats_thread.join(timeout=1)
collector._stats_stop_event = threading.Event()
collector._stats_thread = None
collector.sqlite_handler = MagicMock()
collector.sqlite_handler.get_packet_stats.return_value = {"total_packets": 1}
collector.websocket_available = True
@@ -40,40 +49,50 @@ def _make_collector() -> StorageCollector:
return collector
def test_publish_packet_sync_first_call_broadcasts_stats_immediately():
def test_publish_packet_sync_broadcasts_packet_event_not_stats():
# The per-packet path must stay fast: it broadcasts the packet event but never
# runs the heavy aggregate or the stats broadcast (those moved to the loop).
collector = _make_collector()
with patch("repeater.data_acquisition.storage_collector.time.monotonic", return_value=1000.0):
collector._publish_packet_sync({"type": 1, "transmitted": True}, skip_mqtt=False)
collector._publish_packet_sync({"type": 1, "transmitted": True}, skip_mqtt=False)
assert collector.sqlite_handler.get_packet_stats.call_count == 1
assert collector.websocket_broadcast_stats.call_count == 1
assert collector.websocket_broadcast_packet.call_count == 1
def test_publish_packet_sync_throttles_stats_to_interval():
collector = _make_collector()
call_times = [1000.0 + (i * 0.1) for i in range(10)] + [1005.1]
with patch(
"repeater.data_acquisition.storage_collector.time.monotonic",
side_effect=call_times,
):
for _ in call_times:
collector._publish_packet_sync({"type": 1, "transmitted": True}, skip_mqtt=False)
assert collector.sqlite_handler.get_packet_stats.call_count == 2
assert collector.websocket_broadcast_stats.call_count == 2
assert collector.websocket_broadcast_packet.call_count == len(call_times)
def test_publish_packet_sync_always_broadcasts_packet_event_even_without_clients():
collector = _make_collector()
collector.websocket_has_connected_clients.return_value = False
for _ in range(5):
collector._publish_packet_sync({"type": 1, "transmitted": True}, skip_mqtt=False)
assert collector.websocket_broadcast_packet.call_count == 5
assert collector.sqlite_handler.get_packet_stats.call_count == 0
assert collector.websocket_broadcast_stats.call_count == 0
def test_broadcast_stats_once_queries_and_broadcasts():
    """One stats broadcast runs the 24h query and sends a single payload."""
    collector = _make_collector()

    collector._broadcast_stats_once()

    collector.sqlite_handler.get_packet_stats.assert_called_once_with(hours=24)
    broadcast = collector.websocket_broadcast_stats
    assert broadcast.call_count == 1
    # The payload is the first positional argument of the only call.
    sent = broadcast.call_args[0][0]
    assert sent["packet_stats"] == {"total_packets": 1}
    assert "uptime_seconds" in sent["system_stats"]
def test_stats_loop_broadcasts_when_clients_connected():
    """With a client connected, one loop pass triggers one stats broadcast."""
    collector = _make_collector()
    collector._broadcast_stats_once = MagicMock()
    collector.websocket_has_connected_clients = MagicMock(return_value=True)

    # Scripted stop event: the first wait() lets the loop body run, the second ends it.
    stop_event = MagicMock()
    stop_event.wait.side_effect = [False, True]
    collector._stats_stop_event = stop_event

    collector._stats_broadcast_loop()

    collector._broadcast_stats_once.assert_called_once()
def test_stats_loop_skips_when_no_clients():
    """With no client connected, a loop pass performs no stats broadcast."""
    collector = _make_collector()
    collector._broadcast_stats_once = MagicMock()
    collector.websocket_has_connected_clients = MagicMock(return_value=False)

    # Scripted stop event: exactly one pass through the loop body, then exit.
    stop_event = MagicMock()
    stop_event.wait.side_effect = [False, True]
    collector._stats_stop_event = stop_event

    collector._stats_broadcast_loop()

    collector._broadcast_stats_once.assert_not_called()