diff --git a/tests/test_daemon_unit.py b/tests/test_daemon_unit.py new file mode 100644 index 0000000..2924477 --- /dev/null +++ b/tests/test_daemon_unit.py @@ -0,0 +1,437 @@ +# Copyright © 2025-26 l5yth & contributors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for :mod:`data.mesh_ingestor.daemon`.""" + +from __future__ import annotations + +import sys +import threading +import types +from pathlib import Path +from typing import Any + +import pytest + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + +from data.mesh_ingestor import daemon + + +class FakeEvent: + """Test double for :class:`threading.Event` that can auto-set itself.""" + + instances: list["FakeEvent"] = [] + + def __init__(self, *, auto_set_on_wait: bool = False): + self._is_set = False + self._auto_set_on_wait = auto_set_on_wait + self.wait_calls: list[Any] = [] + FakeEvent.instances.append(self) + + def set(self) -> None: + """Mark the event as set.""" + + self._is_set = True + + def is_set(self) -> bool: + """Return whether the event is currently set.""" + + return self._is_set + + def wait(self, timeout: float | None = None) -> bool: + """Record waits and optionally auto-set the flag.""" + + self.wait_calls.append(timeout) + if self._auto_set_on_wait: + self._is_set = True + return self._is_set + + +class AutoSetEvent(FakeEvent): + """Event variant that automatically sets on each wait call.""" + + def __init__(self): # noqa: D401 - short initializer docstring handled by class + super().__init__(auto_set_on_wait=True) + + +@pytest.fixture(autouse=True) +def reset_fake_events(): + """Ensure :class:`FakeEvent` registry is cleared between tests.""" + + FakeEvent.instances.clear() + yield + FakeEvent.instances.clear() + + +def test_event_wait_default_detection(monkeypatch): + """``_event_wait_allows_default_timeout`` matches defaulted signatures.""" + + assert daemon._event_wait_allows_default_timeout() is True + + class _NoDefaultEvent: + def wait(self, timeout): # type: ignore[override] + return bool(timeout) + + monkeypatch.setattr( + daemon, "threading", types.SimpleNamespace(Event=_NoDefaultEvent) + ) + assert daemon._event_wait_allows_default_timeout() is False + + +def test_subscribe_receive_topics(monkeypatch): + """Subscribing to receive topics returns the exact topic list.""" + + subscribed: list[str] = [] + + def _record_subscription(_handler, topic): + subscribed.append(topic) + + monkeypatch.setattr( + daemon, "pub", types.SimpleNamespace(subscribe=_record_subscription) + ) + assert daemon._subscribe_receive_topics() == list(daemon._RECEIVE_TOPICS) + assert subscribed == list(daemon._RECEIVE_TOPICS) + + +def test_node_items_snapshot_handles_mutation(monkeypatch): + """Snapshots tolerate temporary runtime errors while iterating.""" + + class MutatingMapping(dict): + def __bool__(self): + return True + + def items(self): # type: ignore[override] + raise RuntimeError("dictionary changed size during iteration") + + monkeypatch.setattr(daemon.time, "sleep", lambda _: None) + assert daemon._node_items_snapshot({"a": 1}) == [("a", 1)] + assert daemon._node_items_snapshot(MutatingMapping(), retries=1) is None + + class IteratingMapping: + def __init__(self): + self.calls = 0 + self._data = {"x": 10, "y": 20} + + def __iter__(self): + self.calls += 1 + if self.calls == 1: + raise RuntimeError("dictionary changed size during iteration") + return iter(self._data) + + def __getitem__(self, key): + return self._data[key] + + mapping = IteratingMapping() + assert daemon._node_items_snapshot(mapping, retries=2) == [("x", 10), ("y", 20)] + + +def test_close_interface_respects_timeout(monkeypatch): + """Long-running close calls emit a timeout debug log.""" + + log_calls = [] + monkeypatch.setattr(daemon.config, "_CLOSE_TIMEOUT_SECS", 0.01) + monkeypatch.setattr( + daemon.config, "_debug_log", lambda *args, **kwargs: log_calls.append(kwargs) + ) + blocker = threading.Event() + + class SlowInterface: + def close(self): + blocker.wait(timeout=0.1) + + daemon._close_interface(SlowInterface()) + assert any("timeout_seconds" in entry for entry in log_calls) + + +def test_close_interface_immediate_path(monkeypatch): + """A zero timeout calls ``close`` inline without threading.""" + + flags = {"called": False} + monkeypatch.setattr(daemon.config, "_CLOSE_TIMEOUT_SECS", 0) + + class ImmediateInterface: + def close(self): + flags["called"] = True + + daemon._close_interface(ImmediateInterface()) + assert flags["called"] is True + + +def test_ble_interface_detection(): + """Detect BLE module names reliably.""" + + class BLE: + __module__ = "meshtastic.ble_interface" + + class NonBLE: + __module__ = "meshtastic.serial" + + assert daemon._is_ble_interface(BLE()) is True + assert daemon._is_ble_interface(NonBLE()) is False + assert daemon._is_ble_interface(None) is False + + +def test_process_ingestor_heartbeat_with_extracted_host(monkeypatch): + """Host id extraction triggers heartbeat announcement flag updates.""" + + host_ids: list[str | None] = [None] + ingestor_ids: list[str | None] = [] + queued: list[bool] = [] + + monkeypatch.setattr(daemon.handlers, "host_node_id", lambda: host_ids[0]) + monkeypatch.setattr( + daemon.interfaces, "_extract_host_node_id", lambda iface: "!abcd" + ) + monkeypatch.setattr( + daemon.handlers, + "register_host_node_id", + lambda node: host_ids.__setitem__(0, node), + ) + monkeypatch.setattr(daemon.ingestors, "set_ingestor_node_id", ingestor_ids.append) + monkeypatch.setattr( + daemon.ingestors, + "queue_ingestor_heartbeat", + lambda force: queued.append(force) or True, + ) + + assert ( + daemon._process_ingestor_heartbeat(object(), ingestor_announcement_sent=False) + is True + ) + assert host_ids[0] == "!abcd" + assert ingestor_ids[-1] == "!abcd" + assert queued[-1] is True + + monkeypatch.setattr(daemon.handlers, "host_node_id", lambda: "!abcd") + monkeypatch.setattr( + daemon.ingestors, + "queue_ingestor_heartbeat", + lambda force: queued.append(force) or False, + ) + assert ( + daemon._process_ingestor_heartbeat(object(), ingestor_announcement_sent=True) + is True + ) + assert queued[-1] is False + + +def test_connected_state_branches(monkeypatch): + """Connection state resolves across multiple attribute forms.""" + + event = threading.Event() + event.set() + assert daemon._connected_state(event) is True + + class CallableCandidate: + def __call__(self): + return False + + assert daemon._connected_state(CallableCandidate()) is False + + class BooleanCandidate: + def __bool__(self): + raise RuntimeError("cannot bool") + + assert daemon._connected_state(BooleanCandidate()) is None + + class HasIsSet: + def is_set(self): + raise RuntimeError("broken") + + assert daemon._connected_state(HasIsSet()) is None + + +def _configure_common_defaults( + monkeypatch, *, energy_saving: bool = False, inactivity: float = 0.0 +): + """Set fast configuration defaults shared by daemon integration tests.""" + + monkeypatch.setattr(daemon.config, "SNAPSHOT_SECS", 0) + monkeypatch.setattr(daemon.config, "_RECONNECT_INITIAL_DELAY_SECS", 0) + monkeypatch.setattr(daemon.config, "_RECONNECT_MAX_DELAY_SECS", 0) + monkeypatch.setattr(daemon.config, "_CLOSE_TIMEOUT_SECS", 0) + monkeypatch.setattr(daemon.config, "ENERGY_SAVING", energy_saving) + monkeypatch.setattr( + daemon.config, "_ENERGY_ONLINE_DURATION_SECS", 0 if energy_saving else 0.0 + ) + monkeypatch.setattr(daemon.config, "_ENERGY_SLEEP_SECS", 0.0) + monkeypatch.setattr(daemon.config, "_INGESTOR_HEARTBEAT_SECS", 0) + monkeypatch.setattr(daemon.config, "_INACTIVITY_RECONNECT_SECS", inactivity) + monkeypatch.setattr(daemon.config, "CONNECTION", "serial0") + + +class DummyInterface: + """Lightweight mesh interface stand-in used for daemon integration tests.""" + + def __init__(self, *, nodes=None, is_connected=True, client_present=True): + self.nodes = nodes if nodes is not None else {"!node": {"id": 1}} + self.isConnected = is_connected + self.client = object() if client_present else None + + def close(self): + return None + + +def test_main_happy_path(monkeypatch): + """The main loop processes snapshots and heartbeats once before stopping.""" + + _configure_common_defaults(monkeypatch) + monkeypatch.setattr( + daemon, + "threading", + types.SimpleNamespace( + Event=AutoSetEvent, + current_thread=threading.current_thread, + main_thread=threading.main_thread, + ), + ) + monkeypatch.setattr( + daemon, "pub", types.SimpleNamespace(subscribe=lambda *_args, **_kwargs: None) + ) + monkeypatch.setattr( + daemon.interfaces, + "_create_serial_interface", + lambda candidate: (DummyInterface(), candidate), + ) + monkeypatch.setattr(daemon.interfaces, "_ensure_radio_metadata", lambda iface: None) + monkeypatch.setattr( + daemon.interfaces, "_ensure_channel_metadata", lambda iface: None + ) + monkeypatch.setattr( + daemon.interfaces, "_extract_host_node_id", lambda iface: "!host" + ) + + host_id = {"value": None} + monkeypatch.setattr( + daemon.handlers, + "register_host_node_id", + lambda node: host_id.__setitem__("value", node), + ) + monkeypatch.setattr(daemon.handlers, "host_node_id", lambda: host_id["value"]) + monkeypatch.setattr(daemon.handlers, "upsert_node", lambda *_args, **_kwargs: None) + monkeypatch.setattr(daemon.handlers, "last_packet_monotonic", lambda: None) + + heartbeats: list[bool] = [] + monkeypatch.setattr( + daemon.ingestors, "set_ingestor_node_id", lambda *_args, **_kwargs: None + ) + monkeypatch.setattr( + daemon.ingestors, + "queue_ingestor_heartbeat", + lambda force: heartbeats.append(force) or True, + ) + + daemon.main() + assert heartbeats + assert host_id["value"] == "!host" + assert FakeEvent.instances and FakeEvent.instances[0].is_set() is True + + +def test_main_energy_saving_disconnect(monkeypatch): + """Energy saving mode disconnects and sleeps when deadlines expire.""" + + _configure_common_defaults(monkeypatch, energy_saving=True) + monkeypatch.setattr( + daemon, + "threading", + types.SimpleNamespace( + Event=AutoSetEvent, + current_thread=threading.current_thread, + main_thread=threading.main_thread, + ), + ) + monkeypatch.setattr( + daemon, "pub", types.SimpleNamespace(subscribe=lambda *_args, **_kwargs: None) + ) + monkeypatch.setattr( + daemon.interfaces, + "_create_serial_interface", + lambda candidate: (DummyInterface(), candidate), + ) + monkeypatch.setattr(daemon.interfaces, "_ensure_radio_metadata", lambda iface: None) + monkeypatch.setattr( + daemon.interfaces, "_ensure_channel_metadata", lambda iface: None + ) + monkeypatch.setattr( + daemon.interfaces, "_extract_host_node_id", lambda iface: "!host" + ) + monkeypatch.setattr( + daemon.handlers, "register_host_node_id", lambda *_args, **_kwargs: None + ) + monkeypatch.setattr(daemon.handlers, "host_node_id", lambda: "!host") + monkeypatch.setattr(daemon.handlers, "upsert_node", lambda *_args, **_kwargs: None) + monkeypatch.setattr(daemon.handlers, "last_packet_monotonic", lambda: None) + monkeypatch.setattr( + daemon.ingestors, "set_ingestor_node_id", lambda *_args, **_kwargs: None + ) + monkeypatch.setattr( + daemon.ingestors, "queue_ingestor_heartbeat", lambda *_args, **_kwargs: True + ) + + daemon.main() + assert FakeEvent.instances and FakeEvent.instances[0].is_set() is True + + +def test_main_inactivity_reconnect(monkeypatch): + """Inactivity triggers reconnect attempts and respects stop events.""" + + _configure_common_defaults(monkeypatch, inactivity=0.5) + monkeypatch.setattr( + daemon, + "threading", + types.SimpleNamespace( + Event=AutoSetEvent, + current_thread=threading.current_thread, + main_thread=threading.main_thread, + ), + ) + monkeypatch.setattr( + daemon, "pub", types.SimpleNamespace(subscribe=lambda *_args, **_kwargs: None) + ) + + interface_cycle = iter( + [DummyInterface(is_connected=False), DummyInterface(is_connected=True)] + ) + monkeypatch.setattr( + daemon.interfaces, + "_create_serial_interface", + lambda candidate: (next(interface_cycle), candidate), + ) + monkeypatch.setattr(daemon.interfaces, "_ensure_radio_metadata", lambda iface: None) + monkeypatch.setattr( + daemon.interfaces, "_ensure_channel_metadata", lambda iface: None + ) + monkeypatch.setattr( + daemon.interfaces, "_extract_host_node_id", lambda iface: "!host" + ) + monkeypatch.setattr( + daemon.handlers, "register_host_node_id", lambda *_args, **_kwargs: None + ) + monkeypatch.setattr(daemon.handlers, "host_node_id", lambda: "!host") + monkeypatch.setattr(daemon.handlers, "upsert_node", lambda *_args, **_kwargs: None) + + monotonic_calls = iter([0.0, 1.0, 2.0, 3.0, 4.0]) + monkeypatch.setattr(daemon.time, "monotonic", lambda: next(monotonic_calls)) + monkeypatch.setattr(daemon.handlers, "last_packet_monotonic", lambda: 0.0) + monkeypatch.setattr( + daemon.ingestors, "set_ingestor_node_id", lambda *_args, **_kwargs: None + ) + monkeypatch.setattr( + daemon.ingestors, "queue_ingestor_heartbeat", lambda *_args, **_kwargs: True + ) + + daemon.main() + assert any(event.is_set() for event in FakeEvent.instances)