Files
Remote-Terminal-for-MeshCore/app/services/radio_stats.py
2026-04-10 14:31:45 -07:00

196 lines
6.9 KiB
Python

"""In-memory local-radio stats sampling.

A single 60s loop fetches core, radio, and packet stats from the connected
radio in one radio-lock acquisition. The noise-floor 24h history deque is
maintained as a side effect.

After each sample the loop:

1. Broadcasts a WS ``health`` frame so frontend dashboards refresh.
2. Dispatches a ``broadcast_health_fanout`` event carrying the full stats
   snapshot plus radio identity, so fanout modules (e.g. HA MQTT) can
   publish sensor state without a second radio poll.

Consumers:

- GET /api/health → get_latest_radio_stats() (battery, uptime, etc.)
- GET /api/statistics → get_noise_floor_history() (24h noise-floor chart)
- Fanout on_health → _build_fanout_payload() (identity + stats)
"""
import asyncio
import logging
import time
from collections import deque
from typing import Any
from meshcore import EventType
from app.radio import RadioDisconnectedError, RadioOperationBusyError
from app.services.radio_runtime import radio_runtime as radio_manager
logger = logging.getLogger(__name__)

# Seconds the sampling loop sleeps between samples; also reported to clients
# as ``sample_interval_seconds`` in the noise-floor history response.
STATS_SAMPLE_INTERVAL_SECONDS = 60
# Maximum age (seconds) of a noise-floor sample returned by
# get_noise_floor_history().
NOISE_FLOOR_WINDOW_SECONDS = 24 * 60 * 60
# Hard cap on retained samples (the deque's maxlen); slightly above one day's
# worth so the time-window filter, not the cap, normally decides what is shown.
MAX_NOISE_FLOOR_SAMPLES = 1500 # 24h at 60s intervals = 1440

# Handle of the background sampling task, or None when it has not been
# started / has been stopped.
_stats_task: asyncio.Task | None = None
# (unix timestamp, noise floor) pairs appended by _sample_all_stats().
_noise_floor_samples: deque[tuple[int, int]] = deque(maxlen=MAX_NOISE_FLOOR_SAMPLES)
# Most recent non-empty stats snapshot; reset to {} by the sampling loop when
# a sample comes back empty while the radio is disconnected.
_latest_stats: dict[str, Any] = {}
async def _sample_all_stats() -> dict[str, Any]:
    """Poll the radio once for core, radio, and packet statistics.

    The three stats commands are issued inside a single non-blocking
    ``radio_operation`` context. When the radio reply carries an integer
    ``noise_floor``, the reading is also appended to the module-level
    noise-floor history as a side effect.

    Returns:
        A dict containing ``timestamp`` plus the merged core/radio payloads
        and a nested ``packets`` payload, for whichever replies had the
        expected event type. An empty dict is returned when the radio is not
        connected, the operation raised, or no reply contributed any data.
    """
    if not radio_manager.is_connected:
        return {}

    try:
        async with radio_manager.radio_operation("radio_stats_sample", blocking=False) as mc:
            core_event = await mc.commands.get_stats_core()
            radio_event = await mc.commands.get_stats_radio()
            packet_event = await mc.commands.get_stats_packets()
    except (RadioDisconnectedError, RadioOperationBusyError):
        # Treated as a routine miss: skip this sample without logging.
        return {}
    except Exception as exc:
        logger.debug("Radio stats sampling failed: %s", exc)
        return {}

    sampled_at = int(time.time())
    result: dict[str, Any] = {"timestamp": sampled_at}

    # Core and radio payloads are flattened into the top level of the result.
    if getattr(core_event, "type", None) == EventType.STATS_CORE:
        result.update(core_event.payload)

    if getattr(radio_event, "type", None) == EventType.STATS_RADIO:
        radio_payload = radio_event.payload
        result.update(radio_payload)
        floor = radio_payload.get("noise_floor")
        if isinstance(floor, int):
            _noise_floor_samples.append((sampled_at, floor))

    # Packet counters stay nested under their own key.
    if getattr(packet_event, "type", None) == EventType.STATS_PACKETS:
        result["packets"] = packet_event.payload

    # A result holding nothing but the timestamp means no command produced data.
    if len(result) <= 1:
        return {}
    return result
def _build_fanout_payload(stats: dict[str, Any]) -> dict:
    """Assemble the health payload handed to fanout modules.

    The payload always carries the connection state, connection info, and the
    radio's identity (``public_key`` and ``name``, each ``None`` when missing
    or empty). When ``stats`` is non-empty, selected readings from it are
    added under fanout-facing key names so consumers do not need to poll the
    radio themselves.

    Args:
        stats: A stats snapshot as produced by the sampler; may be empty.

    Returns:
        The fanout payload dict.
    """
    meshcore = radio_manager.meshcore
    identity = meshcore.self_info if meshcore else None

    def _identity_value(key: str) -> Any:
        # Missing identity info and empty values both collapse to None.
        if not identity:
            return None
        return identity.get(key) or None

    payload: dict = {
        "connected": radio_manager.is_connected,
        "connection_info": radio_manager.connection_info,
        "public_key": _identity_value("public_key"),
        "name": _identity_value("name"),
    }

    if not stats:
        return payload

    # (payload key, key in the stats snapshot)
    top_level_fields = (
        ("noise_floor_dbm", "noise_floor"),
        ("battery_mv", "battery_mv"),
        ("uptime_secs", "uptime_secs"),
        ("last_rssi", "last_rssi"),
        ("last_snr", "last_snr"),
        ("tx_air_secs", "tx_air_secs"),
        ("rx_air_secs", "rx_air_secs"),
    )
    for payload_key, stats_key in top_level_fields:
        payload[payload_key] = stats.get(stats_key)

    # (payload key, key in the nested packets dict)
    packet_fields = (
        ("packets_recv", "recv"),
        ("packets_sent", "sent"),
        ("flood_tx", "flood_tx"),
        ("direct_tx", "direct_tx"),
        ("flood_rx", "flood_rx"),
        ("direct_rx", "direct_rx"),
    )
    packet_counts = stats.get("packets") or {}
    for payload_key, packet_key in packet_fields:
        payload[payload_key] = packet_counts.get(packet_key)

    return payload
async def _stats_sampling_loop() -> None:
    """Sample radio stats forever, publishing the result after every pass.

    Each iteration takes a sample, updates the cached latest snapshot,
    broadcasts a WebSocket health frame, and dispatches the fanout payload
    built from that iteration's sample. Any non-cancellation error is logged
    and the loop carries on after sleeping one sample interval. Cancellation
    propagates out so the task can be stopped.
    """
    global _latest_stats
    while True:
        try:
            sample = await _sample_all_stats()
            if not sample and not radio_manager.is_connected:
                # Nothing sampled and the radio is gone: drop the stale cache.
                _latest_stats = {}
            elif sample:
                _latest_stats = sample
            # An empty sample while still connected keeps the previous cache.

            # Imported here rather than at module top, as in the original.
            from app.websocket import broadcast_health

            broadcast_health(radio_manager.is_connected, radio_manager.connection_info)

            # Hand the enriched health snapshot to the fanout modules.
            from app.fanout.manager import fanout_manager

            fanout_payload = _build_fanout_payload(sample)
            await fanout_manager.broadcast_health_fanout(fanout_payload)
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception("Radio stats sampling loop error")

        # Cancellation during the sleep propagates on its own.
        await asyncio.sleep(STATS_SAMPLE_INTERVAL_SECONDS)
# ── Public API ────────────────────────────────────────────────────────────
async def start_radio_stats_sampling() -> None:
    """Launch the background stats sampling task unless one is still running."""
    global _stats_task
    still_running = _stats_task is not None and not _stats_task.done()
    if not still_running:
        _stats_task = asyncio.create_task(_stats_sampling_loop())
async def stop_radio_stats_sampling() -> None:
    """Cancel the background stats sampling task and forget its handle.

    Does nothing when no task was ever started. A task that is still running
    is cancelled and awaited so it has fully unwound before this returns.
    """
    global _stats_task
    task = _stats_task
    if task is None:
        return

    if not task.done():
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of cancelling the loop.
            pass

    _stats_task = None
def get_noise_floor_history() -> dict:
    """Snapshot the in-memory noise-floor history for the retention window.

    Returns:
        A dict with the sampling interval, the span in seconds between now and
        the oldest retained sample (0 when there are none), the newest reading
        and its timestamp (both ``None`` when there are none), and the list of
        samples inside the window in stored order.
    """
    now = int(time.time())
    window_start = now - NOISE_FLOOR_WINDOW_SECONDS

    recent: list[dict] = []
    for sampled_at, floor_dbm in _noise_floor_samples:
        if sampled_at < window_start:
            continue
        recent.append({"timestamp": sampled_at, "noise_floor_dbm": floor_dbm})

    if recent:
        newest = recent[-1]
        latest_floor = newest["noise_floor_dbm"]
        latest_timestamp = newest["timestamp"]
        # Clamp at zero in case the oldest timestamp is ahead of "now".
        coverage = max(0, now - recent[0]["timestamp"])
    else:
        latest_floor = None
        latest_timestamp = None
        coverage = 0

    return {
        "sample_interval_seconds": STATS_SAMPLE_INTERVAL_SECONDS,
        "coverage_seconds": coverage,
        "latest_noise_floor_dbm": latest_floor,
        "latest_timestamp": latest_timestamp,
        "samples": recent,
    }
def get_latest_radio_stats() -> dict[str, Any]:
    """Return a shallow copy of the latest cached radio stats snapshot.

    Used by the health endpoint; the copy keeps callers from rebinding keys
    on the module-level cache.
    """
    return _latest_stats.copy()