diff --git a/app/AGENTS.md b/app/AGENTS.md index 413a5bb..1bc77bd 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -40,7 +40,7 @@ app/ │ ├── contact_reconciliation.py # Prefix-claim, sender-key backfill, name-history wiring │ ├── radio_lifecycle.py # Post-connect setup and reconnect/setup helpers │ ├── radio_commands.py # Radio config/private-key command workflows -│ ├── radio_noise_floor.py # In-memory local radio noise-floor sampling/history +│ ├── radio_stats.py # In-memory local radio stats sampling and noise-floor history │ └── radio_runtime.py # Router/dependency seam over the global RadioManager ├── radio.py # RadioManager transport/session state + lock management ├── radio_sync.py # Polling, sync, periodic advertisement loop @@ -161,10 +161,12 @@ app/ - All external integrations (MQTT, bots, webhooks, Apprise, SQS) are managed through the fanout bus (`app/fanout/`). - Configs stored in `fanout_configs` table, managed via `GET/POST/PATCH/DELETE /api/fanout`. -- `broadcast_event()` in `websocket.py` dispatches to the fanout manager for `message` and `raw_packet` events. -- Each integration is a `FanoutModule` with scope-based filtering. +- `broadcast_event()` in `websocket.py` dispatches to the fanout manager for `message`, `raw_packet`, and `contact` events. +- `on_message` and `on_raw` are scope-gated. `on_contact`, `on_telemetry`, and `on_health` are dispatched to all modules unconditionally (modules filter internally). +- Repeater telemetry broadcasts are emitted after `RepeaterTelemetryRepository.record()` in both `radio_sync.py` (auto-collect) and `routers/repeaters.py` (manual fetch). +- The 60-second radio stats sampling loop in `radio_stats.py` dispatches an enriched health snapshot (radio identity + full stats) to all fanout modules after each sample. - Community MQTT publishes raw packets only, but its derived `path` field for direct packets is emitted as comma-separated hop identifiers, not flat path bytes. -- See `app/fanout/AGENTS_fanout.md` for full architecture details. +- See `app/fanout/AGENTS_fanout.md` for full architecture details and event payload shapes. ## API Surface (all under `/api`) diff --git a/app/fanout/AGENTS_fanout.md b/app/fanout/AGENTS_fanout.md index 56280ac..3285319 100644 --- a/app/fanout/AGENTS_fanout.md +++ b/app/fanout/AGENTS_fanout.md @@ -1,6 +1,6 @@ # Fanout Bus Architecture -The fanout bus is a unified system for dispatching mesh radio events (decoded messages and raw packets) to external integrations. It replaces the previous scattered singleton MQTT publishers with a modular, configurable framework. +The fanout bus is a unified system for dispatching mesh radio events to external integrations. It replaces the previous scattered singleton MQTT publishers with a modular, configurable framework. ## Core Concepts @@ -8,10 +8,15 @@ The fanout bus is a unified system for dispatching mesh radio events (decoded me Base class that all integration modules extend: - `__init__(config_id, config, *, name="")` — constructor; receives the config UUID, the type-specific config dict, and the user-assigned name - `start()` / `stop()` — async lifecycle (e.g. open/close connections) -- `on_message(data)` — receive decoded messages (DM/channel) -- `on_raw(data)` — receive raw RF packets +- `on_message(data)` — receive decoded messages (scope-gated) +- `on_raw(data)` — receive raw RF packets (scope-gated) +- `on_contact(data)` — receive contact upserts; dispatched to all modules +- `on_telemetry(data)` — receive repeater telemetry snapshots; dispatched to all modules +- `on_health(data)` — receive periodic radio health snapshots; dispatched to all modules - `status` property (**must override**) — return `"connected"`, `"disconnected"`, or `"error"` +All five event hooks are no-ops by default; modules override only the ones they care about. + ### FanoutManager (manager.py) Singleton that owns all active modules and dispatches events: - `load_from_db()` — startup: load enabled configs, instantiate modules @@ -19,6 +24,9 @@ Singleton that owns all active modules and dispatches events: - `remove_config(id)` — delete: stop and remove - `broadcast_message(data)` — scope-check + dispatch `on_message` - `broadcast_raw(data)` — scope-check + dispatch `on_raw` +- `broadcast_contact(data)` — dispatch `on_contact` to all modules +- `broadcast_telemetry(data)` — dispatch `on_telemetry` to all modules +- `broadcast_health_fanout(data)` — dispatch `on_health` to all modules - `stop_all()` — shutdown - `get_statuses()` — health endpoint data @@ -33,19 +41,65 @@ Each config has a `scope` JSON blob controlling what events reach it: ``` Community MQTT always enforces `{"messages": "none", "raw_packets": "all"}`. +Scope only gates `on_message` and `on_raw`. The `on_contact`, `on_telemetry`, and `on_health` hooks are dispatched to all modules unconditionally — modules that care about specific contacts or repeaters filter internally based on their own config. + ## Event Flow ``` Radio Event -> packet_processor / event_handler - -> broadcast_event("message"|"raw_packet", data, realtime=True) + -> broadcast_event("message"|"raw_packet"|"contact", data, realtime=True) -> WebSocket broadcast (always) - -> FanoutManager.broadcast_message/raw (only if realtime=True) - -> scope check per module - -> module.on_message / on_raw + -> FanoutManager.broadcast_message/raw/contact (only if realtime=True) + -> scope check per module (message/raw only) + -> module.on_message / on_raw / on_contact + +Telemetry collect (radio_sync.py / routers/repeaters.py) + -> RepeaterTelemetryRepository.record(...) + -> FanoutManager.broadcast_telemetry(data) + -> module.on_telemetry (all modules, unconditional) + +Health fanout (radio_stats.py, piggybacks on 60s stats sampling loop) + -> FanoutManager.broadcast_health_fanout(data) + -> module.on_health (all modules, unconditional) ``` Setting `realtime=False` (used during historical decryption) skips fanout dispatch entirely. +## Event Payloads + +### on_message(data) +`Message.model_dump()` — the full Pydantic message model. Key fields: +- `type` (`"PRIV"` | `"CHAN"`), `conversation_key`, `text`, `sender_name`, `sender_key` +- `outgoing`, `acked`, `paths`, `sender_timestamp`, `received_at` + +### on_raw(data) +Raw packet dict from `packet_processor.py`. Key fields: +- `id` (storage row ID), `observation_id` (per-arrival), `raw` (hex), `timestamp` +- `decrypted_info` (optional: `channel_key`, `contact_key`, `text`) + +### on_contact(data) +`Contact.model_dump()` — the full Pydantic contact model. Key fields: +- `public_key`, `name`, `type` (0=unknown, 1=client, 2=repeater, 3=room, 4=sensor) +- `lat`, `lon`, `last_seen`, `first_seen`, `on_radio` + +### on_telemetry(data) +Repeater telemetry snapshot, broadcast after successful `RepeaterTelemetryRepository.record()`. +Identical shape from both auto-collect (`radio_sync.py`) and manual fetch (`routers/repeaters.py`): +- `public_key`, `name`, `timestamp` +- `battery_volts`, `noise_floor_dbm`, `last_rssi_dbm`, `last_snr_db` +- `packets_received`, `packets_sent`, `airtime_seconds`, `rx_airtime_seconds` +- `uptime_seconds`, `sent_flood`, `sent_direct`, `recv_flood`, `recv_direct` +- `flood_dups`, `direct_dups`, `full_events`, `tx_queue_len` + +### on_health(data) +Radio health + stats snapshot, broadcast every 60s by the stats sampling loop in `radio_stats.py`: +- `connected` (bool), `connection_info` (str | None) +- `public_key` (str | None), `name` (str | None) +- `noise_floor_dbm`, `battery_mv`, `uptime_secs` (int | None) +- `last_rssi` (int | None), `last_snr` (float | None) +- `tx_air_secs`, `rx_air_secs` (int | None) +- `packets_recv`, `packets_sent`, `flood_tx`, `direct_tx`, `flood_rx`, `direct_rx` (int | None) + ## Current Module Types ### mqtt_private (mqtt_private.py) diff --git a/app/fanout/base.py b/app/fanout/base.py index efe2e49..a66e299 100644 --- a/app/fanout/base.py +++ b/app/fanout/base.py @@ -38,6 +38,15 @@ class FanoutModule: async def on_raw(self, data: dict) -> None: """Called for raw RF packets. Override if needed.""" + async def on_contact(self, data: dict) -> None: + """Called for contact upserts (adverts, sync). Override if needed.""" + + async def on_telemetry(self, data: dict) -> None: + """Called for repeater telemetry snapshots. Override if needed.""" + + async def on_health(self, data: dict) -> None: + """Called for periodic radio health snapshots. Override if needed.""" + @property def status(self) -> str: """Return 'connected', 'disconnected', or 'error'.""" diff --git a/app/fanout/manager.py b/app/fanout/manager.py index dcb7353..cd8d1b5 100644 --- a/app/fanout/manager.py +++ b/app/fanout/manager.py @@ -86,6 +86,11 @@ def _scope_matches_raw(scope: dict, _data: dict) -> bool: return scope.get("raw_packets", "none") == "all" +def _always_match(_scope: dict, _data: dict) -> bool: + """Match all modules unconditionally (filtering is module-internal).""" + return True + + class FanoutManager: """Owns all active fanout modules and dispatches events.""" @@ -270,6 +275,33 @@ class FanoutManager: log_label="on_raw", ) + async def broadcast_contact(self, data: dict) -> None: + """Dispatch a contact upsert to all modules.""" + await self._dispatch_matching( + data, + matcher=_always_match, + handler_name="on_contact", + log_label="on_contact", + ) + + async def broadcast_telemetry(self, data: dict) -> None: + """Dispatch a repeater telemetry snapshot to all modules.""" + await self._dispatch_matching( + data, + matcher=_always_match, + handler_name="on_telemetry", + log_label="on_telemetry", + ) + + async def broadcast_health_fanout(self, data: dict) -> None: + """Dispatch a radio health snapshot to all modules.""" + await self._dispatch_matching( + data, + matcher=_always_match, + handler_name="on_health", + log_label="on_health", + ) + async def stop_all(self) -> None: """Shutdown all modules.""" for config_id, (module, _) in list(self._modules.items()): diff --git a/app/radio_sync.py b/app/radio_sync.py index 868ee08..822a046 100644 --- a/app/radio_sync.py +++ b/app/radio_sync.py @@ -1585,9 +1585,10 @@ async def _collect_repeater_telemetry(mc: MeshCore, contact: Contact) -> bool: } try: + timestamp = int(time.time()) await RepeaterTelemetryRepository.record( public_key=contact.public_key, - timestamp=int(time.time()), + timestamp=timestamp, data=data, ) logger.info( @@ -1595,6 +1596,21 @@ async def _collect_repeater_telemetry(mc: MeshCore, contact: Contact) -> bool: contact.name or contact.public_key[:12], contact.public_key[:12], ) + + # Dispatch to fanout modules (e.g. HA MQTT discovery) + from app.fanout.manager import fanout_manager + + asyncio.create_task( + fanout_manager.broadcast_telemetry( + { + "public_key": contact.public_key, + "name": contact.name or contact.public_key[:12], + "timestamp": timestamp, + **data, + } + ) + ) + return True except Exception as e: logger.warning( diff --git a/app/routers/repeaters.py b/app/routers/repeaters.py index 1bf7c3a..b149bd4 100644 --- a/app/routers/repeaters.py +++ b/app/routers/repeaters.py @@ -1,3 +1,4 @@ +import asyncio import logging import time @@ -133,6 +134,20 @@ async def repeater_status(public_key: str) -> RepeaterStatusResponse: timestamp=now, data=status_dict, ) + + # Dispatch to fanout modules (e.g. HA MQTT discovery) + from app.fanout.manager import fanout_manager + + asyncio.create_task( + fanout_manager.broadcast_telemetry( + { + "public_key": contact.public_key, + "name": contact.name or contact.public_key[:12], + "timestamp": now, + **status_dict, + } + ) + ) except Exception as e: logger.warning("Failed to record telemetry history: %s", e) diff --git a/app/services/radio_stats.py b/app/services/radio_stats.py index 3fc1f4d..911b63d 100644 --- a/app/services/radio_stats.py +++ b/app/services/radio_stats.py @@ -1,12 +1,19 @@ """In-memory local-radio stats sampling. A single 60s loop fetches core, radio, and packet stats from the connected -radio in one radio-lock acquisition and caches everything in memory. The -noise-floor 24h history deque is maintained as a side effect. +radio in one radio-lock acquisition. The noise-floor 24h history deque is +maintained as a side effect. + +After each sample the loop: +1. Broadcasts a WS ``health`` frame so frontend dashboards refresh. +2. Dispatches a ``broadcast_health_fanout`` event carrying the full stats + snapshot plus radio identity, so fanout modules (e.g. HA MQTT) can + publish sensor state without a second radio poll. Consumers: - GET /api/health → get_latest_radio_stats() (battery, uptime, etc.) - GET /api/statistics → get_noise_floor_history() (24h noise-floor chart) +- Fanout on_health → _build_fanout_payload() (identity + stats) """ import asyncio @@ -31,24 +38,25 @@ _noise_floor_samples: deque[tuple[int, int]] = deque(maxlen=MAX_NOISE_FLOOR_SAMP _latest_stats: dict[str, Any] = {} -async def _sample_all_stats() -> None: - """Fetch core, radio, and packet stats in one radio operation.""" - global _latest_stats +async def _sample_all_stats() -> dict[str, Any]: + """Fetch core, radio, and packet stats in one radio operation. + Returns the snapshot dict (may be empty if the radio is disconnected or + all commands errored). + """ if not radio_manager.is_connected: - _latest_stats = {} - return + return {} try: - async with radio_manager.radio_operation("radio_stats_sample") as mc: + async with radio_manager.radio_operation("radio_stats_sample", blocking=False) as mc: core_event = await mc.commands.get_stats_core() radio_event = await mc.commands.get_stats_radio() packet_event = await mc.commands.get_stats_packets() except (RadioDisconnectedError, RadioOperationBusyError): - return + return {} except Exception as exc: logger.debug("Radio stats sampling failed: %s", exc) - return + return {} now = int(time.time()) snapshot: dict[str, Any] = {"timestamp": now} @@ -66,16 +74,62 @@ async def _sample_all_stats() -> None: snapshot["packets"] = packet_event.payload has_any_data = len(snapshot) > 1 - _latest_stats = snapshot if has_any_data else {} + return snapshot if has_any_data else {} + + +def _build_fanout_payload(stats: dict[str, Any]) -> dict: + """Build the health fanout payload from a stats snapshot + radio identity. + + Includes radio identity (public_key, name), connection state, and the + full stats snapshot so fanout modules can publish rich sensor data + without a second radio poll. + """ + mc = radio_manager.meshcore + self_info = mc.self_info if mc else None + + payload: dict = { + "connected": radio_manager.is_connected, + "connection_info": radio_manager.connection_info, + "public_key": (self_info.get("public_key") or None) if self_info else None, + "name": (self_info.get("name") or None) if self_info else None, + } + + if stats: + payload["noise_floor_dbm"] = stats.get("noise_floor") + payload["battery_mv"] = stats.get("battery_mv") + payload["uptime_secs"] = stats.get("uptime_secs") + payload["last_rssi"] = stats.get("last_rssi") + payload["last_snr"] = stats.get("last_snr") + payload["tx_air_secs"] = stats.get("tx_air_secs") + payload["rx_air_secs"] = stats.get("rx_air_secs") + packets = stats.get("packets") or {} + payload["packets_recv"] = packets.get("recv") + payload["packets_sent"] = packets.get("sent") + payload["flood_tx"] = packets.get("flood_tx") + payload["direct_tx"] = packets.get("direct_tx") + payload["flood_rx"] = packets.get("flood_rx") + payload["direct_rx"] = packets.get("direct_rx") + + return payload async def _stats_sampling_loop() -> None: + global _latest_stats while True: try: - await _sample_all_stats() + snapshot = await _sample_all_stats() + if snapshot: + _latest_stats = snapshot + elif not radio_manager.is_connected: + _latest_stats = {} from app.websocket import broadcast_health broadcast_health(radio_manager.is_connected, radio_manager.connection_info) + + # Dispatch enriched health snapshot to fanout modules + from app.fanout.manager import fanout_manager + + await fanout_manager.broadcast_health_fanout(_build_fanout_payload(snapshot)) except asyncio.CancelledError: raise except Exception: @@ -137,5 +191,5 @@ def get_noise_floor_history() -> dict: def get_latest_radio_stats() -> dict[str, Any]: - """Return the most recent radio stats snapshot.""" + """Return the most recent radio stats snapshot (for health endpoint).""" return dict(_latest_stats) diff --git a/app/websocket.py b/app/websocket.py index 81e67c8..399678d 100644 --- a/app/websocket.py +++ b/app/websocket.py @@ -110,6 +110,8 @@ def broadcast_event(event_type: str, data: dict, *, realtime: bool = True) -> No asyncio.create_task(fanout_manager.broadcast_message(data)) elif event_type == "raw_packet": asyncio.create_task(fanout_manager.broadcast_raw(data)) + elif event_type == "contact": + asyncio.create_task(fanout_manager.broadcast_contact(data)) def broadcast_error(message: str, details: str | None = None) -> None: diff --git a/tests/test_fanout.py b/tests/test_fanout.py index 9a63ccf..c70b0d8 100644 --- a/tests/test_fanout.py +++ b/tests/test_fanout.py @@ -101,6 +101,9 @@ class StubModule(FanoutModule): super().__init__("stub", {}) self.message_calls: list[dict] = [] self.raw_calls: list[dict] = [] + self.contact_calls: list[dict] = [] + self.telemetry_calls: list[dict] = [] + self.health_calls: list[dict] = [] self._status = "connected" async def start(self) -> None: @@ -115,6 +118,15 @@ class StubModule(FanoutModule): async def on_raw(self, data: dict) -> None: self.raw_calls.append(data) + async def on_contact(self, data: dict) -> None: + self.contact_calls.append(data) + + async def on_telemetry(self, data: dict) -> None: + self.telemetry_calls.append(data) + + async def on_health(self, data: dict) -> None: + self.health_calls.append(data) + @property def status(self) -> str: return self._status @@ -301,6 +313,113 @@ class TestFanoutManagerDispatch: assert statuses["test-id"]["last_error"] == "ConnectionError: broker down" +# --------------------------------------------------------------------------- +# New event dispatch (contact, telemetry, health) +# --------------------------------------------------------------------------- + + +class TestFanoutManagerNewEventDispatch: + @pytest.mark.asyncio + async def test_broadcast_contact_dispatches_to_all_modules(self): + manager = FanoutManager() + mod = StubModule() + manager._modules["test-id"] = (mod, {}) + + await manager.broadcast_contact({"public_key": "aabb", "name": "Alice"}) + + assert len(mod.contact_calls) == 1 + assert mod.contact_calls[0]["public_key"] == "aabb" + + @pytest.mark.asyncio + async def test_broadcast_contact_ignores_scope(self): + """Contact dispatch is unconditional — scope doesn't affect it.""" + manager = FanoutManager() + mod = StubModule() + manager._modules["test-id"] = (mod, {"messages": "none", "raw_packets": "none"}) + + await manager.broadcast_contact({"public_key": "aabb"}) + + assert len(mod.contact_calls) == 1 + + @pytest.mark.asyncio + async def test_broadcast_telemetry_dispatches_to_all_modules(self): + manager = FanoutManager() + mod = StubModule() + manager._modules["test-id"] = (mod, {}) + + await manager.broadcast_telemetry( + {"public_key": "ccdd", "battery_volts": 4.1, "timestamp": 1000} + ) + + assert len(mod.telemetry_calls) == 1 + assert mod.telemetry_calls[0]["battery_volts"] == 4.1 + + @pytest.mark.asyncio + async def test_broadcast_health_fanout_dispatches_to_all_modules(self): + manager = FanoutManager() + mod = StubModule() + manager._modules["test-id"] = (mod, {}) + + await manager.broadcast_health_fanout({"connected": True, "noise_floor_dbm": -112}) + + assert len(mod.health_calls) == 1 + assert mod.health_calls[0]["connected"] is True + + @pytest.mark.asyncio + async def test_new_events_do_not_affect_message_or_raw(self): + """Verify new dispatch paths are independent of message/raw.""" + manager = FanoutManager() + mod = StubModule() + manager._modules["test-id"] = (mod, {"messages": "all", "raw_packets": "all"}) + + await manager.broadcast_contact({"public_key": "aabb"}) + await manager.broadcast_telemetry({"public_key": "ccdd", "battery_volts": 3.8}) + await manager.broadcast_health_fanout({"connected": False}) + + assert len(mod.message_calls) == 0 + assert len(mod.raw_calls) == 0 + assert len(mod.contact_calls) == 1 + assert len(mod.telemetry_calls) == 1 + assert len(mod.health_calls) == 1 + + @pytest.mark.asyncio + async def test_base_module_no_ops_do_not_raise(self): + """Default FanoutModule no-ops accept data without error.""" + manager = FanoutManager() + + class MinimalModule(FanoutModule): + @property + def status(self) -> str: + return "connected" + + mod = MinimalModule("test", {}) + manager._modules["test-id"] = (mod, {}) + + # Should not raise — base class no-ops silently accept + await manager.broadcast_contact({"public_key": "aabb"}) + await manager.broadcast_telemetry({"public_key": "ccdd"}) + await manager.broadcast_health_fanout({"connected": True}) + + @pytest.mark.asyncio + async def test_error_in_one_module_does_not_block_others(self): + manager = FanoutManager() + + bad_mod = StubModule() + + async def fail(data): + raise RuntimeError("boom") + + bad_mod.on_contact = fail + + good_mod = StubModule() + manager._modules["bad"] = (bad_mod, {}) + manager._modules["good"] = (good_mod, {}) + + await manager.broadcast_contact({"public_key": "aabb"}) + + assert len(good_mod.contact_calls) == 1 + + # --------------------------------------------------------------------------- # Repository tests # --------------------------------------------------------------------------- @@ -476,6 +595,47 @@ class TestBroadcastEventRealtime: mock_ws.broadcast.assert_called_once() mock_fm.broadcast_message.assert_called_once() + @pytest.mark.asyncio + async def test_contact_event_dispatches_to_fanout(self): + """broadcast_event for 'contact' should trigger fanout contact dispatch.""" + from app.websocket import broadcast_event + + with ( + patch("app.websocket.ws_manager") as mock_ws, + patch("app.fanout.manager.fanout_manager") as mock_fm, + ): + mock_ws.broadcast = AsyncMock() + mock_fm.broadcast_contact = AsyncMock() + + broadcast_event("contact", {"public_key": "aabb"}, realtime=True) + + import asyncio + + await asyncio.sleep(0) + + mock_ws.broadcast.assert_called_once() + mock_fm.broadcast_contact.assert_called_once() + + @pytest.mark.asyncio + async def test_contact_event_skipped_when_not_realtime(self): + """broadcast_event('contact', ..., realtime=False) should skip fanout.""" + from app.websocket import broadcast_event + + with ( + patch("app.websocket.ws_manager") as mock_ws, + patch("app.fanout.manager.fanout_manager") as mock_fm, + ): + mock_ws.broadcast = AsyncMock() + + broadcast_event("contact", {"public_key": "aabb"}, realtime=False) + + import asyncio + + await asyncio.sleep(0) + + mock_ws.broadcast.assert_called_once() + mock_fm.broadcast_contact.assert_not_called() + # --------------------------------------------------------------------------- # Webhook module unit tests diff --git a/tests/test_radio_stats.py b/tests/test_radio_stats.py index 51cd518..010bc7e 100644 --- a/tests/test_radio_stats.py +++ b/tests/test_radio_stats.py @@ -17,11 +17,12 @@ class TestRadioStatsSamplingLoop: sample_calls = 0 sleep_calls = 0 - async def fake_sample() -> None: + async def fake_sample(): nonlocal sample_calls sample_calls += 1 if sample_calls == 1: raise RuntimeError("boom") + return {} async def fake_sleep(_seconds: int) -> None: nonlocal sleep_calls @@ -29,10 +30,14 @@ class TestRadioStatsSamplingLoop: if sleep_calls >= 2: raise asyncio.CancelledError() + mock_fanout = MagicMock() + mock_fanout.broadcast_health_fanout = AsyncMock() + with ( patch.object(radio_stats, "_sample_all_stats", side_effect=fake_sample), patch.object(radio_stats.asyncio, "sleep", side_effect=fake_sleep), patch.object(radio_stats.logger, "exception") as mock_exception, + patch("app.fanout.manager.fanout_manager", mock_fanout), ): with pytest.raises(asyncio.CancelledError): await radio_stats._stats_sampling_loop() @@ -43,11 +48,11 @@ class TestRadioStatsSamplingLoop: @pytest.mark.asyncio async def test_broadcasts_health_every_cycle(self): - """The loop should push a WS health broadcast after every iteration.""" + """The loop should push a WS health broadcast and fanout after every iteration.""" sleep_calls = 0 - async def fake_sample() -> None: - pass # no-op; just testing that broadcast fires + async def fake_sample(): + return {} async def fake_sleep(_seconds: int) -> None: nonlocal sleep_calls @@ -55,36 +60,88 @@ class TestRadioStatsSamplingLoop: if sleep_calls >= 2: raise asyncio.CancelledError() + mock_fanout = MagicMock() + mock_fanout.broadcast_health_fanout = AsyncMock() + with ( patch.object(radio_stats, "_sample_all_stats", side_effect=fake_sample), patch.object(radio_stats.asyncio, "sleep", side_effect=fake_sleep), patch("app.websocket.broadcast_health") as mock_broadcast, + patch("app.fanout.manager.fanout_manager", mock_fanout), ): with pytest.raises(asyncio.CancelledError): await radio_stats._stats_sampling_loop() assert mock_broadcast.call_count == 2 + assert mock_fanout.broadcast_health_fanout.call_count == 2 + + @pytest.mark.asyncio + async def test_fanout_receives_enriched_payload(self): + """The health fanout payload should include radio identity + stats.""" + sleep_calls = 0 + fake_snapshot = { + "timestamp": 1700000000, + "battery_mv": 4100, + "uptime_secs": 3600, + "noise_floor": -118, + "last_rssi": -85, + "last_snr": 9.5, + "tx_air_secs": 100, + "rx_air_secs": 200, + "packets": {"recv": 500, "sent": 250}, + } + + async def fake_sample(): + return dict(fake_snapshot) + + async def fake_sleep(_seconds: int) -> None: + nonlocal sleep_calls + sleep_calls += 1 + raise asyncio.CancelledError() + + mock_fanout = MagicMock() + mock_fanout.broadcast_health_fanout = AsyncMock() + + with ( + patch.object(radio_stats, "_sample_all_stats", side_effect=fake_sample), + patch.object(radio_stats.asyncio, "sleep", side_effect=fake_sleep), + patch("app.websocket.broadcast_health"), + patch("app.fanout.manager.fanout_manager", mock_fanout), + patch.object(radio_stats, "radio_manager") as mock_rm, + ): + mock_rm.is_connected = True + mock_rm.connection_info = "Serial: /dev/ttyUSB0" + mock_rm.meshcore = MagicMock() + mock_rm.meshcore.self_info = {"public_key": "aabbccddeeff", "name": "MyRadio"} + + with pytest.raises(asyncio.CancelledError): + await radio_stats._stats_sampling_loop() + + payload = mock_fanout.broadcast_health_fanout.call_args[0][0] + assert payload["connected"] is True + assert payload["public_key"] == "aabbccddeeff" + assert payload["name"] == "MyRadio" + assert payload["battery_mv"] == 4100 + assert payload["noise_floor_dbm"] == -118 + assert payload["packets_recv"] == 500 class TestSampleAllStats: @pytest.mark.asyncio - async def test_clears_cache_when_disconnected(self): - """Stats cache should be empty when radio is disconnected.""" - radio_stats._latest_stats = {"old": "data"} - + async def test_returns_empty_when_disconnected(self): + """Should return empty dict when radio is disconnected.""" with patch.object(radio_stats, "radio_manager") as mock_rm: mock_rm.is_connected = False - await radio_stats._sample_all_stats() + result = await radio_stats._sample_all_stats() - assert radio_stats._latest_stats == {} + assert result == {} @pytest.mark.asyncio async def test_partial_stats_still_records_available_data(self): """If core stats return ERROR but radio/packet stats succeed, noise floor - is still sampled and available fields are cached.""" + is still sampled and available fields are returned.""" from meshcore import EventType - radio_stats._latest_stats = {} radio_stats._noise_floor_samples.clear() core_event = _make_event(EventType.ERROR, {"reason": "unsupported"}) @@ -122,9 +179,8 @@ class TestSampleAllStats: with patch.object(radio_stats, "radio_manager") as mock_rm: mock_rm.is_connected = True mock_rm.radio_operation = MagicMock(return_value=mock_ctx) - await radio_stats._sample_all_stats() + snapshot = await radio_stats._sample_all_stats() - snapshot = radio_stats._latest_stats # Core fields missing (ERROR), but radio + packet fields present assert "battery_mv" not in snapshot assert snapshot["noise_floor"] == -118 @@ -134,10 +190,9 @@ class TestSampleAllStats: @pytest.mark.asyncio async def test_all_stats_succeed(self): - """All three stats commands succeed — full snapshot cached.""" + """All three stats commands succeed — full snapshot returned.""" from meshcore import EventType - radio_stats._latest_stats = {} radio_stats._noise_floor_samples.clear() core_event = _make_event( @@ -178,21 +233,18 @@ class TestSampleAllStats: with patch.object(radio_stats, "radio_manager") as mock_rm: mock_rm.is_connected = True mock_rm.radio_operation = MagicMock(return_value=mock_ctx) - await radio_stats._sample_all_stats() + snapshot = await radio_stats._sample_all_stats() - snapshot = radio_stats._latest_stats assert snapshot["battery_mv"] == 4100 assert snapshot["noise_floor"] == -120 assert snapshot["packets"]["sent"] == 250 assert len(radio_stats._noise_floor_samples) == 1 @pytest.mark.asyncio - async def test_all_errors_clears_cache(self): - """If every stats command returns ERROR, cache is empty.""" + async def test_all_errors_returns_empty(self): + """If every stats command returns ERROR, result is empty.""" from meshcore import EventType - radio_stats._latest_stats = {"old": "stale"} - error = _make_event(EventType.ERROR, {"reason": "unsupported"}) mock_mc = AsyncMock() @@ -207,6 +259,6 @@ class TestSampleAllStats: with patch.object(radio_stats, "radio_manager") as mock_rm: mock_rm.is_connected = True mock_rm.radio_operation = MagicMock(return_value=mock_ctx) - await radio_stats._sample_all_stats() + snapshot = await radio_stats._sample_all_stats() - assert radio_stats._latest_stats == {} + assert snapshot == {}