Compare commits

..

2 Commits

Author SHA1 Message Date
l5y
e9b1c102f5 address review comments 2026-03-28 17:41:29 +01:00
l5y
29be258b57 data: resolve circular dependency of daemon.py 2026-03-28 17:11:38 +01:00
9 changed files with 616 additions and 316 deletions

View File

@@ -16,6 +16,7 @@
from __future__ import annotations
import dataclasses
import inspect
import signal
import threading
@@ -25,7 +26,6 @@ from pubsub import pub
from . import config, handlers, ingestors, interfaces
from .provider import Provider
from .providers.meshtastic import MeshtasticProvider
_RECEIVE_TOPICS = (
"meshtastic.receive",
@@ -199,11 +199,6 @@ def _process_ingestor_heartbeat(iface, *, ingestor_announcement_sent: bool) -> b
if heartbeat_sent and not ingestor_announcement_sent:
return True
return ingestor_announcement_sent
iface_cls = getattr(iface_obj, "__class__", None)
if iface_cls is None:
return False
module_name = getattr(iface_cls, "__module__", "") or ""
return "ble_interface" in module_name
def _connected_state(candidate) -> bool | None:
@@ -245,10 +240,284 @@ def _connected_state(candidate) -> bool | None:
return None
def main(existing_interface=None, *, provider: Provider | None = None) -> None:
# ---------------------------------------------------------------------------
# Loop state container
# ---------------------------------------------------------------------------
@dataclasses.dataclass
class _DaemonState:
"""All mutable state for the :func:`main` daemon loop."""
provider: Provider
stop: threading.Event
configured_port: str | None
inactivity_reconnect_secs: float
energy_saving_enabled: bool
energy_online_secs: float
energy_sleep_secs: float
retry_delay: float
last_seen_packet_monotonic: float | None
active_candidate: str | None
iface: object = None
resolved_target: str | None = None
initial_snapshot_sent: bool = False
energy_session_deadline: float | None = None
iface_connected_at: float | None = None
last_inactivity_reconnect: float | None = None
ingestor_announcement_sent: bool = False
announced_target: bool = False
# ---------------------------------------------------------------------------
# Per-iteration helpers (each returns True when the caller should `continue`)
# ---------------------------------------------------------------------------
def _advance_retry_delay(current: float) -> float:
"""Return the next exponential-backoff retry delay."""
if config._RECONNECT_MAX_DELAY_SECS <= 0:
return current
next_delay = current * 2 if current else config._RECONNECT_INITIAL_DELAY_SECS
return min(next_delay, config._RECONNECT_MAX_DELAY_SECS)
def _energy_sleep(state: _DaemonState, reason: str) -> None:
"""Sleep for the configured energy-saving interval."""
if not state.energy_saving_enabled or state.energy_sleep_secs <= 0:
return
if config.DEBUG:
config._debug_log(
f"energy saving: {reason}; sleeping for {state.energy_sleep_secs:g}s"
)
state.stop.wait(state.energy_sleep_secs)
def _try_connect(state: _DaemonState) -> bool:
"""Attempt to establish the mesh interface.
Returns:
``True`` when connected and the loop should proceed; ``False`` when
the connection failed and the caller should ``continue``.
"""
try:
state.iface, state.resolved_target, state.active_candidate = (
state.provider.connect(active_candidate=state.active_candidate)
)
handlers.register_host_node_id(state.provider.extract_host_node_id(state.iface))
ingestors.set_ingestor_node_id(handlers.host_node_id())
state.retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS)
state.initial_snapshot_sent = False
if not state.announced_target and state.resolved_target:
config._debug_log(
"Using mesh interface",
context="daemon.interface",
severity="info",
target=state.resolved_target,
)
state.announced_target = True
if state.energy_saving_enabled and state.energy_online_secs > 0:
state.energy_session_deadline = time.monotonic() + state.energy_online_secs
else:
state.energy_session_deadline = None
state.iface_connected_at = time.monotonic()
# Seed the inactivity tracking from the connection time so a
# reconnect is given a full inactivity window even when the
# handler still reports the previous packet timestamp.
state.last_seen_packet_monotonic = state.iface_connected_at
state.last_inactivity_reconnect = None
return True
except interfaces.NoAvailableMeshInterface as exc:
config._debug_log(
"No mesh interface available",
context="daemon.interface",
severity="error",
error_message=str(exc),
)
_close_interface(state.iface)
raise SystemExit(1) from exc
except Exception as exc:
config._debug_log(
"Failed to create mesh interface",
context="daemon.interface",
severity="warn",
candidate=state.active_candidate or "auto",
error_class=exc.__class__.__name__,
error_message=str(exc),
)
if state.configured_port is None:
state.active_candidate = None
state.announced_target = False
state.stop.wait(state.retry_delay)
state.retry_delay = _advance_retry_delay(state.retry_delay)
return False
def _check_energy_saving(state: _DaemonState) -> bool:
"""Disconnect and sleep when energy-saving conditions are met.
Returns:
``True`` when the interface was closed and the caller should
``continue``; ``False`` otherwise.
"""
if not state.energy_saving_enabled or state.iface is None:
return False
session_expired = (
state.energy_session_deadline is not None
and time.monotonic() >= state.energy_session_deadline
)
ble_dropped = (
_is_ble_interface(state.iface)
and getattr(state.iface, "client", object()) is None
)
if not session_expired and not ble_dropped:
return False
reason = "disconnected after session" if session_expired else "BLE client disconnected"
log_msg = "Energy saving disconnect" if session_expired else "Energy saving BLE disconnect"
config._debug_log(log_msg, context="daemon.energy", severity="info")
_close_interface(state.iface)
state.iface = None
state.announced_target = False
state.initial_snapshot_sent = False
state.energy_session_deadline = None
_energy_sleep(state, reason)
return True
def _try_send_snapshot(state: _DaemonState) -> bool:
"""Send the initial node snapshot via the provider.
Returns:
``True`` when the snapshot succeeded (or no nodes exist yet); ``False``
when a hard error occurred and the caller should ``continue``.
"""
try:
node_items = state.provider.node_snapshot_items(state.iface)
processed_any = False
for node_id, node in node_items:
processed_any = True
try:
handlers.upsert_node(node_id, node)
except Exception as exc:
config._debug_log(
"Failed to update node snapshot",
context="daemon.snapshot",
severity="warn",
node_id=node_id,
error_class=exc.__class__.__name__,
error_message=str(exc),
)
if config.DEBUG:
config._debug_log(
"Snapshot node payload",
context="daemon.snapshot",
node=node,
)
if processed_any:
state.initial_snapshot_sent = True
return True
except Exception as exc:
config._debug_log(
"Snapshot refresh failed",
context="daemon.snapshot",
severity="warn",
error_class=exc.__class__.__name__,
error_message=str(exc),
)
_close_interface(state.iface)
state.iface = None
state.stop.wait(state.retry_delay)
state.retry_delay = _advance_retry_delay(state.retry_delay)
return False
def _check_inactivity_reconnect(state: _DaemonState) -> bool:
"""Reconnect when the interface has been silent for too long.
Returns:
``True`` when a reconnect was triggered and the caller should
``continue``; ``False`` otherwise.
"""
if state.iface is None or state.inactivity_reconnect_secs <= 0:
return False
now = time.monotonic()
iface_activity = handlers.last_packet_monotonic()
if (
iface_activity is not None
and state.iface_connected_at is not None
and iface_activity < state.iface_connected_at
):
iface_activity = state.iface_connected_at
if iface_activity is not None and (
state.last_seen_packet_monotonic is None
or iface_activity > state.last_seen_packet_monotonic
):
state.last_seen_packet_monotonic = iface_activity
state.last_inactivity_reconnect = None
latest_activity = iface_activity
if latest_activity is None and state.iface_connected_at is not None:
latest_activity = state.iface_connected_at
if latest_activity is None:
latest_activity = now
inactivity_elapsed = now - latest_activity
believed_disconnected = _connected_state(getattr(state.iface, "isConnected", None)) is False
if not believed_disconnected and inactivity_elapsed < state.inactivity_reconnect_secs:
return False
if (
state.last_inactivity_reconnect is not None
and now - state.last_inactivity_reconnect < state.inactivity_reconnect_secs
):
return False
reason = (
"disconnected"
if believed_disconnected
else f"no data for {inactivity_elapsed:.0f}s"
)
config._debug_log(
"Mesh interface inactivity detected",
context="daemon.interface",
severity="warn",
reason=reason,
)
state.last_inactivity_reconnect = now
_close_interface(state.iface)
state.iface = None
state.announced_target = False
state.initial_snapshot_sent = False
state.energy_session_deadline = None
state.iface_connected_at = None
return True
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(*, provider: Provider | None = None) -> None:
"""Run the mesh ingestion daemon until interrupted."""
provider = provider or MeshtasticProvider()
if provider is None:
from .providers.meshtastic import MeshtasticProvider
provider = MeshtasticProvider()
subscribed = provider.subscribe()
if subscribed:
@@ -259,307 +528,83 @@ def main(existing_interface=None, *, provider: Provider | None = None) -> None:
topics=subscribed,
)
iface = existing_interface
resolved_target = None
retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS)
stop = threading.Event()
initial_snapshot_sent = False
energy_session_deadline = None
iface_connected_at: float | None = None
last_seen_packet_monotonic = handlers.last_packet_monotonic()
last_inactivity_reconnect: float | None = None
inactivity_reconnect_secs = max(
0.0, getattr(config, "_INACTIVITY_RECONNECT_SECS", 0.0)
state = _DaemonState(
provider=provider,
stop=threading.Event(),
configured_port=config.CONNECTION,
inactivity_reconnect_secs=max(
0.0, getattr(config, "_INACTIVITY_RECONNECT_SECS", 0.0)
),
energy_saving_enabled=config.ENERGY_SAVING,
energy_online_secs=max(0.0, config._ENERGY_ONLINE_DURATION_SECS),
energy_sleep_secs=max(0.0, config._ENERGY_SLEEP_SECS),
retry_delay=max(0.0, config._RECONNECT_INITIAL_DELAY_SECS),
last_seen_packet_monotonic=handlers.last_packet_monotonic(),
active_candidate=config.CONNECTION,
)
ingestor_announcement_sent = False
energy_saving_enabled = config.ENERGY_SAVING
energy_online_secs = max(0.0, config._ENERGY_ONLINE_DURATION_SECS)
energy_sleep_secs = max(0.0, config._ENERGY_SLEEP_SECS)
def _energy_sleep(reason: str) -> None:
if not energy_saving_enabled or energy_sleep_secs <= 0:
return
if config.DEBUG:
config._debug_log(
f"energy saving: {reason}; sleeping for {energy_sleep_secs:g}s"
)
stop.wait(energy_sleep_secs)
def handle_sigterm(*_args) -> None:
stop.set()
state.stop.set()
def handle_sigint(signum, frame) -> None:
if stop.is_set():
if state.stop.is_set():
signal.default_int_handler(signum, frame)
return
stop.set()
state.stop.set()
if threading.current_thread() == threading.main_thread():
signal.signal(signal.SIGINT, handle_sigint)
signal.signal(signal.SIGTERM, handle_sigterm)
target = config.INSTANCE or "(no INSTANCE_DOMAIN configured)"
configured_port = config.CONNECTION
active_candidate = configured_port
announced_target = False
config._debug_log(
"Mesh daemon starting",
context="daemon.main",
severity="info",
target=target,
port=configured_port or "auto",
target=config.INSTANCE or "(no INSTANCE_DOMAIN configured)",
port=config.CONNECTION or "auto",
channel=config.CHANNEL_INDEX,
)
try:
while not stop.is_set():
if iface is None:
try:
iface, resolved_target, active_candidate = provider.connect(
active_candidate=active_candidate
)
handlers.register_host_node_id(
provider.extract_host_node_id(iface)
)
ingestors.set_ingestor_node_id(handlers.host_node_id())
retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS)
initial_snapshot_sent = False
if not announced_target and resolved_target:
config._debug_log(
"Using mesh interface",
context="daemon.interface",
severity="info",
target=resolved_target,
)
announced_target = True
if energy_saving_enabled and energy_online_secs > 0:
energy_session_deadline = time.monotonic() + energy_online_secs
else:
energy_session_deadline = None
iface_connected_at = time.monotonic()
# Seed the inactivity tracking from the connection time so a
# reconnect is given a full inactivity window even when the
# handler still reports the previous packet timestamp.
last_seen_packet_monotonic = iface_connected_at
last_inactivity_reconnect = None
except interfaces.NoAvailableMeshInterface as exc:
config._debug_log(
"No mesh interface available",
context="daemon.interface",
severity="error",
error_message=str(exc),
)
_close_interface(iface)
raise SystemExit(1) from exc
except Exception as exc:
candidate_desc = active_candidate or "auto"
config._debug_log(
"Failed to create mesh interface",
context="daemon.interface",
severity="warn",
candidate=candidate_desc,
error_class=exc.__class__.__name__,
error_message=str(exc),
)
if configured_port is None:
active_candidate = None
announced_target = False
stop.wait(retry_delay)
if config._RECONNECT_MAX_DELAY_SECS > 0:
retry_delay = min(
(
retry_delay * 2
if retry_delay
else config._RECONNECT_INITIAL_DELAY_SECS
),
config._RECONNECT_MAX_DELAY_SECS,
)
continue
if energy_saving_enabled and iface is not None:
if (
energy_session_deadline is not None
and time.monotonic() >= energy_session_deadline
):
config._debug_log(
"Energy saving disconnect",
context="daemon.energy",
severity="info",
)
_close_interface(iface)
iface = None
announced_target = False
initial_snapshot_sent = False
energy_session_deadline = None
_energy_sleep("disconnected after session")
continue
if (
_is_ble_interface(iface)
and getattr(iface, "client", object()) is None
):
config._debug_log(
"Energy saving BLE disconnect",
context="daemon.energy",
severity="info",
)
_close_interface(iface)
iface = None
announced_target = False
initial_snapshot_sent = False
energy_session_deadline = None
_energy_sleep("BLE client disconnected")
continue
if not initial_snapshot_sent:
try:
node_items = list(provider.node_snapshot_items(iface))
node_items = _node_items_snapshot(dict(node_items))
if node_items is None:
config._debug_log(
"Skipping node snapshot due to concurrent modification",
context="daemon.snapshot",
)
else:
processed_snapshot_item = False
for node_id, node in node_items:
processed_snapshot_item = True
try:
handlers.upsert_node(node_id, node)
except Exception as exc:
config._debug_log(
"Failed to update node snapshot",
context="daemon.snapshot",
severity="warn",
node_id=node_id,
error_class=exc.__class__.__name__,
error_message=str(exc),
)
if config.DEBUG:
config._debug_log(
"Snapshot node payload",
context="daemon.snapshot",
node=node,
)
if processed_snapshot_item:
initial_snapshot_sent = True
except Exception as exc:
config._debug_log(
"Snapshot refresh failed",
context="daemon.snapshot",
severity="warn",
error_class=exc.__class__.__name__,
error_message=str(exc),
)
_close_interface(iface)
iface = None
stop.wait(retry_delay)
if config._RECONNECT_MAX_DELAY_SECS > 0:
retry_delay = min(
(
retry_delay * 2
if retry_delay
else config._RECONNECT_INITIAL_DELAY_SECS
),
config._RECONNECT_MAX_DELAY_SECS,
)
continue
if iface is not None and inactivity_reconnect_secs > 0:
now_monotonic = time.monotonic()
iface_activity = handlers.last_packet_monotonic()
if (
iface_activity is not None
and iface_connected_at is not None
and iface_activity < iface_connected_at
):
iface_activity = iface_connected_at
if iface_activity is not None and (
last_seen_packet_monotonic is None
or iface_activity > last_seen_packet_monotonic
):
last_seen_packet_monotonic = iface_activity
last_inactivity_reconnect = None
latest_activity = iface_activity
if latest_activity is None and iface_connected_at is not None:
latest_activity = iface_connected_at
if latest_activity is None:
latest_activity = now_monotonic
inactivity_elapsed = now_monotonic - latest_activity
connected_attr = getattr(iface, "isConnected", None)
believed_disconnected = False
connected_state = _connected_state(connected_attr)
if connected_state is None:
if callable(connected_attr):
try:
believed_disconnected = not bool(connected_attr())
except Exception:
believed_disconnected = False
elif connected_attr is not None:
try:
believed_disconnected = not bool(connected_attr)
except Exception: # pragma: no cover - defensive guard
believed_disconnected = False
else:
believed_disconnected = not connected_state
should_reconnect = believed_disconnected or (
inactivity_elapsed >= inactivity_reconnect_secs
)
if should_reconnect:
if (
last_inactivity_reconnect is None
or now_monotonic - last_inactivity_reconnect
>= inactivity_reconnect_secs
):
reason = (
"disconnected"
if believed_disconnected
else f"no data for {inactivity_elapsed:.0f}s"
)
config._debug_log(
"Mesh interface inactivity detected",
context="daemon.interface",
severity="warn",
reason=reason,
)
last_inactivity_reconnect = now_monotonic
_close_interface(iface)
iface = None
announced_target = False
initial_snapshot_sent = False
energy_session_deadline = None
iface_connected_at = None
continue
ingestor_announcement_sent = _process_ingestor_heartbeat(
iface, ingestor_announcement_sent=ingestor_announcement_sent
while not state.stop.is_set():
if state.iface is None and not _try_connect(state):
continue
if _check_energy_saving(state):
continue
if not state.initial_snapshot_sent and not _try_send_snapshot(state):
continue
if _check_inactivity_reconnect(state):
continue
state.ingestor_announcement_sent = _process_ingestor_heartbeat(
state.iface, ingestor_announcement_sent=state.ingestor_announcement_sent
)
retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS)
stop.wait(config.SNAPSHOT_SECS)
state.retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS)
state.stop.wait(config.SNAPSHOT_SECS)
except KeyboardInterrupt: # pragma: no cover - interactive only
config._debug_log(
"Received KeyboardInterrupt; shutting down",
context="daemon.main",
severity="info",
)
stop.set()
state.stop.set()
finally:
_close_interface(iface)
_close_interface(state.iface)
__all__ = [
"_RECEIVE_TOPICS",
"_event_wait_allows_default_timeout",
"_node_items_snapshot",
"_subscribe_receive_topics",
"_is_ble_interface",
"_process_ingestor_heartbeat",
"_DaemonState",
"_advance_retry_delay",
"_check_energy_saving",
"_check_inactivity_reconnect",
"_connected_state",
"_energy_sleep",
"_event_wait_allows_default_timeout",
"_is_ble_interface",
"_node_items_snapshot",
"_process_ingestor_heartbeat",
"_subscribe_receive_topics",
"_try_connect",
"_try_send_snapshot",
"main",
]

View File

@@ -27,10 +27,13 @@ from __future__ import annotations
from typing import NotRequired, TypedDict
class MessageEvent(TypedDict, total=False):
class _MessageEventRequired(TypedDict):
id: int
rx_time: int
rx_iso: str
class MessageEvent(_MessageEventRequired, total=False):
from_id: object
to_id: object
channel: int
@@ -48,15 +51,18 @@ class MessageEvent(TypedDict, total=False):
modem_preset: str
class PositionEvent(TypedDict, total=False):
class _PositionEventRequired(TypedDict):
id: int
rx_time: int
rx_iso: str
class PositionEvent(_PositionEventRequired, total=False):
node_id: str
node_num: int | None
num: int | None
from_id: str | None
to_id: object
rx_time: int
rx_iso: str
latitude: float | None
longitude: float | None
altitude: float | None
@@ -78,14 +84,17 @@ class PositionEvent(TypedDict, total=False):
modem_preset: str
class TelemetryEvent(TypedDict, total=False):
class _TelemetryEventRequired(TypedDict):
id: int
rx_time: int
rx_iso: str
class TelemetryEvent(_TelemetryEventRequired, total=False):
node_id: str | None
node_num: int | None
from_id: object
to_id: object
rx_time: int
rx_iso: str
telemetry_time: int | None
channel: int
portnum: str | None
@@ -102,20 +111,26 @@ class TelemetryEvent(TypedDict, total=False):
# evolves over time.
class NeighborEntry(TypedDict, total=False):
class _NeighborEntryRequired(TypedDict):
rx_time: int
rx_iso: str
class NeighborEntry(_NeighborEntryRequired, total=False):
neighbor_id: str
neighbor_num: int | None
snr: float | None
class _NeighborsSnapshotRequired(TypedDict):
node_id: str
rx_time: int
rx_iso: str
class NeighborsSnapshot(TypedDict, total=False):
node_id: str
class NeighborsSnapshot(_NeighborsSnapshotRequired, total=False):
node_num: int | None
neighbors: list[NeighborEntry]
rx_time: int
rx_iso: str
node_broadcast_interval_secs: int | None
last_sent_by_id: str | None
ingestor: str | None
@@ -123,14 +138,17 @@ class NeighborsSnapshot(TypedDict, total=False):
modem_preset: str
class TraceEvent(TypedDict, total=False):
class _TraceEventRequired(TypedDict):
hops: list[int]
rx_time: int
rx_iso: str
class TraceEvent(_TraceEventRequired, total=False):
id: int | None
request_id: int | None
src: int | None
dest: int | None
rx_time: int
rx_iso: str
hops: list[int]
rssi: int | None
snr: float | None
elapsed_ms: int | None

View File

@@ -26,7 +26,7 @@ from __future__ import annotations
from typing import Final
_CANONICAL_PREFIX: Final[str] = "!"
CANONICAL_PREFIX: Final[str] = "!"
def canonical_node_id(value: object) -> str | None:
@@ -48,7 +48,7 @@ def canonical_node_id(value: object) -> str | None:
return None
if num < 0:
return None
return f"{_CANONICAL_PREFIX}{num & 0xFFFFFFFF:08x}"
return f"{CANONICAL_PREFIX}{num & 0xFFFFFFFF:08x}"
if not isinstance(value, str):
return None
@@ -59,13 +59,13 @@ def canonical_node_id(value: object) -> str | None:
# Meshtastic special destinations like "^all" are not node ids; callers
# that already accept them should keep passing them through unchanged.
return trimmed
if trimmed.startswith(_CANONICAL_PREFIX):
if trimmed.startswith(CANONICAL_PREFIX):
body = trimmed[1:]
elif trimmed.lower().startswith("0x"):
body = trimmed[2:]
elif trimmed.isdigit():
try:
return f"{_CANONICAL_PREFIX}{int(trimmed, 10) & 0xFFFFFFFF:08x}"
return f"{CANONICAL_PREFIX}{int(trimmed, 10) & 0xFFFFFFFF:08x}"
except ValueError:
return None
else:
@@ -74,7 +74,7 @@ def canonical_node_id(value: object) -> str | None:
if not body:
return None
try:
return f"{_CANONICAL_PREFIX}{int(body, 16) & 0xFFFFFFFF:08x}"
return f"{CANONICAL_PREFIX}{int(body, 16) & 0xFFFFFFFF:08x}"
except ValueError:
return None
@@ -96,7 +96,7 @@ def node_num_from_id(node_id: object) -> int | None:
trimmed = node_id.strip()
if not trimmed:
return None
if trimmed.startswith(_CANONICAL_PREFIX):
if trimmed.startswith(CANONICAL_PREFIX):
trimmed = trimmed[1:]
if trimmed.lower().startswith("0x"):
trimmed = trimmed[2:]
@@ -110,6 +110,7 @@ def node_num_from_id(node_id: object) -> int | None:
__all__ = [
"CANONICAL_PREFIX",
"canonical_node_id",
"node_num_from_id",
]

View File

@@ -25,6 +25,7 @@ import enum
from collections.abc import Iterable
from typing import Protocol
class ProviderCapability(enum.Flag):
"""Feature flags describing what a provider can supply."""
@@ -37,7 +38,6 @@ class Provider(Protocol):
"""Abstract source of mesh observations."""
name: str
capabilities: ProviderCapability
def subscribe(self) -> list[str]:
"""Subscribe to any async receive callbacks and return topic names."""

View File

@@ -16,17 +16,16 @@
from __future__ import annotations
import time
from collections.abc import Iterable
from .. import interfaces
from ..provider import ProviderCapability
from .. import config, daemon as _daemon, interfaces
class MeshtasticProvider:
"""Meshtastic ingestion provider (current default)."""
name = "meshtastic"
capabilities = ProviderCapability.NODE_SNAPSHOT | ProviderCapability.HEARTBEATS
def __init__(self):
self._subscribed: list[str] = []
@@ -37,12 +36,7 @@ class MeshtasticProvider:
if self._subscribed:
return list(self._subscribed)
# Delegate to the historical subscription helper in `daemon.py` so unit
# tests can monkeypatch the subscription mechanism via `daemon.pub`.
from .. import daemon as _daemon # local import avoids module cycles
topics = _daemon._subscribe_receive_topics()
self._subscribed = topics
return list(topics)
@@ -69,14 +63,19 @@ class MeshtasticProvider:
def extract_host_node_id(self, iface: object) -> str | None:
return interfaces._extract_host_node_id(iface)
def node_snapshot_items(self, iface: object) -> Iterable[tuple[str, object]]:
def node_snapshot_items(self, iface: object) -> list[tuple[str, object]]:
nodes = getattr(iface, "nodes", {}) or {}
items_callable = getattr(nodes, "items", None)
if callable(items_callable):
return list(items_callable())
if hasattr(nodes, "__iter__") and hasattr(nodes, "__getitem__"):
keys = list(nodes)
return [(key, nodes[key]) for key in keys]
for _ in range(3):
try:
return list(nodes.items())
except RuntimeError as err:
if "dictionary changed size during iteration" not in str(err):
raise
time.sleep(0)
config._debug_log(
"Skipping node snapshot due to concurrent modification",
context="meshtastic.snapshot",
)
return []

167
tests/test_events_unit.py Normal file
View File

@@ -0,0 +1,167 @@
# Copyright © 2025-26 l5yth & contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for :mod:`data.mesh_ingestor.events`."""
from __future__ import annotations
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
from data.mesh_ingestor.events import ( # noqa: E402 - path setup
IngestorHeartbeat,
MessageEvent,
NeighborEntry,
NeighborsSnapshot,
PositionEvent,
TelemetryEvent,
TraceEvent,
)
def test_message_event_requires_id_rx_time_rx_iso():
event: MessageEvent = {"id": 1, "rx_time": 1700000000, "rx_iso": "2023-11-14T00:00:00Z"}
assert event["id"] == 1
assert event["rx_time"] == 1700000000
assert event["rx_iso"] == "2023-11-14T00:00:00Z"
def test_message_event_accepts_optional_fields():
event: MessageEvent = {
"id": 2,
"rx_time": 1700000001,
"rx_iso": "2023-11-14T00:00:01Z",
"text": "hello",
"from_id": "!aabbccdd",
"snr": 4.5,
"rssi": -90,
}
assert event["text"] == "hello"
assert event["snr"] == 4.5
def test_position_event_required_fields():
event: PositionEvent = {"id": 10, "rx_time": 1700000002, "rx_iso": "2023-11-14T00:00:02Z"}
assert event["id"] == 10
def test_position_event_optional_fields():
event: PositionEvent = {
"id": 11,
"rx_time": 1700000003,
"rx_iso": "2023-11-14T00:00:03Z",
"latitude": 37.7749,
"longitude": -122.4194,
"altitude": 10.0,
"node_id": "!aabbccdd",
}
assert event["latitude"] == 37.7749
def test_telemetry_event_required_fields():
event: TelemetryEvent = {"id": 20, "rx_time": 1700000004, "rx_iso": "2023-11-14T00:00:04Z"}
assert event["id"] == 20
def test_telemetry_event_optional_fields():
event: TelemetryEvent = {
"id": 21,
"rx_time": 1700000005,
"rx_iso": "2023-11-14T00:00:05Z",
"channel": 0,
"payload_b64": "AAEC",
"snr": 3.0,
}
assert event["payload_b64"] == "AAEC"
def test_neighbor_entry_required_fields():
entry: NeighborEntry = {"rx_time": 1700000006, "rx_iso": "2023-11-14T00:00:06Z"}
assert entry["rx_time"] == 1700000006
def test_neighbor_entry_optional_fields():
entry: NeighborEntry = {
"rx_time": 1700000007,
"rx_iso": "2023-11-14T00:00:07Z",
"neighbor_id": "!11223344",
"snr": 6.0,
}
assert entry["neighbor_id"] == "!11223344"
def test_neighbors_snapshot_required_fields():
snap: NeighborsSnapshot = {
"node_id": "!aabbccdd",
"rx_time": 1700000008,
"rx_iso": "2023-11-14T00:00:08Z",
}
assert snap["node_id"] == "!aabbccdd"
def test_neighbors_snapshot_optional_fields():
snap: NeighborsSnapshot = {
"node_id": "!aabbccdd",
"rx_time": 1700000009,
"rx_iso": "2023-11-14T00:00:09Z",
"neighbors": [],
"node_broadcast_interval_secs": 900,
}
assert snap["node_broadcast_interval_secs"] == 900
def test_trace_event_required_fields():
event: TraceEvent = {
"hops": [1, 2, 3],
"rx_time": 1700000010,
"rx_iso": "2023-11-14T00:00:10Z",
}
assert event["hops"] == [1, 2, 3]
def test_trace_event_optional_fields():
event: TraceEvent = {
"hops": [4, 5],
"rx_time": 1700000011,
"rx_iso": "2023-11-14T00:00:11Z",
"elapsed_ms": 42,
"snr": 2.5,
}
assert event["elapsed_ms"] == 42
def test_ingestor_heartbeat_all_fields():
hb: IngestorHeartbeat = {
"node_id": "!aabbccdd",
"start_time": 1700000000,
"last_seen_time": 1700000012,
"version": "0.5.11",
"lora_freq": 906875,
"modem_preset": "LONG_FAST",
}
assert hb["version"] == "0.5.11"
assert hb["lora_freq"] == 906875
def test_ingestor_heartbeat_without_optional_fields():
hb: IngestorHeartbeat = {
"node_id": "!aabbccdd",
"start_time": 1700000000,
"last_seen_time": 1700000013,
"version": "0.5.11",
}
assert "lora_freq" not in hb

View File

@@ -52,3 +52,25 @@ def test_node_num_from_id_parses_canonical_and_hex():
assert node_num_from_id(123) == 123
def test_canonical_node_id_rejects_none_and_empty():
assert canonical_node_id(None) is None
assert canonical_node_id("") is None
assert canonical_node_id(" ") is None
def test_canonical_node_id_rejects_negative():
assert canonical_node_id(-1) is None
assert canonical_node_id(-0xABCDEF01) is None
def test_canonical_node_id_truncates_overflow():
# Values wider than 32 bits are masked, not rejected.
assert canonical_node_id(0x1_ABCDEF01) == "!abcdef01"
def test_node_num_from_id_rejects_none_and_empty():
assert node_num_from_id(None) is None
assert node_num_from_id("") is None
assert node_num_from_id("not-hex") is None

View File

@@ -26,11 +26,19 @@ if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
from data.mesh_ingestor import daemon # noqa: E402 - path setup
from data.mesh_ingestor.provider import Provider # noqa: E402 - path setup
from data.mesh_ingestor.providers.meshtastic import ( # noqa: E402 - path setup
MeshtasticProvider,
)
def test_meshtastic_provider_satisfies_protocol():
"""MeshtasticProvider must structurally satisfy the Provider Protocol."""
required = {"name", "subscribe", "connect", "extract_host_node_id", "node_snapshot_items"}
missing = required - set(dir(MeshtasticProvider))
assert not missing, f"MeshtasticProvider is missing Protocol members: {missing}"
def test_daemon_main_uses_provider_connect(monkeypatch):
calls = {"connect": 0}
@@ -99,3 +107,43 @@ def test_daemon_main_uses_provider_connect(monkeypatch):
daemon.main(provider=FakeProvider())
assert calls["connect"] >= 1
def test_node_snapshot_items_retries_on_concurrent_mutation(monkeypatch):
"""node_snapshot_items must retry on dict-mutation RuntimeError, not raise."""
from data.mesh_ingestor.providers.meshtastic import MeshtasticProvider
attempt = {"n": 0}
class MutatingNodes:
def items(self):
attempt["n"] += 1
if attempt["n"] < 3:
raise RuntimeError("dictionary changed size during iteration")
return [("!aabbccdd", {"num": 1})]
class FakeIface:
nodes = MutatingNodes()
monkeypatch.setattr("time.sleep", lambda _: None)
result = MeshtasticProvider().node_snapshot_items(FakeIface())
assert result == [("!aabbccdd", {"num": 1})]
assert attempt["n"] == 3
def test_node_snapshot_items_returns_empty_after_retry_exhaustion(monkeypatch):
"""node_snapshot_items returns [] (non-fatal) when all retries fail."""
from data.mesh_ingestor.providers.meshtastic import MeshtasticProvider
import data.mesh_ingestor.providers.meshtastic as _mod
class AlwaysMutating:
def items(self):
raise RuntimeError("dictionary changed size during iteration")
class FakeIface:
nodes = AlwaysMutating()
monkeypatch.setattr("time.sleep", lambda _: None)
monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None)
result = MeshtasticProvider().node_snapshot_items(FakeIface())
assert result == []