diff --git a/data/mesh_ingestor/daemon.py b/data/mesh_ingestor/daemon.py index b49820d..64b25bf 100644 --- a/data/mesh_ingestor/daemon.py +++ b/data/mesh_ingestor/daemon.py @@ -16,6 +16,7 @@ from __future__ import annotations +import dataclasses import inspect import signal import threading @@ -239,6 +240,278 @@ def _connected_state(candidate) -> bool | None: return None +# --------------------------------------------------------------------------- +# Loop state container +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class _DaemonState: + """All mutable state for the :func:`main` daemon loop.""" + + provider: Provider + stop: threading.Event + configured_port: str | None + inactivity_reconnect_secs: float + energy_saving_enabled: bool + energy_online_secs: float + energy_sleep_secs: float + retry_delay: float + last_seen_packet_monotonic: float | None + active_candidate: str | None + + iface: object = None + resolved_target: str | None = None + initial_snapshot_sent: bool = False + energy_session_deadline: float | None = None + iface_connected_at: float | None = None + last_inactivity_reconnect: float | None = None + ingestor_announcement_sent: bool = False + announced_target: bool = False + + +# --------------------------------------------------------------------------- +# Per-iteration helpers (each returns True when the caller should `continue`) +# --------------------------------------------------------------------------- + + +def _advance_retry_delay(current: float) -> float: + """Return the next exponential-backoff retry delay.""" + + if config._RECONNECT_MAX_DELAY_SECS <= 0: + return current + next_delay = current * 2 if current else config._RECONNECT_INITIAL_DELAY_SECS + return min(next_delay, config._RECONNECT_MAX_DELAY_SECS) + + +def _energy_sleep(state: _DaemonState, reason: str) -> None: + """Sleep for the configured energy-saving interval.""" + + if not state.energy_saving_enabled or state.energy_sleep_secs <= 0: + return + if config.DEBUG: + config._debug_log( + f"energy saving: {reason}; sleeping for {state.energy_sleep_secs:g}s" + ) + state.stop.wait(state.energy_sleep_secs) + + +def _try_connect(state: _DaemonState) -> bool: + """Attempt to establish the mesh interface. + + Returns: + ``True`` when connected and the loop should proceed; ``False`` when + the connection failed and the caller should ``continue``. + """ + + try: + state.iface, state.resolved_target, state.active_candidate = ( + state.provider.connect(active_candidate=state.active_candidate) + ) + handlers.register_host_node_id(state.provider.extract_host_node_id(state.iface)) + ingestors.set_ingestor_node_id(handlers.host_node_id()) + state.retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS) + state.initial_snapshot_sent = False + if not state.announced_target and state.resolved_target: + config._debug_log( + "Using mesh interface", + context="daemon.interface", + severity="info", + target=state.resolved_target, + ) + state.announced_target = True + if state.energy_saving_enabled and state.energy_online_secs > 0: + state.energy_session_deadline = time.monotonic() + state.energy_online_secs + else: + state.energy_session_deadline = None + state.iface_connected_at = time.monotonic() + # Seed the inactivity tracking from the connection time so a + # reconnect is given a full inactivity window even when the + # handler still reports the previous packet timestamp. + state.last_seen_packet_monotonic = state.iface_connected_at + state.last_inactivity_reconnect = None + return True + except interfaces.NoAvailableMeshInterface as exc: + config._debug_log( + "No mesh interface available", + context="daemon.interface", + severity="error", + error_message=str(exc), + ) + _close_interface(state.iface) + raise SystemExit(1) from exc + except Exception as exc: + config._debug_log( + "Failed to create mesh interface", + context="daemon.interface", + severity="warn", + candidate=state.active_candidate or "auto", + error_class=exc.__class__.__name__, + error_message=str(exc), + ) + if state.configured_port is None: + state.active_candidate = None + state.announced_target = False + state.stop.wait(state.retry_delay) + state.retry_delay = _advance_retry_delay(state.retry_delay) + return False + + +def _check_energy_saving(state: _DaemonState) -> bool: + """Disconnect and sleep when energy-saving conditions are met. + + Returns: + ``True`` when the interface was closed and the caller should + ``continue``; ``False`` otherwise. + """ + + if not state.energy_saving_enabled or state.iface is None: + return False + + session_expired = ( + state.energy_session_deadline is not None + and time.monotonic() >= state.energy_session_deadline + ) + ble_dropped = ( + _is_ble_interface(state.iface) + and getattr(state.iface, "client", object()) is None + ) + + if not session_expired and not ble_dropped: + return False + + reason = "disconnected after session" if session_expired else "BLE client disconnected" + log_msg = "Energy saving disconnect" if session_expired else "Energy saving BLE disconnect" + config._debug_log(log_msg, context="daemon.energy", severity="info") + _close_interface(state.iface) + state.iface = None + state.announced_target = False + state.initial_snapshot_sent = False + state.energy_session_deadline = None + _energy_sleep(state, reason) + return True + + +def _try_send_snapshot(state: _DaemonState) -> bool: + """Send the initial node snapshot via the provider. + + Returns: + ``True`` when the snapshot succeeded (or no nodes exist yet); ``False`` + when a hard error occurred and the caller should ``continue``. + """ + + try: + node_items = state.provider.node_snapshot_items(state.iface) + processed_any = False + for node_id, node in node_items: + processed_any = True + try: + handlers.upsert_node(node_id, node) + except Exception as exc: + config._debug_log( + "Failed to update node snapshot", + context="daemon.snapshot", + severity="warn", + node_id=node_id, + error_class=exc.__class__.__name__, + error_message=str(exc), + ) + if config.DEBUG: + config._debug_log( + "Snapshot node payload", + context="daemon.snapshot", + node=node, + ) + if processed_any: + state.initial_snapshot_sent = True + return True + except Exception as exc: + config._debug_log( + "Snapshot refresh failed", + context="daemon.snapshot", + severity="warn", + error_class=exc.__class__.__name__, + error_message=str(exc), + ) + _close_interface(state.iface) + state.iface = None + state.stop.wait(state.retry_delay) + state.retry_delay = _advance_retry_delay(state.retry_delay) + return False + + +def _check_inactivity_reconnect(state: _DaemonState) -> bool: + """Reconnect when the interface has been silent for too long. + + Returns: + ``True`` when a reconnect was triggered and the caller should + ``continue``; ``False`` otherwise. + """ + + if state.iface is None or state.inactivity_reconnect_secs <= 0: + return False + + now = time.monotonic() + iface_activity = handlers.last_packet_monotonic() + + if ( + iface_activity is not None + and state.iface_connected_at is not None + and iface_activity < state.iface_connected_at + ): + iface_activity = state.iface_connected_at + + if iface_activity is not None and ( + state.last_seen_packet_monotonic is None + or iface_activity > state.last_seen_packet_monotonic + ): + state.last_seen_packet_monotonic = iface_activity + state.last_inactivity_reconnect = None + + latest_activity = iface_activity + if latest_activity is None and state.iface_connected_at is not None: + latest_activity = state.iface_connected_at + if latest_activity is None: + latest_activity = now + + inactivity_elapsed = now - latest_activity + believed_disconnected = _connected_state(getattr(state.iface, "isConnected", None)) is False + + if not believed_disconnected and inactivity_elapsed < state.inactivity_reconnect_secs: + return False + + if ( + state.last_inactivity_reconnect is not None + and now - state.last_inactivity_reconnect < state.inactivity_reconnect_secs + ): + return False + + reason = ( + "disconnected" + if believed_disconnected + else f"no data for {inactivity_elapsed:.0f}s" + ) + config._debug_log( + "Mesh interface inactivity detected", + context="daemon.interface", + severity="warn", + reason=reason, + ) + state.last_inactivity_reconnect = now + _close_interface(state.iface) + state.iface = None + state.announced_target = False + state.initial_snapshot_sent = False + state.energy_session_deadline = None + state.iface_connected_at = None + return True + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + def main(*, provider: Provider | None = None) -> None: """Run the mesh ingestion daemon until interrupted.""" @@ -255,307 +528,83 @@ def main(*, provider: Provider | None = None) -> None: topics=subscribed, ) - iface = None - resolved_target = None - retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS) - - stop = threading.Event() - initial_snapshot_sent = False - energy_session_deadline = None - iface_connected_at: float | None = None - last_seen_packet_monotonic = handlers.last_packet_monotonic() - last_inactivity_reconnect: float | None = None - inactivity_reconnect_secs = max( - 0.0, getattr(config, "_INACTIVITY_RECONNECT_SECS", 0.0) + state = _DaemonState( + provider=provider, + stop=threading.Event(), + configured_port=config.CONNECTION, + inactivity_reconnect_secs=max( + 0.0, getattr(config, "_INACTIVITY_RECONNECT_SECS", 0.0) + ), + energy_saving_enabled=config.ENERGY_SAVING, + energy_online_secs=max(0.0, config._ENERGY_ONLINE_DURATION_SECS), + energy_sleep_secs=max(0.0, config._ENERGY_SLEEP_SECS), + retry_delay=max(0.0, config._RECONNECT_INITIAL_DELAY_SECS), + last_seen_packet_monotonic=handlers.last_packet_monotonic(), + active_candidate=config.CONNECTION, ) - ingestor_announcement_sent = False - - energy_saving_enabled = config.ENERGY_SAVING - energy_online_secs = max(0.0, config._ENERGY_ONLINE_DURATION_SECS) - energy_sleep_secs = max(0.0, config._ENERGY_SLEEP_SECS) - - def _energy_sleep(reason: str) -> None: - if not energy_saving_enabled or energy_sleep_secs <= 0: - return - if config.DEBUG: - config._debug_log( - f"energy saving: {reason}; sleeping for {energy_sleep_secs:g}s" - ) - stop.wait(energy_sleep_secs) def handle_sigterm(*_args) -> None: - stop.set() + state.stop.set() def handle_sigint(signum, frame) -> None: - if stop.is_set(): + if state.stop.is_set(): signal.default_int_handler(signum, frame) return - stop.set() + state.stop.set() if threading.current_thread() == threading.main_thread(): signal.signal(signal.SIGINT, handle_sigint) signal.signal(signal.SIGTERM, handle_sigterm) - target = config.INSTANCE or "(no INSTANCE_DOMAIN configured)" - configured_port = config.CONNECTION - active_candidate = configured_port - announced_target = False config._debug_log( "Mesh daemon starting", context="daemon.main", severity="info", - target=target, - port=configured_port or "auto", + target=config.INSTANCE or "(no INSTANCE_DOMAIN configured)", + port=config.CONNECTION or "auto", channel=config.CHANNEL_INDEX, ) + try: - while not stop.is_set(): - if iface is None: - try: - iface, resolved_target, active_candidate = provider.connect( - active_candidate=active_candidate - ) - handlers.register_host_node_id( - provider.extract_host_node_id(iface) - ) - ingestors.set_ingestor_node_id(handlers.host_node_id()) - retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS) - initial_snapshot_sent = False - if not announced_target and resolved_target: - config._debug_log( - "Using mesh interface", - context="daemon.interface", - severity="info", - target=resolved_target, - ) - announced_target = True - if energy_saving_enabled and energy_online_secs > 0: - energy_session_deadline = time.monotonic() + energy_online_secs - else: - energy_session_deadline = None - iface_connected_at = time.monotonic() - # Seed the inactivity tracking from the connection time so a - # reconnect is given a full inactivity window even when the - # handler still reports the previous packet timestamp. - last_seen_packet_monotonic = iface_connected_at - last_inactivity_reconnect = None - except interfaces.NoAvailableMeshInterface as exc: - config._debug_log( - "No mesh interface available", - context="daemon.interface", - severity="error", - error_message=str(exc), - ) - _close_interface(iface) - raise SystemExit(1) from exc - except Exception as exc: - candidate_desc = active_candidate or "auto" - config._debug_log( - "Failed to create mesh interface", - context="daemon.interface", - severity="warn", - candidate=candidate_desc, - error_class=exc.__class__.__name__, - error_message=str(exc), - ) - if configured_port is None: - active_candidate = None - announced_target = False - stop.wait(retry_delay) - if config._RECONNECT_MAX_DELAY_SECS > 0: - retry_delay = min( - ( - retry_delay * 2 - if retry_delay - else config._RECONNECT_INITIAL_DELAY_SECS - ), - config._RECONNECT_MAX_DELAY_SECS, - ) - continue - - if energy_saving_enabled and iface is not None: - if ( - energy_session_deadline is not None - and time.monotonic() >= energy_session_deadline - ): - config._debug_log( - "Energy saving disconnect", - context="daemon.energy", - severity="info", - ) - _close_interface(iface) - iface = None - announced_target = False - initial_snapshot_sent = False - energy_session_deadline = None - _energy_sleep("disconnected after session") - continue - if ( - _is_ble_interface(iface) - and getattr(iface, "client", object()) is None - ): - config._debug_log( - "Energy saving BLE disconnect", - context="daemon.energy", - severity="info", - ) - _close_interface(iface) - iface = None - announced_target = False - initial_snapshot_sent = False - energy_session_deadline = None - _energy_sleep("BLE client disconnected") - continue - - if not initial_snapshot_sent: - try: - raw_snapshot = list(provider.node_snapshot_items(iface)) - node_items = _node_items_snapshot(dict(raw_snapshot)) - if node_items is None: - config._debug_log( - "Skipping node snapshot due to concurrent modification", - context="daemon.snapshot", - ) - else: - processed_snapshot_item = False - for node_id, node in node_items: - processed_snapshot_item = True - try: - handlers.upsert_node(node_id, node) - except Exception as exc: - config._debug_log( - "Failed to update node snapshot", - context="daemon.snapshot", - severity="warn", - node_id=node_id, - error_class=exc.__class__.__name__, - error_message=str(exc), - ) - if config.DEBUG: - config._debug_log( - "Snapshot node payload", - context="daemon.snapshot", - node=node, - ) - if processed_snapshot_item: - initial_snapshot_sent = True - except Exception as exc: - config._debug_log( - "Snapshot refresh failed", - context="daemon.snapshot", - severity="warn", - error_class=exc.__class__.__name__, - error_message=str(exc), - ) - _close_interface(iface) - iface = None - stop.wait(retry_delay) - if config._RECONNECT_MAX_DELAY_SECS > 0: - retry_delay = min( - ( - retry_delay * 2 - if retry_delay - else config._RECONNECT_INITIAL_DELAY_SECS - ), - config._RECONNECT_MAX_DELAY_SECS, - ) - continue - - if iface is not None and inactivity_reconnect_secs > 0: - now_monotonic = time.monotonic() - iface_activity = handlers.last_packet_monotonic() - if ( - iface_activity is not None - and iface_connected_at is not None - and iface_activity < iface_connected_at - ): - iface_activity = iface_connected_at - if iface_activity is not None and ( - last_seen_packet_monotonic is None - or iface_activity > last_seen_packet_monotonic - ): - last_seen_packet_monotonic = iface_activity - last_inactivity_reconnect = None - - latest_activity = iface_activity - if latest_activity is None and iface_connected_at is not None: - latest_activity = iface_connected_at - if latest_activity is None: - latest_activity = now_monotonic - - inactivity_elapsed = now_monotonic - latest_activity - - connected_attr = getattr(iface, "isConnected", None) - believed_disconnected = False - connected_state = _connected_state(connected_attr) - if connected_state is None: - if callable(connected_attr): - try: - believed_disconnected = not bool(connected_attr()) - except Exception: - believed_disconnected = False - elif connected_attr is not None: - try: - believed_disconnected = not bool(connected_attr) - except Exception: # pragma: no cover - defensive guard - believed_disconnected = False - else: - believed_disconnected = not connected_state - - should_reconnect = believed_disconnected or ( - inactivity_elapsed >= inactivity_reconnect_secs - ) - - if should_reconnect: - if ( - last_inactivity_reconnect is None - or now_monotonic - last_inactivity_reconnect - >= inactivity_reconnect_secs - ): - reason = ( - "disconnected" - if believed_disconnected - else f"no data for {inactivity_elapsed:.0f}s" - ) - config._debug_log( - "Mesh interface inactivity detected", - context="daemon.interface", - severity="warn", - reason=reason, - ) - last_inactivity_reconnect = now_monotonic - _close_interface(iface) - iface = None - announced_target = False - initial_snapshot_sent = False - energy_session_deadline = None - iface_connected_at = None - continue - - ingestor_announcement_sent = _process_ingestor_heartbeat( - iface, ingestor_announcement_sent=ingestor_announcement_sent + while not state.stop.is_set(): + if state.iface is None and not _try_connect(state): + continue + if _check_energy_saving(state): + continue + if not state.initial_snapshot_sent and not _try_send_snapshot(state): + continue + if _check_inactivity_reconnect(state): + continue + state.ingestor_announcement_sent = _process_ingestor_heartbeat( + state.iface, ingestor_announcement_sent=state.ingestor_announcement_sent ) - - retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS) - stop.wait(config.SNAPSHOT_SECS) + state.retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS) + state.stop.wait(config.SNAPSHOT_SECS) except KeyboardInterrupt: # pragma: no cover - interactive only config._debug_log( "Received KeyboardInterrupt; shutting down", context="daemon.main", severity="info", ) - stop.set() + state.stop.set() finally: - _close_interface(iface) + _close_interface(state.iface) __all__ = [ "_RECEIVE_TOPICS", - "_event_wait_allows_default_timeout", - "_node_items_snapshot", - "_subscribe_receive_topics", - "_is_ble_interface", - "_process_ingestor_heartbeat", + "_DaemonState", + "_advance_retry_delay", + "_check_energy_saving", + "_check_inactivity_reconnect", "_connected_state", + "_energy_sleep", + "_event_wait_allows_default_timeout", + "_is_ble_interface", + "_node_items_snapshot", + "_process_ingestor_heartbeat", + "_subscribe_receive_topics", + "_try_connect", + "_try_send_snapshot", "main", ] diff --git a/data/mesh_ingestor/providers/meshtastic.py b/data/mesh_ingestor/providers/meshtastic.py index 3f4d9ac..6e71a6e 100644 --- a/data/mesh_ingestor/providers/meshtastic.py +++ b/data/mesh_ingestor/providers/meshtastic.py @@ -16,9 +16,10 @@ from __future__ import annotations +import time from collections.abc import Iterable -from .. import daemon as _daemon, interfaces +from .. import config, daemon as _daemon, interfaces class MeshtasticProvider: @@ -62,9 +63,20 @@ class MeshtasticProvider: def extract_host_node_id(self, iface: object) -> str | None: return interfaces._extract_host_node_id(iface) - def node_snapshot_items(self, iface: object) -> Iterable[tuple[str, object]]: + def node_snapshot_items(self, iface: object) -> list[tuple[str, object]]: nodes = getattr(iface, "nodes", {}) or {} - return list(nodes.items()) + for _ in range(3): + try: + return list(nodes.items()) + except RuntimeError as err: + if "dictionary changed size during iteration" not in str(err): + raise + time.sleep(0) + config._debug_log( + "Skipping node snapshot due to concurrent modification", + context="meshtastic.snapshot", + ) + return [] __all__ = ["MeshtasticProvider"] diff --git a/tests/test_events_unit.py b/tests/test_events_unit.py new file mode 100644 index 0000000..227c4c3 --- /dev/null +++ b/tests/test_events_unit.py @@ -0,0 +1,167 @@ +# Copyright © 2025-26 l5yth & contributors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for :mod:`data.mesh_ingestor.events`.""" + +from __future__ import annotations + +import sys +from pathlib import Path + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from data.mesh_ingestor.events import ( # noqa: E402 - path setup + IngestorHeartbeat, + MessageEvent, + NeighborEntry, + NeighborsSnapshot, + PositionEvent, + TelemetryEvent, + TraceEvent, +) + + +def test_message_event_requires_id_rx_time_rx_iso(): + event: MessageEvent = {"id": 1, "rx_time": 1700000000, "rx_iso": "2023-11-14T00:00:00Z"} + assert event["id"] == 1 + assert event["rx_time"] == 1700000000 + assert event["rx_iso"] == "2023-11-14T00:00:00Z" + + +def test_message_event_accepts_optional_fields(): + event: MessageEvent = { + "id": 2, + "rx_time": 1700000001, + "rx_iso": "2023-11-14T00:00:01Z", + "text": "hello", + "from_id": "!aabbccdd", + "snr": 4.5, + "rssi": -90, + } + assert event["text"] == "hello" + assert event["snr"] == 4.5 + + +def test_position_event_required_fields(): + event: PositionEvent = {"id": 10, "rx_time": 1700000002, "rx_iso": "2023-11-14T00:00:02Z"} + assert event["id"] == 10 + + +def test_position_event_optional_fields(): + event: PositionEvent = { + "id": 11, + "rx_time": 1700000003, + "rx_iso": "2023-11-14T00:00:03Z", + "latitude": 37.7749, + "longitude": -122.4194, + "altitude": 10.0, + "node_id": "!aabbccdd", + } + assert event["latitude"] == 37.7749 + + +def test_telemetry_event_required_fields(): + event: TelemetryEvent = {"id": 20, "rx_time": 1700000004, "rx_iso": "2023-11-14T00:00:04Z"} + assert event["id"] == 20 + + +def test_telemetry_event_optional_fields(): + event: TelemetryEvent = { + "id": 21, + "rx_time": 1700000005, + "rx_iso": "2023-11-14T00:00:05Z", + "channel": 0, + "payload_b64": "AAEC", + "snr": 3.0, + } + assert event["payload_b64"] == "AAEC" + + +def test_neighbor_entry_required_fields(): + entry: NeighborEntry = {"rx_time": 1700000006, "rx_iso": "2023-11-14T00:00:06Z"} + assert entry["rx_time"] == 1700000006 + + +def test_neighbor_entry_optional_fields(): + entry: NeighborEntry = { + "rx_time": 1700000007, + "rx_iso": "2023-11-14T00:00:07Z", + "neighbor_id": "!11223344", + "snr": 6.0, + } + assert entry["neighbor_id"] == "!11223344" + + +def test_neighbors_snapshot_required_fields(): + snap: NeighborsSnapshot = { + "node_id": "!aabbccdd", + "rx_time": 1700000008, + "rx_iso": "2023-11-14T00:00:08Z", + } + assert snap["node_id"] == "!aabbccdd" + + +def test_neighbors_snapshot_optional_fields(): + snap: NeighborsSnapshot = { + "node_id": "!aabbccdd", + "rx_time": 1700000009, + "rx_iso": "2023-11-14T00:00:09Z", + "neighbors": [], + "node_broadcast_interval_secs": 900, + } + assert snap["node_broadcast_interval_secs"] == 900 + + +def test_trace_event_required_fields(): + event: TraceEvent = { + "hops": [1, 2, 3], + "rx_time": 1700000010, + "rx_iso": "2023-11-14T00:00:10Z", + } + assert event["hops"] == [1, 2, 3] + + +def test_trace_event_optional_fields(): + event: TraceEvent = { + "hops": [4, 5], + "rx_time": 1700000011, + "rx_iso": "2023-11-14T00:00:11Z", + "elapsed_ms": 42, + "snr": 2.5, + } + assert event["elapsed_ms"] == 42 + + +def test_ingestor_heartbeat_all_fields(): + hb: IngestorHeartbeat = { + "node_id": "!aabbccdd", + "start_time": 1700000000, + "last_seen_time": 1700000012, + "version": "0.5.11", + "lora_freq": 906875, + "modem_preset": "LONG_FAST", + } + assert hb["version"] == "0.5.11" + assert hb["lora_freq"] == 906875 + + +def test_ingestor_heartbeat_without_optional_fields(): + hb: IngestorHeartbeat = { + "node_id": "!aabbccdd", + "start_time": 1700000000, + "last_seen_time": 1700000013, + "version": "0.5.11", + } + assert "lora_freq" not in hb diff --git a/tests/test_provider_unit.py b/tests/test_provider_unit.py index 5b7e579..843a60a 100644 --- a/tests/test_provider_unit.py +++ b/tests/test_provider_unit.py @@ -107,3 +107,43 @@ def test_daemon_main_uses_provider_connect(monkeypatch): daemon.main(provider=FakeProvider()) assert calls["connect"] >= 1 + +def test_node_snapshot_items_retries_on_concurrent_mutation(monkeypatch): + """node_snapshot_items must retry on dict-mutation RuntimeError, not raise.""" + from data.mesh_ingestor.providers.meshtastic import MeshtasticProvider + + attempt = {"n": 0} + + class MutatingNodes: + def items(self): + attempt["n"] += 1 + if attempt["n"] < 3: + raise RuntimeError("dictionary changed size during iteration") + return [("!aabbccdd", {"num": 1})] + + class FakeIface: + nodes = MutatingNodes() + + monkeypatch.setattr("time.sleep", lambda _: None) + result = MeshtasticProvider().node_snapshot_items(FakeIface()) + assert result == [("!aabbccdd", {"num": 1})] + assert attempt["n"] == 3 + + +def test_node_snapshot_items_returns_empty_after_retry_exhaustion(monkeypatch): + """node_snapshot_items returns [] (non-fatal) when all retries fail.""" + from data.mesh_ingestor.providers.meshtastic import MeshtasticProvider + import data.mesh_ingestor.providers.meshtastic as _mod + + class AlwaysMutating: + def items(self): + raise RuntimeError("dictionary changed size during iteration") + + class FakeIface: + nodes = AlwaysMutating() + + monkeypatch.setattr("time.sleep", lambda _: None) + monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + result = MeshtasticProvider().node_snapshot_items(FakeIface()) + assert result == [] +