mirror of
https://github.com/l5yth/potato-mesh.git
synced 2026-03-28 17:42:48 +01:00
data: resolve circular dependency of deamon.py
This commit is contained in:
@@ -25,7 +25,6 @@ from pubsub import pub
|
||||
|
||||
from . import config, handlers, ingestors, interfaces
|
||||
from .provider import Provider
|
||||
from .providers.meshtastic import MeshtasticProvider
|
||||
|
||||
_RECEIVE_TOPICS = (
|
||||
"meshtastic.receive",
|
||||
@@ -199,11 +198,6 @@ def _process_ingestor_heartbeat(iface, *, ingestor_announcement_sent: bool) -> b
|
||||
if heartbeat_sent and not ingestor_announcement_sent:
|
||||
return True
|
||||
return ingestor_announcement_sent
|
||||
iface_cls = getattr(iface_obj, "__class__", None)
|
||||
if iface_cls is None:
|
||||
return False
|
||||
module_name = getattr(iface_cls, "__module__", "") or ""
|
||||
return "ble_interface" in module_name
|
||||
|
||||
|
||||
def _connected_state(candidate) -> bool | None:
|
||||
@@ -245,10 +239,12 @@ def _connected_state(candidate) -> bool | None:
|
||||
return None
|
||||
|
||||
|
||||
def main(existing_interface=None, *, provider: Provider | None = None) -> None:
|
||||
def main(*, provider: Provider | None = None) -> None:
|
||||
"""Run the mesh ingestion daemon until interrupted."""
|
||||
|
||||
provider = provider or MeshtasticProvider()
|
||||
if provider is None:
|
||||
from .providers.meshtastic import MeshtasticProvider
|
||||
provider = MeshtasticProvider()
|
||||
|
||||
subscribed = provider.subscribe()
|
||||
if subscribed:
|
||||
@@ -259,7 +255,7 @@ def main(existing_interface=None, *, provider: Provider | None = None) -> None:
|
||||
topics=subscribed,
|
||||
)
|
||||
|
||||
iface = existing_interface
|
||||
iface = None
|
||||
resolved_target = None
|
||||
retry_delay = max(0.0, config._RECONNECT_INITIAL_DELAY_SECS)
|
||||
|
||||
@@ -413,8 +409,8 @@ def main(existing_interface=None, *, provider: Provider | None = None) -> None:
|
||||
|
||||
if not initial_snapshot_sent:
|
||||
try:
|
||||
node_items = list(provider.node_snapshot_items(iface))
|
||||
node_items = _node_items_snapshot(dict(node_items))
|
||||
raw_snapshot = list(provider.node_snapshot_items(iface))
|
||||
node_items = _node_items_snapshot(dict(raw_snapshot))
|
||||
if node_items is None:
|
||||
config._debug_log(
|
||||
"Skipping node snapshot due to concurrent modification",
|
||||
|
||||
@@ -27,10 +27,13 @@ from __future__ import annotations
|
||||
from typing import NotRequired, TypedDict
|
||||
|
||||
|
||||
class MessageEvent(TypedDict, total=False):
|
||||
class _MessageEventRequired(TypedDict):
|
||||
id: int
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
|
||||
|
||||
class MessageEvent(_MessageEventRequired, total=False):
|
||||
from_id: object
|
||||
to_id: object
|
||||
channel: int
|
||||
@@ -48,15 +51,18 @@ class MessageEvent(TypedDict, total=False):
|
||||
modem_preset: str
|
||||
|
||||
|
||||
class PositionEvent(TypedDict, total=False):
|
||||
class _PositionEventRequired(TypedDict):
|
||||
id: int
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
|
||||
|
||||
class PositionEvent(_PositionEventRequired, total=False):
|
||||
node_id: str
|
||||
node_num: int | None
|
||||
num: int | None
|
||||
from_id: str | None
|
||||
to_id: object
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
latitude: float | None
|
||||
longitude: float | None
|
||||
altitude: float | None
|
||||
@@ -78,14 +84,17 @@ class PositionEvent(TypedDict, total=False):
|
||||
modem_preset: str
|
||||
|
||||
|
||||
class TelemetryEvent(TypedDict, total=False):
|
||||
class _TelemetryEventRequired(TypedDict):
|
||||
id: int
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
|
||||
|
||||
class TelemetryEvent(_TelemetryEventRequired, total=False):
|
||||
node_id: str | None
|
||||
node_num: int | None
|
||||
from_id: object
|
||||
to_id: object
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
telemetry_time: int | None
|
||||
channel: int
|
||||
portnum: str | None
|
||||
@@ -102,20 +111,26 @@ class TelemetryEvent(TypedDict, total=False):
|
||||
# evolves over time.
|
||||
|
||||
|
||||
class NeighborEntry(TypedDict, total=False):
|
||||
class _NeighborEntryRequired(TypedDict):
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
|
||||
|
||||
class NeighborEntry(_NeighborEntryRequired, total=False):
|
||||
neighbor_id: str
|
||||
neighbor_num: int | None
|
||||
snr: float | None
|
||||
|
||||
|
||||
class _NeighborsSnapshotRequired(TypedDict):
|
||||
node_id: str
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
|
||||
|
||||
class NeighborsSnapshot(TypedDict, total=False):
|
||||
node_id: str
|
||||
class NeighborsSnapshot(_NeighborsSnapshotRequired, total=False):
|
||||
node_num: int | None
|
||||
neighbors: list[NeighborEntry]
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
node_broadcast_interval_secs: int | None
|
||||
last_sent_by_id: str | None
|
||||
ingestor: str | None
|
||||
@@ -123,14 +138,17 @@ class NeighborsSnapshot(TypedDict, total=False):
|
||||
modem_preset: str
|
||||
|
||||
|
||||
class TraceEvent(TypedDict, total=False):
|
||||
class _TraceEventRequired(TypedDict):
|
||||
hops: list[int]
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
|
||||
|
||||
class TraceEvent(_TraceEventRequired, total=False):
|
||||
id: int | None
|
||||
request_id: int | None
|
||||
src: int | None
|
||||
dest: int | None
|
||||
rx_time: int
|
||||
rx_iso: str
|
||||
hops: list[int]
|
||||
rssi: int | None
|
||||
snr: float | None
|
||||
elapsed_ms: int | None
|
||||
|
||||
@@ -26,7 +26,7 @@ from __future__ import annotations
|
||||
from typing import Final
|
||||
|
||||
|
||||
_CANONICAL_PREFIX: Final[str] = "!"
|
||||
CANONICAL_PREFIX: Final[str] = "!"
|
||||
|
||||
|
||||
def canonical_node_id(value: object) -> str | None:
|
||||
@@ -48,7 +48,7 @@ def canonical_node_id(value: object) -> str | None:
|
||||
return None
|
||||
if num < 0:
|
||||
return None
|
||||
return f"{_CANONICAL_PREFIX}{num & 0xFFFFFFFF:08x}"
|
||||
return f"{CANONICAL_PREFIX}{num & 0xFFFFFFFF:08x}"
|
||||
if not isinstance(value, str):
|
||||
return None
|
||||
|
||||
@@ -59,13 +59,13 @@ def canonical_node_id(value: object) -> str | None:
|
||||
# Meshtastic special destinations like "^all" are not node ids; callers
|
||||
# that already accept them should keep passing them through unchanged.
|
||||
return trimmed
|
||||
if trimmed.startswith(_CANONICAL_PREFIX):
|
||||
if trimmed.startswith(CANONICAL_PREFIX):
|
||||
body = trimmed[1:]
|
||||
elif trimmed.lower().startswith("0x"):
|
||||
body = trimmed[2:]
|
||||
elif trimmed.isdigit():
|
||||
try:
|
||||
return f"{_CANONICAL_PREFIX}{int(trimmed, 10) & 0xFFFFFFFF:08x}"
|
||||
return f"{CANONICAL_PREFIX}{int(trimmed, 10) & 0xFFFFFFFF:08x}"
|
||||
except ValueError:
|
||||
return None
|
||||
else:
|
||||
@@ -74,7 +74,7 @@ def canonical_node_id(value: object) -> str | None:
|
||||
if not body:
|
||||
return None
|
||||
try:
|
||||
return f"{_CANONICAL_PREFIX}{int(body, 16) & 0xFFFFFFFF:08x}"
|
||||
return f"{CANONICAL_PREFIX}{int(body, 16) & 0xFFFFFFFF:08x}"
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
@@ -96,7 +96,7 @@ def node_num_from_id(node_id: object) -> int | None:
|
||||
trimmed = node_id.strip()
|
||||
if not trimmed:
|
||||
return None
|
||||
if trimmed.startswith(_CANONICAL_PREFIX):
|
||||
if trimmed.startswith(CANONICAL_PREFIX):
|
||||
trimmed = trimmed[1:]
|
||||
if trimmed.lower().startswith("0x"):
|
||||
trimmed = trimmed[2:]
|
||||
@@ -110,6 +110,7 @@ def node_num_from_id(node_id: object) -> int | None:
|
||||
|
||||
|
||||
__all__ = [
|
||||
"CANONICAL_PREFIX",
|
||||
"canonical_node_id",
|
||||
"node_num_from_id",
|
||||
]
|
||||
|
||||
@@ -25,6 +25,7 @@ import enum
|
||||
from collections.abc import Iterable
|
||||
from typing import Protocol
|
||||
|
||||
|
||||
class ProviderCapability(enum.Flag):
|
||||
"""Feature flags describing what a provider can supply."""
|
||||
|
||||
@@ -37,7 +38,6 @@ class Provider(Protocol):
|
||||
"""Abstract source of mesh observations."""
|
||||
|
||||
name: str
|
||||
capabilities: ProviderCapability
|
||||
|
||||
def subscribe(self) -> list[str]:
|
||||
"""Subscribe to any async receive callbacks and return topic names."""
|
||||
|
||||
@@ -18,15 +18,13 @@ from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterable
|
||||
|
||||
from .. import interfaces
|
||||
from ..provider import ProviderCapability
|
||||
from .. import daemon as _daemon, interfaces
|
||||
|
||||
|
||||
class MeshtasticProvider:
|
||||
"""Meshtastic ingestion provider (current default)."""
|
||||
|
||||
name = "meshtastic"
|
||||
capabilities = ProviderCapability.NODE_SNAPSHOT | ProviderCapability.HEARTBEATS
|
||||
|
||||
def __init__(self):
|
||||
self._subscribed: list[str] = []
|
||||
@@ -37,12 +35,7 @@ class MeshtasticProvider:
|
||||
if self._subscribed:
|
||||
return list(self._subscribed)
|
||||
|
||||
# Delegate to the historical subscription helper in `daemon.py` so unit
|
||||
# tests can monkeypatch the subscription mechanism via `daemon.pub`.
|
||||
from .. import daemon as _daemon # local import avoids module cycles
|
||||
|
||||
topics = _daemon._subscribe_receive_topics()
|
||||
|
||||
self._subscribed = topics
|
||||
return list(topics)
|
||||
|
||||
@@ -71,13 +64,7 @@ class MeshtasticProvider:
|
||||
|
||||
def node_snapshot_items(self, iface: object) -> Iterable[tuple[str, object]]:
|
||||
nodes = getattr(iface, "nodes", {}) or {}
|
||||
items_callable = getattr(nodes, "items", None)
|
||||
if callable(items_callable):
|
||||
return list(items_callable())
|
||||
if hasattr(nodes, "__iter__") and hasattr(nodes, "__getitem__"):
|
||||
keys = list(nodes)
|
||||
return [(key, nodes[key]) for key in keys]
|
||||
return []
|
||||
return list(nodes.items())
|
||||
|
||||
|
||||
__all__ = ["MeshtasticProvider"]
|
||||
|
||||
@@ -52,3 +52,25 @@ def test_node_num_from_id_parses_canonical_and_hex():
|
||||
assert node_num_from_id(123) == 123
|
||||
|
||||
|
||||
def test_canonical_node_id_rejects_none_and_empty():
|
||||
assert canonical_node_id(None) is None
|
||||
assert canonical_node_id("") is None
|
||||
assert canonical_node_id(" ") is None
|
||||
|
||||
|
||||
def test_canonical_node_id_rejects_negative():
|
||||
assert canonical_node_id(-1) is None
|
||||
assert canonical_node_id(-0xABCDEF01) is None
|
||||
|
||||
|
||||
def test_canonical_node_id_truncates_overflow():
|
||||
# Values wider than 32 bits are masked, not rejected.
|
||||
assert canonical_node_id(0x1_ABCDEF01) == "!abcdef01"
|
||||
|
||||
|
||||
def test_node_num_from_id_rejects_none_and_empty():
|
||||
assert node_num_from_id(None) is None
|
||||
assert node_num_from_id("") is None
|
||||
assert node_num_from_id("not-hex") is None
|
||||
|
||||
|
||||
|
||||
@@ -26,11 +26,19 @@ if str(REPO_ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(REPO_ROOT))
|
||||
|
||||
from data.mesh_ingestor import daemon # noqa: E402 - path setup
|
||||
from data.mesh_ingestor.provider import Provider # noqa: E402 - path setup
|
||||
from data.mesh_ingestor.providers.meshtastic import ( # noqa: E402 - path setup
|
||||
MeshtasticProvider,
|
||||
)
|
||||
|
||||
|
||||
def test_meshtastic_provider_satisfies_protocol():
|
||||
"""MeshtasticProvider must structurally satisfy the Provider Protocol."""
|
||||
required = {"name", "subscribe", "connect", "extract_host_node_id", "node_snapshot_items"}
|
||||
missing = required - set(dir(MeshtasticProvider))
|
||||
assert not missing, f"MeshtasticProvider is missing Protocol members: {missing}"
|
||||
|
||||
|
||||
def test_daemon_main_uses_provider_connect(monkeypatch):
|
||||
calls = {"connect": 0}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user