mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-05-03 20:13:00 +02:00
Add radio health &c. to fanout bus
This commit is contained in:
@@ -40,7 +40,7 @@ app/
|
||||
│ ├── contact_reconciliation.py # Prefix-claim, sender-key backfill, name-history wiring
|
||||
│ ├── radio_lifecycle.py # Post-connect setup and reconnect/setup helpers
|
||||
│ ├── radio_commands.py # Radio config/private-key command workflows
|
||||
│ ├── radio_noise_floor.py # In-memory local radio noise-floor sampling/history
|
||||
│ ├── radio_stats.py # In-memory local radio stats sampling and noise-floor history
|
||||
│ └── radio_runtime.py # Router/dependency seam over the global RadioManager
|
||||
├── radio.py # RadioManager transport/session state + lock management
|
||||
├── radio_sync.py # Polling, sync, periodic advertisement loop
|
||||
@@ -161,10 +161,12 @@ app/
|
||||
|
||||
- All external integrations (MQTT, bots, webhooks, Apprise, SQS) are managed through the fanout bus (`app/fanout/`).
|
||||
- Configs stored in `fanout_configs` table, managed via `GET/POST/PATCH/DELETE /api/fanout`.
|
||||
- `broadcast_event()` in `websocket.py` dispatches to the fanout manager for `message` and `raw_packet` events.
|
||||
- Each integration is a `FanoutModule` with scope-based filtering.
|
||||
- `broadcast_event()` in `websocket.py` dispatches to the fanout manager for `message`, `raw_packet`, and `contact` events.
|
||||
- `on_message` and `on_raw` are scope-gated. `on_contact`, `on_telemetry`, and `on_health` are dispatched to all modules unconditionally (modules filter internally).
|
||||
- Repeater telemetry broadcasts are emitted after `RepeaterTelemetryRepository.record()` in both `radio_sync.py` (auto-collect) and `routers/repeaters.py` (manual fetch).
|
||||
- The 60-second radio stats sampling loop in `radio_stats.py` dispatches an enriched health snapshot (radio identity + full stats) to all fanout modules after each sample.
|
||||
- Community MQTT publishes raw packets only, but its derived `path` field for direct packets is emitted as comma-separated hop identifiers, not flat path bytes.
|
||||
- See `app/fanout/AGENTS_fanout.md` for full architecture details.
|
||||
- See `app/fanout/AGENTS_fanout.md` for full architecture details and event payload shapes.
|
||||
|
||||
## API Surface (all under `/api`)
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Fanout Bus Architecture
|
||||
|
||||
The fanout bus is a unified system for dispatching mesh radio events (decoded messages and raw packets) to external integrations. It replaces the previous scattered singleton MQTT publishers with a modular, configurable framework.
|
||||
The fanout bus is a unified system for dispatching mesh radio events to external integrations. It replaces the previous scattered singleton MQTT publishers with a modular, configurable framework.
|
||||
|
||||
## Core Concepts
|
||||
|
||||
@@ -8,10 +8,15 @@ The fanout bus is a unified system for dispatching mesh radio events (decoded me
|
||||
Base class that all integration modules extend:
|
||||
- `__init__(config_id, config, *, name="")` — constructor; receives the config UUID, the type-specific config dict, and the user-assigned name
|
||||
- `start()` / `stop()` — async lifecycle (e.g. open/close connections)
|
||||
- `on_message(data)` — receive decoded messages (DM/channel)
|
||||
- `on_raw(data)` — receive raw RF packets
|
||||
- `on_message(data)` — receive decoded messages (scope-gated)
|
||||
- `on_raw(data)` — receive raw RF packets (scope-gated)
|
||||
- `on_contact(data)` — receive contact upserts; dispatched to all modules
|
||||
- `on_telemetry(data)` — receive repeater telemetry snapshots; dispatched to all modules
|
||||
- `on_health(data)` — receive periodic radio health snapshots; dispatched to all modules
|
||||
- `status` property (**must override**) — return `"connected"`, `"disconnected"`, or `"error"`
|
||||
|
||||
All five event hooks are no-ops by default; modules override only the ones they care about.
|
||||
|
||||
### FanoutManager (manager.py)
|
||||
Singleton that owns all active modules and dispatches events:
|
||||
- `load_from_db()` — startup: load enabled configs, instantiate modules
|
||||
@@ -19,6 +24,9 @@ Singleton that owns all active modules and dispatches events:
|
||||
- `remove_config(id)` — delete: stop and remove
|
||||
- `broadcast_message(data)` — scope-check + dispatch `on_message`
|
||||
- `broadcast_raw(data)` — scope-check + dispatch `on_raw`
|
||||
- `broadcast_contact(data)` — dispatch `on_contact` to all modules
|
||||
- `broadcast_telemetry(data)` — dispatch `on_telemetry` to all modules
|
||||
- `broadcast_health_fanout(data)` — dispatch `on_health` to all modules
|
||||
- `stop_all()` — shutdown
|
||||
- `get_statuses()` — health endpoint data
|
||||
|
||||
@@ -33,19 +41,65 @@ Each config has a `scope` JSON blob controlling what events reach it:
|
||||
```
|
||||
Community MQTT always enforces `{"messages": "none", "raw_packets": "all"}`.
|
||||
|
||||
Scope only gates `on_message` and `on_raw`. The `on_contact`, `on_telemetry`, and `on_health` hooks are dispatched to all modules unconditionally — modules that care about specific contacts or repeaters filter internally based on their own config.
|
||||
|
||||
## Event Flow
|
||||
|
||||
```
|
||||
Radio Event -> packet_processor / event_handler
|
||||
-> broadcast_event("message"|"raw_packet", data, realtime=True)
|
||||
-> broadcast_event("message"|"raw_packet"|"contact", data, realtime=True)
|
||||
-> WebSocket broadcast (always)
|
||||
-> FanoutManager.broadcast_message/raw (only if realtime=True)
|
||||
-> scope check per module
|
||||
-> module.on_message / on_raw
|
||||
-> FanoutManager.broadcast_message/raw/contact (only if realtime=True)
|
||||
-> scope check per module (message/raw only)
|
||||
-> module.on_message / on_raw / on_contact
|
||||
|
||||
Telemetry collect (radio_sync.py / routers/repeaters.py)
|
||||
-> RepeaterTelemetryRepository.record(...)
|
||||
-> FanoutManager.broadcast_telemetry(data)
|
||||
-> module.on_telemetry (all modules, unconditional)
|
||||
|
||||
Health fanout (radio_stats.py, piggybacks on 60s stats sampling loop)
|
||||
-> FanoutManager.broadcast_health_fanout(data)
|
||||
-> module.on_health (all modules, unconditional)
|
||||
```
|
||||
|
||||
Setting `realtime=False` (used during historical decryption) skips fanout dispatch entirely.
|
||||
|
||||
## Event Payloads
|
||||
|
||||
### on_message(data)
|
||||
`Message.model_dump()` — the full Pydantic message model. Key fields:
|
||||
- `type` (`"PRIV"` | `"CHAN"`), `conversation_key`, `text`, `sender_name`, `sender_key`
|
||||
- `outgoing`, `acked`, `paths`, `sender_timestamp`, `received_at`
|
||||
|
||||
### on_raw(data)
|
||||
Raw packet dict from `packet_processor.py`. Key fields:
|
||||
- `id` (storage row ID), `observation_id` (per-arrival), `raw` (hex), `timestamp`
|
||||
- `decrypted_info` (optional: `channel_key`, `contact_key`, `text`)
|
||||
|
||||
### on_contact(data)
|
||||
`Contact.model_dump()` — the full Pydantic contact model. Key fields:
|
||||
- `public_key`, `name`, `type` (0=unknown, 1=client, 2=repeater, 3=room, 4=sensor)
|
||||
- `lat`, `lon`, `last_seen`, `first_seen`, `on_radio`
|
||||
|
||||
### on_telemetry(data)
|
||||
Repeater telemetry snapshot, broadcast after successful `RepeaterTelemetryRepository.record()`.
|
||||
Identical shape from both auto-collect (`radio_sync.py`) and manual fetch (`routers/repeaters.py`):
|
||||
- `public_key`, `name`, `timestamp`
|
||||
- `battery_volts`, `noise_floor_dbm`, `last_rssi_dbm`, `last_snr_db`
|
||||
- `packets_received`, `packets_sent`, `airtime_seconds`, `rx_airtime_seconds`
|
||||
- `uptime_seconds`, `sent_flood`, `sent_direct`, `recv_flood`, `recv_direct`
|
||||
- `flood_dups`, `direct_dups`, `full_events`, `tx_queue_len`
|
||||
|
||||
### on_health(data)
|
||||
Radio health + stats snapshot, broadcast every 60s by the stats sampling loop in `radio_stats.py`:
|
||||
- `connected` (bool), `connection_info` (str | None)
|
||||
- `public_key` (str | None), `name` (str | None)
|
||||
- `noise_floor_dbm`, `battery_mv`, `uptime_secs` (int | None)
|
||||
- `last_rssi` (int | None), `last_snr` (float | None)
|
||||
- `tx_air_secs`, `rx_air_secs` (int | None)
|
||||
- `packets_recv`, `packets_sent`, `flood_tx`, `direct_tx`, `flood_rx`, `direct_rx` (int | None)
|
||||
|
||||
## Current Module Types
|
||||
|
||||
### mqtt_private (mqtt_private.py)
|
||||
|
||||
@@ -38,6 +38,15 @@ class FanoutModule:
|
||||
async def on_raw(self, data: dict) -> None:
|
||||
"""Called for raw RF packets. Override if needed."""
|
||||
|
||||
async def on_contact(self, data: dict) -> None:
|
||||
"""Called for contact upserts (adverts, sync). Override if needed."""
|
||||
|
||||
async def on_telemetry(self, data: dict) -> None:
|
||||
"""Called for repeater telemetry snapshots. Override if needed."""
|
||||
|
||||
async def on_health(self, data: dict) -> None:
|
||||
"""Called for periodic radio health snapshots. Override if needed."""
|
||||
|
||||
@property
|
||||
def status(self) -> str:
|
||||
"""Return 'connected', 'disconnected', or 'error'."""
|
||||
|
||||
@@ -86,6 +86,11 @@ def _scope_matches_raw(scope: dict, _data: dict) -> bool:
|
||||
return scope.get("raw_packets", "none") == "all"
|
||||
|
||||
|
||||
def _always_match(_scope: dict, _data: dict) -> bool:
|
||||
"""Match all modules unconditionally (filtering is module-internal)."""
|
||||
return True
|
||||
|
||||
|
||||
class FanoutManager:
|
||||
"""Owns all active fanout modules and dispatches events."""
|
||||
|
||||
@@ -270,6 +275,33 @@ class FanoutManager:
|
||||
log_label="on_raw",
|
||||
)
|
||||
|
||||
async def broadcast_contact(self, data: dict) -> None:
|
||||
"""Dispatch a contact upsert to all modules."""
|
||||
await self._dispatch_matching(
|
||||
data,
|
||||
matcher=_always_match,
|
||||
handler_name="on_contact",
|
||||
log_label="on_contact",
|
||||
)
|
||||
|
||||
async def broadcast_telemetry(self, data: dict) -> None:
|
||||
"""Dispatch a repeater telemetry snapshot to all modules."""
|
||||
await self._dispatch_matching(
|
||||
data,
|
||||
matcher=_always_match,
|
||||
handler_name="on_telemetry",
|
||||
log_label="on_telemetry",
|
||||
)
|
||||
|
||||
async def broadcast_health_fanout(self, data: dict) -> None:
|
||||
"""Dispatch a radio health snapshot to all modules."""
|
||||
await self._dispatch_matching(
|
||||
data,
|
||||
matcher=_always_match,
|
||||
handler_name="on_health",
|
||||
log_label="on_health",
|
||||
)
|
||||
|
||||
async def stop_all(self) -> None:
|
||||
"""Shutdown all modules."""
|
||||
for config_id, (module, _) in list(self._modules.items()):
|
||||
|
||||
@@ -1585,9 +1585,10 @@ async def _collect_repeater_telemetry(mc: MeshCore, contact: Contact) -> bool:
|
||||
}
|
||||
|
||||
try:
|
||||
timestamp = int(time.time())
|
||||
await RepeaterTelemetryRepository.record(
|
||||
public_key=contact.public_key,
|
||||
timestamp=int(time.time()),
|
||||
timestamp=timestamp,
|
||||
data=data,
|
||||
)
|
||||
logger.info(
|
||||
@@ -1595,6 +1596,21 @@ async def _collect_repeater_telemetry(mc: MeshCore, contact: Contact) -> bool:
|
||||
contact.name or contact.public_key[:12],
|
||||
contact.public_key[:12],
|
||||
)
|
||||
|
||||
# Dispatch to fanout modules (e.g. HA MQTT discovery)
|
||||
from app.fanout.manager import fanout_manager
|
||||
|
||||
asyncio.create_task(
|
||||
fanout_manager.broadcast_telemetry(
|
||||
{
|
||||
"public_key": contact.public_key,
|
||||
"name": contact.name or contact.public_key[:12],
|
||||
"timestamp": timestamp,
|
||||
**data,
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
|
||||
@@ -133,6 +134,20 @@ async def repeater_status(public_key: str) -> RepeaterStatusResponse:
|
||||
timestamp=now,
|
||||
data=status_dict,
|
||||
)
|
||||
|
||||
# Dispatch to fanout modules (e.g. HA MQTT discovery)
|
||||
from app.fanout.manager import fanout_manager
|
||||
|
||||
asyncio.create_task(
|
||||
fanout_manager.broadcast_telemetry(
|
||||
{
|
||||
"public_key": contact.public_key,
|
||||
"name": contact.name or contact.public_key[:12],
|
||||
"timestamp": now,
|
||||
**status_dict,
|
||||
}
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to record telemetry history: %s", e)
|
||||
|
||||
|
||||
@@ -1,12 +1,19 @@
|
||||
"""In-memory local-radio stats sampling.
|
||||
|
||||
A single 60s loop fetches core, radio, and packet stats from the connected
|
||||
radio in one radio-lock acquisition and caches everything in memory. The
|
||||
noise-floor 24h history deque is maintained as a side effect.
|
||||
radio in one radio-lock acquisition. The noise-floor 24h history deque is
|
||||
maintained as a side effect.
|
||||
|
||||
After each sample the loop:
|
||||
1. Broadcasts a WS ``health`` frame so frontend dashboards refresh.
|
||||
2. Dispatches a ``broadcast_health_fanout`` event carrying the full stats
|
||||
snapshot plus radio identity, so fanout modules (e.g. HA MQTT) can
|
||||
publish sensor state without a second radio poll.
|
||||
|
||||
Consumers:
|
||||
- GET /api/health → get_latest_radio_stats() (battery, uptime, etc.)
|
||||
- GET /api/statistics → get_noise_floor_history() (24h noise-floor chart)
|
||||
- Fanout on_health → _build_fanout_payload() (identity + stats)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@@ -31,24 +38,25 @@ _noise_floor_samples: deque[tuple[int, int]] = deque(maxlen=MAX_NOISE_FLOOR_SAMP
|
||||
_latest_stats: dict[str, Any] = {}
|
||||
|
||||
|
||||
async def _sample_all_stats() -> None:
|
||||
"""Fetch core, radio, and packet stats in one radio operation."""
|
||||
global _latest_stats
|
||||
async def _sample_all_stats() -> dict[str, Any]:
|
||||
"""Fetch core, radio, and packet stats in one radio operation.
|
||||
|
||||
Returns the snapshot dict (may be empty if the radio is disconnected or
|
||||
all commands errored).
|
||||
"""
|
||||
if not radio_manager.is_connected:
|
||||
_latest_stats = {}
|
||||
return
|
||||
return {}
|
||||
|
||||
try:
|
||||
async with radio_manager.radio_operation("radio_stats_sample") as mc:
|
||||
async with radio_manager.radio_operation("radio_stats_sample", blocking=False) as mc:
|
||||
core_event = await mc.commands.get_stats_core()
|
||||
radio_event = await mc.commands.get_stats_radio()
|
||||
packet_event = await mc.commands.get_stats_packets()
|
||||
except (RadioDisconnectedError, RadioOperationBusyError):
|
||||
return
|
||||
return {}
|
||||
except Exception as exc:
|
||||
logger.debug("Radio stats sampling failed: %s", exc)
|
||||
return
|
||||
return {}
|
||||
|
||||
now = int(time.time())
|
||||
snapshot: dict[str, Any] = {"timestamp": now}
|
||||
@@ -66,16 +74,62 @@ async def _sample_all_stats() -> None:
|
||||
snapshot["packets"] = packet_event.payload
|
||||
|
||||
has_any_data = len(snapshot) > 1
|
||||
_latest_stats = snapshot if has_any_data else {}
|
||||
return snapshot if has_any_data else {}
|
||||
|
||||
|
||||
def _build_fanout_payload(stats: dict[str, Any]) -> dict:
|
||||
"""Build the health fanout payload from a stats snapshot + radio identity.
|
||||
|
||||
Includes radio identity (public_key, name), connection state, and the
|
||||
full stats snapshot so fanout modules can publish rich sensor data
|
||||
without a second radio poll.
|
||||
"""
|
||||
mc = radio_manager.meshcore
|
||||
self_info = mc.self_info if mc else None
|
||||
|
||||
payload: dict = {
|
||||
"connected": radio_manager.is_connected,
|
||||
"connection_info": radio_manager.connection_info,
|
||||
"public_key": (self_info.get("public_key") or None) if self_info else None,
|
||||
"name": (self_info.get("name") or None) if self_info else None,
|
||||
}
|
||||
|
||||
if stats:
|
||||
payload["noise_floor_dbm"] = stats.get("noise_floor")
|
||||
payload["battery_mv"] = stats.get("battery_mv")
|
||||
payload["uptime_secs"] = stats.get("uptime_secs")
|
||||
payload["last_rssi"] = stats.get("last_rssi")
|
||||
payload["last_snr"] = stats.get("last_snr")
|
||||
payload["tx_air_secs"] = stats.get("tx_air_secs")
|
||||
payload["rx_air_secs"] = stats.get("rx_air_secs")
|
||||
packets = stats.get("packets") or {}
|
||||
payload["packets_recv"] = packets.get("recv")
|
||||
payload["packets_sent"] = packets.get("sent")
|
||||
payload["flood_tx"] = packets.get("flood_tx")
|
||||
payload["direct_tx"] = packets.get("direct_tx")
|
||||
payload["flood_rx"] = packets.get("flood_rx")
|
||||
payload["direct_rx"] = packets.get("direct_rx")
|
||||
|
||||
return payload
|
||||
|
||||
|
||||
async def _stats_sampling_loop() -> None:
|
||||
global _latest_stats
|
||||
while True:
|
||||
try:
|
||||
await _sample_all_stats()
|
||||
snapshot = await _sample_all_stats()
|
||||
if snapshot:
|
||||
_latest_stats = snapshot
|
||||
elif not radio_manager.is_connected:
|
||||
_latest_stats = {}
|
||||
from app.websocket import broadcast_health
|
||||
|
||||
broadcast_health(radio_manager.is_connected, radio_manager.connection_info)
|
||||
|
||||
# Dispatch enriched health snapshot to fanout modules
|
||||
from app.fanout.manager import fanout_manager
|
||||
|
||||
await fanout_manager.broadcast_health_fanout(_build_fanout_payload(snapshot))
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
@@ -137,5 +191,5 @@ def get_noise_floor_history() -> dict:
|
||||
|
||||
|
||||
def get_latest_radio_stats() -> dict[str, Any]:
|
||||
"""Return the most recent radio stats snapshot."""
|
||||
"""Return the most recent radio stats snapshot (for health endpoint)."""
|
||||
return dict(_latest_stats)
|
||||
|
||||
@@ -110,6 +110,8 @@ def broadcast_event(event_type: str, data: dict, *, realtime: bool = True) -> No
|
||||
asyncio.create_task(fanout_manager.broadcast_message(data))
|
||||
elif event_type == "raw_packet":
|
||||
asyncio.create_task(fanout_manager.broadcast_raw(data))
|
||||
elif event_type == "contact":
|
||||
asyncio.create_task(fanout_manager.broadcast_contact(data))
|
||||
|
||||
|
||||
def broadcast_error(message: str, details: str | None = None) -> None:
|
||||
|
||||
@@ -101,6 +101,9 @@ class StubModule(FanoutModule):
|
||||
super().__init__("stub", {})
|
||||
self.message_calls: list[dict] = []
|
||||
self.raw_calls: list[dict] = []
|
||||
self.contact_calls: list[dict] = []
|
||||
self.telemetry_calls: list[dict] = []
|
||||
self.health_calls: list[dict] = []
|
||||
self._status = "connected"
|
||||
|
||||
async def start(self) -> None:
|
||||
@@ -115,6 +118,15 @@ class StubModule(FanoutModule):
|
||||
async def on_raw(self, data: dict) -> None:
|
||||
self.raw_calls.append(data)
|
||||
|
||||
async def on_contact(self, data: dict) -> None:
|
||||
self.contact_calls.append(data)
|
||||
|
||||
async def on_telemetry(self, data: dict) -> None:
|
||||
self.telemetry_calls.append(data)
|
||||
|
||||
async def on_health(self, data: dict) -> None:
|
||||
self.health_calls.append(data)
|
||||
|
||||
@property
|
||||
def status(self) -> str:
|
||||
return self._status
|
||||
@@ -301,6 +313,113 @@ class TestFanoutManagerDispatch:
|
||||
assert statuses["test-id"]["last_error"] == "ConnectionError: broker down"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# New event dispatch (contact, telemetry, health)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFanoutManagerNewEventDispatch:
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_contact_dispatches_to_all_modules(self):
|
||||
manager = FanoutManager()
|
||||
mod = StubModule()
|
||||
manager._modules["test-id"] = (mod, {})
|
||||
|
||||
await manager.broadcast_contact({"public_key": "aabb", "name": "Alice"})
|
||||
|
||||
assert len(mod.contact_calls) == 1
|
||||
assert mod.contact_calls[0]["public_key"] == "aabb"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_contact_ignores_scope(self):
|
||||
"""Contact dispatch is unconditional — scope doesn't affect it."""
|
||||
manager = FanoutManager()
|
||||
mod = StubModule()
|
||||
manager._modules["test-id"] = (mod, {"messages": "none", "raw_packets": "none"})
|
||||
|
||||
await manager.broadcast_contact({"public_key": "aabb"})
|
||||
|
||||
assert len(mod.contact_calls) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_telemetry_dispatches_to_all_modules(self):
|
||||
manager = FanoutManager()
|
||||
mod = StubModule()
|
||||
manager._modules["test-id"] = (mod, {})
|
||||
|
||||
await manager.broadcast_telemetry(
|
||||
{"public_key": "ccdd", "battery_volts": 4.1, "timestamp": 1000}
|
||||
)
|
||||
|
||||
assert len(mod.telemetry_calls) == 1
|
||||
assert mod.telemetry_calls[0]["battery_volts"] == 4.1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcast_health_fanout_dispatches_to_all_modules(self):
|
||||
manager = FanoutManager()
|
||||
mod = StubModule()
|
||||
manager._modules["test-id"] = (mod, {})
|
||||
|
||||
await manager.broadcast_health_fanout({"connected": True, "noise_floor_dbm": -112})
|
||||
|
||||
assert len(mod.health_calls) == 1
|
||||
assert mod.health_calls[0]["connected"] is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_new_events_do_not_affect_message_or_raw(self):
|
||||
"""Verify new dispatch paths are independent of message/raw."""
|
||||
manager = FanoutManager()
|
||||
mod = StubModule()
|
||||
manager._modules["test-id"] = (mod, {"messages": "all", "raw_packets": "all"})
|
||||
|
||||
await manager.broadcast_contact({"public_key": "aabb"})
|
||||
await manager.broadcast_telemetry({"public_key": "ccdd", "battery_volts": 3.8})
|
||||
await manager.broadcast_health_fanout({"connected": False})
|
||||
|
||||
assert len(mod.message_calls) == 0
|
||||
assert len(mod.raw_calls) == 0
|
||||
assert len(mod.contact_calls) == 1
|
||||
assert len(mod.telemetry_calls) == 1
|
||||
assert len(mod.health_calls) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_base_module_no_ops_do_not_raise(self):
|
||||
"""Default FanoutModule no-ops accept data without error."""
|
||||
manager = FanoutManager()
|
||||
|
||||
class MinimalModule(FanoutModule):
|
||||
@property
|
||||
def status(self) -> str:
|
||||
return "connected"
|
||||
|
||||
mod = MinimalModule("test", {})
|
||||
manager._modules["test-id"] = (mod, {})
|
||||
|
||||
# Should not raise — base class no-ops silently accept
|
||||
await manager.broadcast_contact({"public_key": "aabb"})
|
||||
await manager.broadcast_telemetry({"public_key": "ccdd"})
|
||||
await manager.broadcast_health_fanout({"connected": True})
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_error_in_one_module_does_not_block_others(self):
|
||||
manager = FanoutManager()
|
||||
|
||||
bad_mod = StubModule()
|
||||
|
||||
async def fail(data):
|
||||
raise RuntimeError("boom")
|
||||
|
||||
bad_mod.on_contact = fail
|
||||
|
||||
good_mod = StubModule()
|
||||
manager._modules["bad"] = (bad_mod, {})
|
||||
manager._modules["good"] = (good_mod, {})
|
||||
|
||||
await manager.broadcast_contact({"public_key": "aabb"})
|
||||
|
||||
assert len(good_mod.contact_calls) == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Repository tests
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -476,6 +595,47 @@ class TestBroadcastEventRealtime:
|
||||
mock_ws.broadcast.assert_called_once()
|
||||
mock_fm.broadcast_message.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_contact_event_dispatches_to_fanout(self):
|
||||
"""broadcast_event for 'contact' should trigger fanout contact dispatch."""
|
||||
from app.websocket import broadcast_event
|
||||
|
||||
with (
|
||||
patch("app.websocket.ws_manager") as mock_ws,
|
||||
patch("app.fanout.manager.fanout_manager") as mock_fm,
|
||||
):
|
||||
mock_ws.broadcast = AsyncMock()
|
||||
mock_fm.broadcast_contact = AsyncMock()
|
||||
|
||||
broadcast_event("contact", {"public_key": "aabb"}, realtime=True)
|
||||
|
||||
import asyncio
|
||||
|
||||
await asyncio.sleep(0)
|
||||
|
||||
mock_ws.broadcast.assert_called_once()
|
||||
mock_fm.broadcast_contact.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_contact_event_skipped_when_not_realtime(self):
|
||||
"""broadcast_event('contact', ..., realtime=False) should skip fanout."""
|
||||
from app.websocket import broadcast_event
|
||||
|
||||
with (
|
||||
patch("app.websocket.ws_manager") as mock_ws,
|
||||
patch("app.fanout.manager.fanout_manager") as mock_fm,
|
||||
):
|
||||
mock_ws.broadcast = AsyncMock()
|
||||
|
||||
broadcast_event("contact", {"public_key": "aabb"}, realtime=False)
|
||||
|
||||
import asyncio
|
||||
|
||||
await asyncio.sleep(0)
|
||||
|
||||
mock_ws.broadcast.assert_called_once()
|
||||
mock_fm.broadcast_contact.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Webhook module unit tests
|
||||
|
||||
@@ -17,11 +17,12 @@ class TestRadioStatsSamplingLoop:
|
||||
sample_calls = 0
|
||||
sleep_calls = 0
|
||||
|
||||
async def fake_sample() -> None:
|
||||
async def fake_sample():
|
||||
nonlocal sample_calls
|
||||
sample_calls += 1
|
||||
if sample_calls == 1:
|
||||
raise RuntimeError("boom")
|
||||
return {}
|
||||
|
||||
async def fake_sleep(_seconds: int) -> None:
|
||||
nonlocal sleep_calls
|
||||
@@ -29,10 +30,14 @@ class TestRadioStatsSamplingLoop:
|
||||
if sleep_calls >= 2:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
mock_fanout = MagicMock()
|
||||
mock_fanout.broadcast_health_fanout = AsyncMock()
|
||||
|
||||
with (
|
||||
patch.object(radio_stats, "_sample_all_stats", side_effect=fake_sample),
|
||||
patch.object(radio_stats.asyncio, "sleep", side_effect=fake_sleep),
|
||||
patch.object(radio_stats.logger, "exception") as mock_exception,
|
||||
patch("app.fanout.manager.fanout_manager", mock_fanout),
|
||||
):
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await radio_stats._stats_sampling_loop()
|
||||
@@ -43,11 +48,11 @@ class TestRadioStatsSamplingLoop:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_broadcasts_health_every_cycle(self):
|
||||
"""The loop should push a WS health broadcast after every iteration."""
|
||||
"""The loop should push a WS health broadcast and fanout after every iteration."""
|
||||
sleep_calls = 0
|
||||
|
||||
async def fake_sample() -> None:
|
||||
pass # no-op; just testing that broadcast fires
|
||||
async def fake_sample():
|
||||
return {}
|
||||
|
||||
async def fake_sleep(_seconds: int) -> None:
|
||||
nonlocal sleep_calls
|
||||
@@ -55,36 +60,88 @@ class TestRadioStatsSamplingLoop:
|
||||
if sleep_calls >= 2:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
mock_fanout = MagicMock()
|
||||
mock_fanout.broadcast_health_fanout = AsyncMock()
|
||||
|
||||
with (
|
||||
patch.object(radio_stats, "_sample_all_stats", side_effect=fake_sample),
|
||||
patch.object(radio_stats.asyncio, "sleep", side_effect=fake_sleep),
|
||||
patch("app.websocket.broadcast_health") as mock_broadcast,
|
||||
patch("app.fanout.manager.fanout_manager", mock_fanout),
|
||||
):
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await radio_stats._stats_sampling_loop()
|
||||
|
||||
assert mock_broadcast.call_count == 2
|
||||
assert mock_fanout.broadcast_health_fanout.call_count == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fanout_receives_enriched_payload(self):
|
||||
"""The health fanout payload should include radio identity + stats."""
|
||||
sleep_calls = 0
|
||||
fake_snapshot = {
|
||||
"timestamp": 1700000000,
|
||||
"battery_mv": 4100,
|
||||
"uptime_secs": 3600,
|
||||
"noise_floor": -118,
|
||||
"last_rssi": -85,
|
||||
"last_snr": 9.5,
|
||||
"tx_air_secs": 100,
|
||||
"rx_air_secs": 200,
|
||||
"packets": {"recv": 500, "sent": 250},
|
||||
}
|
||||
|
||||
async def fake_sample():
|
||||
return dict(fake_snapshot)
|
||||
|
||||
async def fake_sleep(_seconds: int) -> None:
|
||||
nonlocal sleep_calls
|
||||
sleep_calls += 1
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
mock_fanout = MagicMock()
|
||||
mock_fanout.broadcast_health_fanout = AsyncMock()
|
||||
|
||||
with (
|
||||
patch.object(radio_stats, "_sample_all_stats", side_effect=fake_sample),
|
||||
patch.object(radio_stats.asyncio, "sleep", side_effect=fake_sleep),
|
||||
patch("app.websocket.broadcast_health"),
|
||||
patch("app.fanout.manager.fanout_manager", mock_fanout),
|
||||
patch.object(radio_stats, "radio_manager") as mock_rm,
|
||||
):
|
||||
mock_rm.is_connected = True
|
||||
mock_rm.connection_info = "Serial: /dev/ttyUSB0"
|
||||
mock_rm.meshcore = MagicMock()
|
||||
mock_rm.meshcore.self_info = {"public_key": "aabbccddeeff", "name": "MyRadio"}
|
||||
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await radio_stats._stats_sampling_loop()
|
||||
|
||||
payload = mock_fanout.broadcast_health_fanout.call_args[0][0]
|
||||
assert payload["connected"] is True
|
||||
assert payload["public_key"] == "aabbccddeeff"
|
||||
assert payload["name"] == "MyRadio"
|
||||
assert payload["battery_mv"] == 4100
|
||||
assert payload["noise_floor_dbm"] == -118
|
||||
assert payload["packets_recv"] == 500
|
||||
|
||||
|
||||
class TestSampleAllStats:
|
||||
@pytest.mark.asyncio
|
||||
async def test_clears_cache_when_disconnected(self):
|
||||
"""Stats cache should be empty when radio is disconnected."""
|
||||
radio_stats._latest_stats = {"old": "data"}
|
||||
|
||||
async def test_returns_empty_when_disconnected(self):
|
||||
"""Should return empty dict when radio is disconnected."""
|
||||
with patch.object(radio_stats, "radio_manager") as mock_rm:
|
||||
mock_rm.is_connected = False
|
||||
await radio_stats._sample_all_stats()
|
||||
result = await radio_stats._sample_all_stats()
|
||||
|
||||
assert radio_stats._latest_stats == {}
|
||||
assert result == {}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_partial_stats_still_records_available_data(self):
|
||||
"""If core stats return ERROR but radio/packet stats succeed, noise floor
|
||||
is still sampled and available fields are cached."""
|
||||
is still sampled and available fields are returned."""
|
||||
from meshcore import EventType
|
||||
|
||||
radio_stats._latest_stats = {}
|
||||
radio_stats._noise_floor_samples.clear()
|
||||
|
||||
core_event = _make_event(EventType.ERROR, {"reason": "unsupported"})
|
||||
@@ -122,9 +179,8 @@ class TestSampleAllStats:
|
||||
with patch.object(radio_stats, "radio_manager") as mock_rm:
|
||||
mock_rm.is_connected = True
|
||||
mock_rm.radio_operation = MagicMock(return_value=mock_ctx)
|
||||
await radio_stats._sample_all_stats()
|
||||
snapshot = await radio_stats._sample_all_stats()
|
||||
|
||||
snapshot = radio_stats._latest_stats
|
||||
# Core fields missing (ERROR), but radio + packet fields present
|
||||
assert "battery_mv" not in snapshot
|
||||
assert snapshot["noise_floor"] == -118
|
||||
@@ -134,10 +190,9 @@ class TestSampleAllStats:
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_all_stats_succeed(self):
|
||||
"""All three stats commands succeed — full snapshot cached."""
|
||||
"""All three stats commands succeed — full snapshot returned."""
|
||||
from meshcore import EventType
|
||||
|
||||
radio_stats._latest_stats = {}
|
||||
radio_stats._noise_floor_samples.clear()
|
||||
|
||||
core_event = _make_event(
|
||||
@@ -178,21 +233,18 @@ class TestSampleAllStats:
|
||||
with patch.object(radio_stats, "radio_manager") as mock_rm:
|
||||
mock_rm.is_connected = True
|
||||
mock_rm.radio_operation = MagicMock(return_value=mock_ctx)
|
||||
await radio_stats._sample_all_stats()
|
||||
snapshot = await radio_stats._sample_all_stats()
|
||||
|
||||
snapshot = radio_stats._latest_stats
|
||||
assert snapshot["battery_mv"] == 4100
|
||||
assert snapshot["noise_floor"] == -120
|
||||
assert snapshot["packets"]["sent"] == 250
|
||||
assert len(radio_stats._noise_floor_samples) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_all_errors_clears_cache(self):
|
||||
"""If every stats command returns ERROR, cache is empty."""
|
||||
async def test_all_errors_returns_empty(self):
|
||||
"""If every stats command returns ERROR, result is empty."""
|
||||
from meshcore import EventType
|
||||
|
||||
radio_stats._latest_stats = {"old": "stale"}
|
||||
|
||||
error = _make_event(EventType.ERROR, {"reason": "unsupported"})
|
||||
|
||||
mock_mc = AsyncMock()
|
||||
@@ -207,6 +259,6 @@ class TestSampleAllStats:
|
||||
with patch.object(radio_stats, "radio_manager") as mock_rm:
|
||||
mock_rm.is_connected = True
|
||||
mock_rm.radio_operation = MagicMock(return_value=mock_ctx)
|
||||
await radio_stats._sample_all_stats()
|
||||
snapshot = await radio_stats._sample_all_stats()
|
||||
|
||||
assert radio_stats._latest_stats == {}
|
||||
assert snapshot == {}
|
||||
|
||||
Reference in New Issue
Block a user