Pluck in HA radio stats into the WS fanout endpoint

This commit is contained in:
Jack Kingsman
2026-04-10 12:39:37 -07:00
parent 43c5e0f67d
commit 3bd756ee4e
13 changed files with 504 additions and 178 deletions

View File

@@ -76,8 +76,8 @@ from app.routers import (
ws,
)
from app.security import add_optional_basic_auth_middleware
from app.services.radio_noise_floor import start_noise_floor_sampling, stop_noise_floor_sampling
from app.services.radio_runtime import radio_runtime as radio_manager
from app.services.radio_stats import start_radio_stats_sampling, stop_radio_stats_sampling
from app.version_info import get_app_build_info
setup_logging()
@@ -108,7 +108,7 @@ async def lifespan(app: FastAPI):
from app.radio_sync import ensure_default_channels
await ensure_default_channels()
await start_noise_floor_sampling()
await start_radio_stats_sampling()
# Always start connection monitor (even if initial connection failed)
await radio_manager.start_connection_monitor()
@@ -137,7 +137,7 @@ async def lifespan(app: FastAPI):
await radio_manager.stop_connection_monitor()
await stop_background_contact_reconciliation()
await stop_message_polling()
await stop_noise_floor_sampling()
await stop_radio_stats_sampling()
await stop_periodic_advert()
await stop_periodic_sync()
await stop_telemetry_collect()

View File

@@ -877,10 +877,6 @@ class NoiseFloorHistoryStats(BaseModel):
latest_timestamp: int | None = Field(
default=None, description="Unix timestamp of the most recent sample"
)
supported: bool | None = Field(
default=None,
description="Whether the connected radio appears to support radio stats sampling",
)
samples: list[NoiseFloorSample] = Field(default_factory=list)

View File

@@ -7,6 +7,7 @@ from pydantic import BaseModel, Field
from app.config import settings
from app.repository import RawPacketRepository
from app.services.radio_runtime import radio_runtime as radio_manager
from app.services.radio_stats import get_latest_radio_stats
from app.version_info import get_app_build_info
router = APIRouter(tags=["health"])
@@ -32,6 +33,28 @@ class FanoutStatusResponse(BaseModel):
last_error: str | None = None
class RadioStatsSnapshot(BaseModel):
"""Latest cached stats from the local radio's periodic 60s poll."""
timestamp: int | None = None
# Core stats
battery_mv: int | None = None
uptime_secs: int | None = None
# Radio stats
noise_floor: int | None = None
last_rssi: int | None = None
last_snr: float | None = None
tx_air_secs: int | None = None
rx_air_secs: int | None = None
# Packet stats
packets_recv: int | None = None
packets_sent: int | None = None
flood_tx: int | None = None
direct_tx: int | None = None
flood_rx: int | None = None
direct_rx: int | None = None
class HealthResponse(BaseModel):
status: str
radio_connected: bool
@@ -40,6 +63,7 @@ class HealthResponse(BaseModel):
connection_info: str | None
app_info: AppInfoResponse | None = None
radio_device_info: RadioDeviceInfoResponse | None = None
radio_stats: RadioStatsSnapshot | None = None
database_size_mb: float
oldest_undecrypted_timestamp: int | None
fanout_statuses: dict[str, FanoutStatusResponse] = Field(default_factory=dict)
@@ -122,6 +146,28 @@ async def build_health_data(radio_connected: bool, connection_info: str | None)
"max_channels": getattr(radio_manager, "max_channels", None),
}
# Local radio stats from the 60s background sampler
raw_stats = get_latest_radio_stats()
radio_stats = None
if raw_stats:
packets = raw_stats.get("packets") or {}
radio_stats = {
"timestamp": raw_stats.get("timestamp"),
"battery_mv": raw_stats.get("battery_mv"),
"uptime_secs": raw_stats.get("uptime_secs"),
"noise_floor": raw_stats.get("noise_floor"),
"last_rssi": raw_stats.get("last_rssi"),
"last_snr": raw_stats.get("last_snr"),
"tx_air_secs": raw_stats.get("tx_air_secs"),
"rx_air_secs": raw_stats.get("rx_air_secs"),
"packets_recv": packets.get("recv"),
"packets_sent": packets.get("sent"),
"flood_tx": packets.get("flood_tx"),
"direct_tx": packets.get("direct_tx"),
"flood_rx": packets.get("flood_rx"),
"direct_rx": packets.get("direct_rx"),
}
return {
"status": "ok" if radio_connected and not radio_initializing else "degraded",
"radio_connected": radio_connected,
@@ -133,6 +179,7 @@ async def build_health_data(radio_connected: bool, connection_info: str | None)
"commit_hash": app_build_info.commit_hash,
},
"radio_device_info": radio_device_info,
"radio_stats": radio_stats,
"database_size_mb": db_size_mb,
"oldest_undecrypted_timestamp": oldest_ts,
"fanout_statuses": fanout_statuses,

View File

@@ -2,7 +2,7 @@ from fastapi import APIRouter
from app.models import StatisticsResponse
from app.repository import StatisticsRepository
from app.services.radio_noise_floor import get_noise_floor_history
from app.services.radio_stats import get_noise_floor_history
router = APIRouter(prefix="/statistics", tags=["statistics"])
@@ -10,5 +10,5 @@ router = APIRouter(prefix="/statistics", tags=["statistics"])
@router.get("", response_model=StatisticsResponse)
async def get_statistics() -> StatisticsResponse:
data = await StatisticsRepository.get_all()
data["noise_floor_24h"] = await get_noise_floor_history()
data["noise_floor_24h"] = get_noise_floor_history()
return StatisticsResponse(**data)

View File

@@ -1,119 +0,0 @@
"""In-memory local-radio noise floor history sampling."""
import asyncio
import logging
import time
from collections import deque
from meshcore import EventType
from app.radio import RadioDisconnectedError, RadioOperationBusyError
from app.services.radio_runtime import radio_runtime as radio_manager
logger = logging.getLogger(__name__)
NOISE_FLOOR_SAMPLE_INTERVAL_SECONDS = 300
NOISE_FLOOR_WINDOW_SECONDS = 24 * 60 * 60
MAX_NOISE_FLOOR_SAMPLES = 300
_noise_floor_task: asyncio.Task | None = None
_noise_floor_samples: deque[tuple[int, int]] = deque(maxlen=MAX_NOISE_FLOOR_SAMPLES)
_noise_floor_supported: bool | None = None
_samples_lock = asyncio.Lock()
async def _append_sample(timestamp: int, noise_floor_dbm: int) -> None:
async with _samples_lock:
_noise_floor_samples.append((timestamp, noise_floor_dbm))
async def sample_noise_floor_once(*, blocking: bool = False) -> None:
"""Fetch the current radio noise floor once and record it when available."""
global _noise_floor_supported
if not radio_manager.is_connected:
return
try:
async with radio_manager.radio_operation("noise_floor_sample", blocking=blocking) as mc:
event = await mc.commands.get_stats_radio()
except (RadioDisconnectedError, RadioOperationBusyError):
return
except Exception as exc:
logger.debug("Noise floor sampling failed: %s", exc)
return
if event.type == EventType.ERROR:
_noise_floor_supported = False
return
if event.type != EventType.STATS_RADIO:
return
noise_floor = event.payload.get("noise_floor")
if not isinstance(noise_floor, int):
return
_noise_floor_supported = True
await _append_sample(int(time.time()), noise_floor)
async def _noise_floor_sampling_loop() -> None:
while True:
try:
await sample_noise_floor_once()
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Noise floor sampling loop crashed during sample")
try:
await asyncio.sleep(NOISE_FLOOR_SAMPLE_INTERVAL_SECONDS)
except asyncio.CancelledError:
raise
async def start_noise_floor_sampling() -> None:
global _noise_floor_task
if _noise_floor_task is not None and not _noise_floor_task.done():
return
_noise_floor_task = asyncio.create_task(_noise_floor_sampling_loop())
async def stop_noise_floor_sampling() -> None:
global _noise_floor_task
if _noise_floor_task is None:
return
if not _noise_floor_task.done():
_noise_floor_task.cancel()
try:
await _noise_floor_task
except asyncio.CancelledError:
pass
_noise_floor_task = None
async def get_noise_floor_history() -> dict:
"""Return the current 24-hour in-memory noise floor history snapshot."""
now = int(time.time())
cutoff = now - NOISE_FLOOR_WINDOW_SECONDS
async with _samples_lock:
samples = [
{"timestamp": timestamp, "noise_floor_dbm": noise_floor_dbm}
for timestamp, noise_floor_dbm in _noise_floor_samples
if timestamp >= cutoff
]
latest = samples[-1] if samples else None
oldest_timestamp = samples[0]["timestamp"] if samples else None
coverage_seconds = 0 if oldest_timestamp is None else max(0, now - oldest_timestamp)
return {
"sample_interval_seconds": NOISE_FLOOR_SAMPLE_INTERVAL_SECONDS,
"coverage_seconds": coverage_seconds,
"latest_noise_floor_dbm": latest["noise_floor_dbm"] if latest else None,
"latest_timestamp": latest["timestamp"] if latest else None,
"supported": _noise_floor_supported,
"samples": samples,
}

141
app/services/radio_stats.py Normal file
View File

@@ -0,0 +1,141 @@
"""In-memory local-radio stats sampling.
A single 60s loop fetches core, radio, and packet stats from the connected
radio in one radio-lock acquisition and caches everything in memory. The
noise-floor 24h history deque is maintained as a side effect.
Consumers:
- GET /api/health → get_latest_radio_stats() (battery, uptime, etc.)
- GET /api/statistics → get_noise_floor_history() (24h noise-floor chart)
"""
import asyncio
import logging
import time
from collections import deque
from typing import Any
from meshcore import EventType
from app.radio import RadioDisconnectedError, RadioOperationBusyError
from app.services.radio_runtime import radio_runtime as radio_manager
logger = logging.getLogger(__name__)
STATS_SAMPLE_INTERVAL_SECONDS = 60
NOISE_FLOOR_WINDOW_SECONDS = 24 * 60 * 60
MAX_NOISE_FLOOR_SAMPLES = 1500 # 24h at 60s intervals = 1440
_stats_task: asyncio.Task | None = None
_noise_floor_samples: deque[tuple[int, int]] = deque(maxlen=MAX_NOISE_FLOOR_SAMPLES)
_latest_stats: dict[str, Any] = {}
async def _sample_all_stats() -> None:
"""Fetch core, radio, and packet stats in one radio operation."""
global _latest_stats
if not radio_manager.is_connected:
_latest_stats = {}
return
try:
async with radio_manager.radio_operation("radio_stats_sample") as mc:
core_event = await mc.commands.get_stats_core()
radio_event = await mc.commands.get_stats_radio()
packet_event = await mc.commands.get_stats_packets()
except (RadioDisconnectedError, RadioOperationBusyError):
return
except Exception as exc:
logger.debug("Radio stats sampling failed: %s", exc)
return
now = int(time.time())
snapshot: dict[str, Any] = {"timestamp": now}
if getattr(core_event, "type", None) == EventType.STATS_CORE:
snapshot.update(core_event.payload)
if getattr(radio_event, "type", None) == EventType.STATS_RADIO:
snapshot.update(radio_event.payload)
noise_floor = radio_event.payload.get("noise_floor")
if isinstance(noise_floor, int):
_noise_floor_samples.append((now, noise_floor))
if getattr(packet_event, "type", None) == EventType.STATS_PACKETS:
snapshot["packets"] = packet_event.payload
has_any_data = len(snapshot) > 1
_latest_stats = snapshot if has_any_data else {}
async def _stats_sampling_loop() -> None:
while True:
try:
await _sample_all_stats()
from app.websocket import broadcast_health
broadcast_health(radio_manager.is_connected, radio_manager.connection_info)
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Radio stats sampling loop error")
try:
await asyncio.sleep(STATS_SAMPLE_INTERVAL_SECONDS)
except asyncio.CancelledError:
raise
# ── Public API ────────────────────────────────────────────────────────────
async def start_radio_stats_sampling() -> None:
"""Start the periodic radio stats background task."""
global _stats_task
if _stats_task is not None and not _stats_task.done():
return
_stats_task = asyncio.create_task(_stats_sampling_loop())
async def stop_radio_stats_sampling() -> None:
"""Stop the periodic radio stats background task."""
global _stats_task
if _stats_task is None:
return
if not _stats_task.done():
_stats_task.cancel()
try:
await _stats_task
except asyncio.CancelledError:
pass
_stats_task = None
def get_noise_floor_history() -> dict:
"""Return the current 24-hour in-memory noise floor history snapshot."""
now = int(time.time())
cutoff = now - NOISE_FLOOR_WINDOW_SECONDS
samples = [
{"timestamp": timestamp, "noise_floor_dbm": noise_floor_dbm}
for timestamp, noise_floor_dbm in _noise_floor_samples
if timestamp >= cutoff
]
latest = samples[-1] if samples else None
oldest_timestamp = samples[0]["timestamp"] if samples else None
coverage_seconds = 0 if oldest_timestamp is None else max(0, now - oldest_timestamp)
return {
"sample_interval_seconds": STATS_SAMPLE_INTERVAL_SECONDS,
"coverage_seconds": coverage_seconds,
"latest_noise_floor_dbm": latest["noise_floor_dbm"] if latest else None,
"latest_timestamp": latest["timestamp"] if latest else None,
"samples": samples,
}
def get_latest_radio_stats() -> dict[str, Any]:
"""Return the most recent radio stats snapshot."""
return dict(_latest_stats)

View File

@@ -447,7 +447,7 @@ export function SettingsStatisticsSection({ className }: { className?: string })
)}
{/* Noise Floor */}
{stats.noise_floor_24h.supported !== false && (
{stats.noise_floor_24h && (
<>
<Separator />
<div>
@@ -468,14 +468,14 @@ export function SettingsStatisticsSection({ className }: { className?: string })
<NoiseFloorChart samples={stats.noise_floor_24h.samples} />
) : stats.noise_floor_24h.samples.length === 0 ? (
<p className="text-sm text-muted-foreground">
No noise floor samples collected yet. Samples are collected every five minutes,
and retained until server restart.
No noise floor samples collected yet. Samples are collected every minute and
retained until server restart.
</p>
) : (
<p className="text-sm text-muted-foreground">
Only one sample so far ({stats.noise_floor_24h.samples[0].noise_floor_dbm} dBm).
More data needed for a chart. Samples are collected every five minutes, and
retained until server restart.
More data needed for a chart. Samples are collected every minute and retained
until server restart.
</p>
)}
</div>

View File

@@ -657,11 +657,10 @@ describe('SettingsModal', () => {
{ timestamp: 1711796400, count: 8 },
],
noise_floor_24h: {
sample_interval_seconds: 300,
sample_interval_seconds: 60,
coverage_seconds: 3600,
latest_noise_floor_dbm: -105,
latest_timestamp: 1711800000,
supported: true,
samples: [],
},
};
@@ -728,11 +727,10 @@ describe('SettingsModal', () => {
},
packets_per_hour_72h: [],
noise_floor_24h: {
sample_interval_seconds: 300,
sample_interval_seconds: 60,
coverage_seconds: 0,
latest_noise_floor_dbm: null,
latest_timestamp: null,
supported: null,
samples: [],
},
};

View File

@@ -62,6 +62,23 @@ export interface AppInfo {
commit_hash: string | null;
}
export interface RadioStatsSnapshot {
timestamp: number | null;
battery_mv: number | null;
uptime_secs: number | null;
noise_floor: number | null;
last_rssi: number | null;
last_snr: number | null;
tx_air_secs: number | null;
rx_air_secs: number | null;
packets_recv: number | null;
packets_sent: number | null;
flood_tx: number | null;
direct_tx: number | null;
flood_rx: number | null;
direct_rx: number | null;
}
export interface HealthStatus {
status: string;
radio_connected: boolean;
@@ -76,6 +93,7 @@ export interface HealthStatus {
max_contacts: number | null;
max_channels: number | null;
} | null;
radio_stats?: RadioStatsSnapshot | null;
database_size_mb: number;
oldest_undecrypted_timestamp: number | null;
fanout_statuses: Record<string, FanoutStatusEntry>;
@@ -540,7 +558,6 @@ export interface NoiseFloorHistoryStats {
coverage_seconds: number;
latest_noise_floor_dbm: number | null;
latest_timestamp: number | null;
supported: boolean | null;
samples: NoiseFloorSample[];
}

View File

@@ -127,6 +127,78 @@ class TestHealthEndpoint:
assert data["radio_connected"] is False
assert data["connection_info"] is None
def test_health_includes_radio_stats_when_available(self):
"""Health endpoint includes cached radio stats snapshot."""
from fastapi.testclient import TestClient
fake_stats = {
"timestamp": 1700000000,
"battery_mv": 4150,
"uptime_secs": 3600,
"noise_floor": -120,
"last_rssi": -85,
"last_snr": 9.5,
"tx_air_secs": 100,
"rx_air_secs": 200,
"packets": {
"recv": 500,
"sent": 250,
"flood_tx": 100,
"direct_tx": 150,
"flood_rx": 300,
"direct_rx": 200,
},
}
with (
patch("app.routers.health.radio_manager") as mock_rm,
patch("app.routers.health.get_latest_radio_stats", return_value=fake_stats),
):
mock_rm.is_connected = True
mock_rm.connection_info = "Serial: /dev/ttyUSB0"
mock_rm.is_setup_in_progress = False
mock_rm.is_setup_complete = True
mock_rm.connection_desired = True
mock_rm.is_reconnecting = False
mock_rm.device_info_loaded = False
from app.main import app
client = TestClient(app)
response = client.get("/api/health")
assert response.status_code == 200
stats = response.json()["radio_stats"]
assert stats["battery_mv"] == 4150
assert stats["uptime_secs"] == 3600
assert stats["noise_floor"] == -120
assert stats["packets_recv"] == 500
assert stats["packets_sent"] == 250
def test_health_radio_stats_null_when_no_data(self):
"""Health endpoint returns null radio_stats when cache is empty."""
from fastapi.testclient import TestClient
with (
patch("app.routers.health.radio_manager") as mock_rm,
patch("app.routers.health.get_latest_radio_stats", return_value={}),
):
mock_rm.is_connected = False
mock_rm.connection_info = None
mock_rm.is_setup_in_progress = False
mock_rm.is_setup_complete = False
mock_rm.connection_desired = True
mock_rm.is_reconnecting = False
mock_rm.device_info_loaded = False
from app.main import app
client = TestClient(app)
response = client.get("/api/health")
assert response.status_code == 200
assert response.json()["radio_stats"] is None
class TestDebugEndpoint:
"""Test the debug support snapshot endpoint."""

View File

@@ -1,37 +0,0 @@
import asyncio
from unittest.mock import patch
import pytest
from app.services import radio_noise_floor
class TestNoiseFloorSamplingLoop:
@pytest.mark.asyncio
async def test_logs_and_continues_after_unexpected_sample_exception(self):
sample_calls = 0
sleep_calls = 0
async def fake_sample() -> None:
nonlocal sample_calls
sample_calls += 1
if sample_calls == 1:
raise RuntimeError("boom")
async def fake_sleep(_seconds: int) -> None:
nonlocal sleep_calls
sleep_calls += 1
if sleep_calls >= 2:
raise asyncio.CancelledError()
with (
patch.object(radio_noise_floor, "sample_noise_floor_once", side_effect=fake_sample),
patch.object(radio_noise_floor.asyncio, "sleep", side_effect=fake_sleep),
patch.object(radio_noise_floor.logger, "exception") as mock_exception,
):
with pytest.raises(asyncio.CancelledError):
await radio_noise_floor._noise_floor_sampling_loop()
assert sample_calls == 2
assert sleep_calls == 2
mock_exception.assert_called_once()

212
tests/test_radio_stats.py Normal file
View File

@@ -0,0 +1,212 @@
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.services import radio_stats
def _make_event(event_type, payload=None):
return SimpleNamespace(type=event_type, payload=payload or {})
class TestRadioStatsSamplingLoop:
@pytest.mark.asyncio
async def test_logs_and_continues_after_unexpected_sample_exception(self):
sample_calls = 0
sleep_calls = 0
async def fake_sample() -> None:
nonlocal sample_calls
sample_calls += 1
if sample_calls == 1:
raise RuntimeError("boom")
async def fake_sleep(_seconds: int) -> None:
nonlocal sleep_calls
sleep_calls += 1
if sleep_calls >= 2:
raise asyncio.CancelledError()
with (
patch.object(radio_stats, "_sample_all_stats", side_effect=fake_sample),
patch.object(radio_stats.asyncio, "sleep", side_effect=fake_sleep),
patch.object(radio_stats.logger, "exception") as mock_exception,
):
with pytest.raises(asyncio.CancelledError):
await radio_stats._stats_sampling_loop()
assert sample_calls == 2
assert sleep_calls == 2
mock_exception.assert_called_once()
@pytest.mark.asyncio
async def test_broadcasts_health_every_cycle(self):
"""The loop should push a WS health broadcast after every iteration."""
sleep_calls = 0
async def fake_sample() -> None:
pass # no-op; just testing that broadcast fires
async def fake_sleep(_seconds: int) -> None:
nonlocal sleep_calls
sleep_calls += 1
if sleep_calls >= 2:
raise asyncio.CancelledError()
with (
patch.object(radio_stats, "_sample_all_stats", side_effect=fake_sample),
patch.object(radio_stats.asyncio, "sleep", side_effect=fake_sleep),
patch("app.websocket.broadcast_health") as mock_broadcast,
):
with pytest.raises(asyncio.CancelledError):
await radio_stats._stats_sampling_loop()
assert mock_broadcast.call_count == 2
class TestSampleAllStats:
@pytest.mark.asyncio
async def test_clears_cache_when_disconnected(self):
"""Stats cache should be empty when radio is disconnected."""
radio_stats._latest_stats = {"old": "data"}
with patch.object(radio_stats, "radio_manager") as mock_rm:
mock_rm.is_connected = False
await radio_stats._sample_all_stats()
assert radio_stats._latest_stats == {}
@pytest.mark.asyncio
async def test_partial_stats_still_records_available_data(self):
"""If core stats return ERROR but radio/packet stats succeed, noise floor
is still sampled and available fields are cached."""
from meshcore import EventType
radio_stats._latest_stats = {}
radio_stats._noise_floor_samples.clear()
core_event = _make_event(EventType.ERROR, {"reason": "unsupported"})
radio_event = _make_event(
EventType.STATS_RADIO,
{
"noise_floor": -118,
"last_rssi": -90,
"last_snr": 8.0,
"tx_air_secs": 10,
"rx_air_secs": 20,
},
)
packet_event = _make_event(
EventType.STATS_PACKETS,
{
"recv": 100,
"sent": 50,
"flood_tx": 20,
"direct_tx": 30,
"flood_rx": 60,
"direct_rx": 40,
},
)
mock_mc = AsyncMock()
mock_mc.commands.get_stats_core = AsyncMock(return_value=core_event)
mock_mc.commands.get_stats_radio = AsyncMock(return_value=radio_event)
mock_mc.commands.get_stats_packets = AsyncMock(return_value=packet_event)
mock_ctx = AsyncMock()
mock_ctx.__aenter__ = AsyncMock(return_value=mock_mc)
mock_ctx.__aexit__ = AsyncMock(return_value=False)
with patch.object(radio_stats, "radio_manager") as mock_rm:
mock_rm.is_connected = True
mock_rm.radio_operation = MagicMock(return_value=mock_ctx)
await radio_stats._sample_all_stats()
snapshot = radio_stats._latest_stats
# Core fields missing (ERROR), but radio + packet fields present
assert "battery_mv" not in snapshot
assert snapshot["noise_floor"] == -118
assert snapshot["packets"]["recv"] == 100
# Noise floor history was still appended
assert len(radio_stats._noise_floor_samples) == 1
@pytest.mark.asyncio
async def test_all_stats_succeed(self):
"""All three stats commands succeed — full snapshot cached."""
from meshcore import EventType
radio_stats._latest_stats = {}
radio_stats._noise_floor_samples.clear()
core_event = _make_event(
EventType.STATS_CORE,
{"battery_mv": 4100, "uptime_secs": 7200, "errors": 0, "queue_len": 2},
)
radio_event = _make_event(
EventType.STATS_RADIO,
{
"noise_floor": -120,
"last_rssi": -85,
"last_snr": 9.5,
"tx_air_secs": 100,
"rx_air_secs": 200,
},
)
packet_event = _make_event(
EventType.STATS_PACKETS,
{
"recv": 500,
"sent": 250,
"flood_tx": 100,
"direct_tx": 150,
"flood_rx": 300,
"direct_rx": 200,
},
)
mock_mc = AsyncMock()
mock_mc.commands.get_stats_core = AsyncMock(return_value=core_event)
mock_mc.commands.get_stats_radio = AsyncMock(return_value=radio_event)
mock_mc.commands.get_stats_packets = AsyncMock(return_value=packet_event)
mock_ctx = AsyncMock()
mock_ctx.__aenter__ = AsyncMock(return_value=mock_mc)
mock_ctx.__aexit__ = AsyncMock(return_value=False)
with patch.object(radio_stats, "radio_manager") as mock_rm:
mock_rm.is_connected = True
mock_rm.radio_operation = MagicMock(return_value=mock_ctx)
await radio_stats._sample_all_stats()
snapshot = radio_stats._latest_stats
assert snapshot["battery_mv"] == 4100
assert snapshot["noise_floor"] == -120
assert snapshot["packets"]["sent"] == 250
assert len(radio_stats._noise_floor_samples) == 1
@pytest.mark.asyncio
async def test_all_errors_clears_cache(self):
"""If every stats command returns ERROR, cache is empty."""
from meshcore import EventType
radio_stats._latest_stats = {"old": "stale"}
error = _make_event(EventType.ERROR, {"reason": "unsupported"})
mock_mc = AsyncMock()
mock_mc.commands.get_stats_core = AsyncMock(return_value=error)
mock_mc.commands.get_stats_radio = AsyncMock(return_value=error)
mock_mc.commands.get_stats_packets = AsyncMock(return_value=error)
mock_ctx = AsyncMock()
mock_ctx.__aenter__ = AsyncMock(return_value=mock_mc)
mock_ctx.__aexit__ = AsyncMock(return_value=False)
with patch.object(radio_stats, "radio_manager") as mock_rm:
mock_rm.is_connected = True
mock_rm.radio_operation = MagicMock(return_value=mock_ctx)
await radio_stats._sample_all_stats()
assert radio_stats._latest_stats == {}

View File

@@ -450,11 +450,10 @@ class TestStatisticsEndpoint:
@pytest.mark.asyncio
async def test_statistics_endpoint_includes_noise_floor_history(self, test_db, client):
noise_floor_history = {
"sample_interval_seconds": 300,
"sample_interval_seconds": 60,
"coverage_seconds": 1800,
"latest_noise_floor_dbm": -119,
"latest_timestamp": 1_700_000_000,
"supported": True,
"samples": [
{"timestamp": 1_699_998_200, "noise_floor_dbm": -121},
{"timestamp": 1_700_000_000, "noise_floor_dbm": -119},
@@ -463,7 +462,7 @@ class TestStatisticsEndpoint:
with patch(
"app.routers.statistics.get_noise_floor_history",
new=AsyncMock(return_value=noise_floor_history),
return_value=noise_floor_history,
):
response = await client.get("/api/statistics")