mirror of
https://github.com/l5yth/potato-mesh.git
synced 2026-03-28 17:42:48 +01:00
* ingestor: report self id per packet * ingestor: address review comments * ingestor: address review comments * ingestor: address review comments * ingestor: address review comments
3472 lines
106 KiB
Python
3472 lines
106 KiB
Python
# Copyright © 2025-26 l5yth & contributors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import base64
|
|
import enum
|
|
import importlib
|
|
import json
|
|
import re
|
|
import sys
|
|
import threading
|
|
import types
|
|
import time
|
|
|
|
"""End-to-end tests covering the mesh ingestion package."""
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
|
|
from meshtastic_protobuf_stub import build as build_protobuf_stub
|
|
|
|
import pytest
|
|
|
|
|
|
@pytest.fixture
|
|
def mesh_module(monkeypatch):
|
|
"""Import :mod:`data.mesh` with stubbed dependencies."""
|
|
|
|
repo_root = Path(__file__).resolve().parents[1]
|
|
monkeypatch.syspath_prepend(str(repo_root))
|
|
|
|
try:
|
|
import meshtastic as real_meshtastic # type: ignore
|
|
except Exception: # pragma: no cover - dependency may be unavailable in CI
|
|
real_meshtastic = None
|
|
|
|
real_protobuf = (
|
|
getattr(real_meshtastic, "protobuf", None) if real_meshtastic else None
|
|
)
|
|
|
|
# Prefer real google.protobuf modules when available, otherwise provide stubs
|
|
try:
|
|
from google.protobuf import json_format as json_format_mod # type: ignore
|
|
from google.protobuf import message as message_mod # type: ignore
|
|
except Exception: # pragma: no cover - protobuf may be missing in CI
|
|
json_format_mod = types.ModuleType("google.protobuf.json_format")
|
|
|
|
def message_to_dict(obj, *_, **__):
|
|
if hasattr(obj, "to_dict"):
|
|
return obj.to_dict()
|
|
if hasattr(obj, "__dict__"):
|
|
return dict(obj.__dict__)
|
|
return {}
|
|
|
|
json_format_mod.MessageToDict = message_to_dict
|
|
|
|
message_mod = types.ModuleType("google.protobuf.message")
|
|
|
|
class DummyProtoMessage:
|
|
pass
|
|
|
|
class DummyDecodeError(Exception):
|
|
pass
|
|
|
|
message_mod.Message = DummyProtoMessage
|
|
message_mod.DecodeError = DummyDecodeError
|
|
|
|
protobuf_mod = types.ModuleType("google.protobuf")
|
|
protobuf_mod.json_format = json_format_mod
|
|
protobuf_mod.message = message_mod
|
|
|
|
google_mod = types.ModuleType("google")
|
|
google_mod.protobuf = protobuf_mod
|
|
|
|
monkeypatch.setitem(sys.modules, "google", google_mod)
|
|
monkeypatch.setitem(sys.modules, "google.protobuf", protobuf_mod)
|
|
monkeypatch.setitem(sys.modules, "google.protobuf.json_format", json_format_mod)
|
|
monkeypatch.setitem(sys.modules, "google.protobuf.message", message_mod)
|
|
else:
|
|
monkeypatch.setitem(sys.modules, "google.protobuf.json_format", json_format_mod)
|
|
monkeypatch.setitem(sys.modules, "google.protobuf.message", message_mod)
|
|
|
|
message_module = sys.modules.get("google.protobuf.message", message_mod)
|
|
|
|
# Stub meshtastic.serial_interface.SerialInterface
|
|
serial_interface_mod = types.ModuleType("meshtastic.serial_interface")
|
|
|
|
class DummySerialInterface:
|
|
def __init__(self, *_, **__):
|
|
self.closed = False
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
serial_interface_mod.SerialInterface = DummySerialInterface
|
|
|
|
tcp_interface_mod = types.ModuleType("meshtastic.tcp_interface")
|
|
|
|
class DummyTCPInterface:
|
|
def __init__(self, *_, **__):
|
|
self.closed = False
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
tcp_interface_mod.TCPInterface = DummyTCPInterface
|
|
|
|
ble_interface_mod = types.ModuleType("meshtastic.ble_interface")
|
|
|
|
class DummyBLEInterface:
|
|
def __init__(self, *_, **__):
|
|
self.closed = False
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
ble_interface_mod.BLEInterface = DummyBLEInterface
|
|
|
|
meshtastic_mod = types.ModuleType("meshtastic")
|
|
meshtastic_mod.serial_interface = serial_interface_mod
|
|
meshtastic_mod.tcp_interface = tcp_interface_mod
|
|
meshtastic_mod.ble_interface = ble_interface_mod
|
|
|
|
mesh_interface_mod = types.ModuleType("meshtastic.mesh_interface")
|
|
|
|
def _default_nodeinfo_callback(iface, packet):
|
|
iface.nodes[packet["id"]] = packet
|
|
return packet["id"]
|
|
|
|
class DummyNodeInfoHandler:
|
|
"""Stub that mimics Meshtastic's NodeInfo handler semantics."""
|
|
|
|
def __init__(self):
|
|
self.callback = getattr(
|
|
meshtastic_mod, "_onNodeInfoReceive", _default_nodeinfo_callback
|
|
)
|
|
|
|
def onReceive(self, iface, packet):
|
|
nodes = getattr(iface, "nodes", None)
|
|
if isinstance(nodes, dict):
|
|
nodes[packet["id"]] = packet
|
|
return self.callback(iface, packet)
|
|
|
|
mesh_interface_mod.NodeInfoHandler = DummyNodeInfoHandler
|
|
meshtastic_mod.mesh_interface = mesh_interface_mod
|
|
monkeypatch.setitem(sys.modules, "meshtastic.mesh_interface", mesh_interface_mod)
|
|
|
|
meshtastic_mod._onNodeInfoReceive = _default_nodeinfo_callback
|
|
if real_protobuf is not None:
|
|
meshtastic_mod.protobuf = real_protobuf
|
|
else:
|
|
serialization_mod = sys.modules.get("data.mesh_ingestor.serialization")
|
|
proto_base = getattr(serialization_mod, "ProtoMessage", message_module.Message)
|
|
decode_error = getattr(message_module, "DecodeError", Exception)
|
|
config_pb2_mod, mesh_pb2_mod = build_protobuf_stub(
|
|
proto_base,
|
|
decode_error,
|
|
)
|
|
protobuf_pkg = types.ModuleType("meshtastic.protobuf")
|
|
protobuf_pkg.config_pb2 = config_pb2_mod
|
|
protobuf_pkg.mesh_pb2 = mesh_pb2_mod
|
|
meshtastic_mod.protobuf = protobuf_pkg
|
|
monkeypatch.setitem(sys.modules, "meshtastic.protobuf", protobuf_pkg)
|
|
monkeypatch.setitem(
|
|
sys.modules, "meshtastic.protobuf.config_pb2", config_pb2_mod
|
|
)
|
|
monkeypatch.setitem(sys.modules, "meshtastic.protobuf.mesh_pb2", mesh_pb2_mod)
|
|
|
|
monkeypatch.setitem(sys.modules, "meshtastic", meshtastic_mod)
|
|
monkeypatch.setitem(
|
|
sys.modules, "meshtastic.serial_interface", serial_interface_mod
|
|
)
|
|
monkeypatch.setitem(sys.modules, "meshtastic.tcp_interface", tcp_interface_mod)
|
|
monkeypatch.setitem(sys.modules, "meshtastic.ble_interface", ble_interface_mod)
|
|
if real_protobuf is not None:
|
|
monkeypatch.setitem(sys.modules, "meshtastic.protobuf", real_protobuf)
|
|
|
|
# Stub pubsub.pub
|
|
pubsub_mod = types.ModuleType("pubsub")
|
|
|
|
class DummyPub:
|
|
def __init__(self):
|
|
self.subscriptions = []
|
|
|
|
def subscribe(self, *args, **kwargs):
|
|
self.subscriptions.append((args, kwargs))
|
|
|
|
pubsub_mod.pub = DummyPub()
|
|
monkeypatch.setitem(sys.modules, "pubsub", pubsub_mod)
|
|
|
|
module_name = "data.mesh_ingestor"
|
|
if module_name in sys.modules:
|
|
module = importlib.reload(sys.modules[module_name])
|
|
else:
|
|
module = importlib.import_module(module_name)
|
|
|
|
if hasattr(module, "_clear_post_queue"):
|
|
module._clear_post_queue()
|
|
|
|
# Ensure radio metadata starts unset for each test run.
|
|
module.config.LORA_FREQ = None
|
|
module.config.MODEM_PRESET = None
|
|
for attr in ("LORA_FREQ", "MODEM_PRESET"):
|
|
if attr in module.__dict__:
|
|
delattr(module, attr)
|
|
module.channels._reset_channel_cache()
|
|
module.ingestors.STATE.start_time = int(time.time())
|
|
module.ingestors.STATE.last_heartbeat = None
|
|
module.ingestors.STATE.node_id = None
|
|
|
|
yield module
|
|
|
|
# Ensure a clean import for the next test
|
|
if hasattr(module, "_clear_post_queue"):
|
|
module._clear_post_queue()
|
|
sys.modules.pop(module_name, None)
|
|
|
|
|
|
def test_instance_domain_prefers_primary_env(mesh_module, monkeypatch):
|
|
"""Ensure the ingestor prefers ``INSTANCE_DOMAIN`` over the legacy variable."""
|
|
|
|
monkeypatch.setenv("INSTANCE_DOMAIN", "https://new.example")
|
|
monkeypatch.setenv("POTATOMESH_INSTANCE", "https://legacy.example")
|
|
|
|
try:
|
|
refreshed_instance = mesh_module.config._resolve_instance_domain()
|
|
mesh_module.config.INSTANCE = refreshed_instance
|
|
mesh_module.INSTANCE = refreshed_instance
|
|
|
|
assert refreshed_instance == "https://new.example"
|
|
assert mesh_module.INSTANCE == "https://new.example"
|
|
finally:
|
|
monkeypatch.delenv("INSTANCE_DOMAIN", raising=False)
|
|
monkeypatch.delenv("POTATOMESH_INSTANCE", raising=False)
|
|
mesh_module.config.INSTANCE = mesh_module.config._resolve_instance_domain()
|
|
mesh_module.INSTANCE = mesh_module.config.INSTANCE
|
|
|
|
|
|
def test_instance_domain_falls_back_to_legacy(mesh_module, monkeypatch):
|
|
"""Verify ``POTATOMESH_INSTANCE`` is used when ``INSTANCE_DOMAIN`` is unset."""
|
|
|
|
monkeypatch.delenv("INSTANCE_DOMAIN", raising=False)
|
|
monkeypatch.setenv("POTATOMESH_INSTANCE", "https://legacy-only.example")
|
|
|
|
try:
|
|
refreshed_instance = mesh_module.config._resolve_instance_domain()
|
|
mesh_module.config.INSTANCE = refreshed_instance
|
|
mesh_module.INSTANCE = refreshed_instance
|
|
|
|
assert refreshed_instance == "https://legacy-only.example"
|
|
assert mesh_module.INSTANCE == "https://legacy-only.example"
|
|
finally:
|
|
monkeypatch.delenv("POTATOMESH_INSTANCE", raising=False)
|
|
mesh_module.config.INSTANCE = mesh_module.config._resolve_instance_domain()
|
|
mesh_module.INSTANCE = mesh_module.config.INSTANCE
|
|
|
|
|
|
def test_instance_domain_infers_scheme_for_hostnames(mesh_module, monkeypatch):
|
|
"""Ensure bare hostnames are promoted to HTTPS URLs for ingestion."""
|
|
|
|
monkeypatch.setenv("INSTANCE_DOMAIN", "mesh.example.org")
|
|
monkeypatch.delenv("POTATOMESH_INSTANCE", raising=False)
|
|
|
|
try:
|
|
refreshed_instance = mesh_module.config._resolve_instance_domain()
|
|
mesh_module.config.INSTANCE = refreshed_instance
|
|
mesh_module.INSTANCE = refreshed_instance
|
|
|
|
assert refreshed_instance == "https://mesh.example.org"
|
|
assert mesh_module.INSTANCE == "https://mesh.example.org"
|
|
finally:
|
|
monkeypatch.delenv("INSTANCE_DOMAIN", raising=False)
|
|
mesh_module.config.INSTANCE = mesh_module.config._resolve_instance_domain()
|
|
mesh_module.INSTANCE = mesh_module.config.INSTANCE
|
|
|
|
|
|
def test_parse_channel_names_applies_allowlist(mesh_module):
|
|
"""Ensure allowlists reuse the shared channel parser."""
|
|
|
|
mesh = mesh_module
|
|
previous_allowed = mesh.ALLOWED_CHANNELS
|
|
|
|
try:
|
|
parsed = mesh.config._parse_channel_names(" Primary ,Chat ,primary , Ops ")
|
|
mesh.ALLOWED_CHANNELS = parsed
|
|
|
|
assert parsed == ("Primary", "Chat", "Ops")
|
|
assert mesh.channels.allowed_channel_names() == ("Primary", "Chat", "Ops")
|
|
assert mesh.channels.is_allowed_channel("chat")
|
|
assert mesh.channels.is_allowed_channel(" ops ")
|
|
assert not mesh.channels.is_allowed_channel("unknown")
|
|
assert not mesh.channels.is_allowed_channel(None)
|
|
assert mesh.config._parse_channel_names("") == ()
|
|
finally:
|
|
mesh.ALLOWED_CHANNELS = previous_allowed
|
|
|
|
|
|
def test_allowed_channel_defaults_allow_all(mesh_module):
|
|
"""Ensure unset allowlists do not block any channels."""
|
|
|
|
mesh = mesh_module
|
|
previous_allowed = mesh.ALLOWED_CHANNELS
|
|
|
|
try:
|
|
mesh.ALLOWED_CHANNELS = ()
|
|
assert mesh.channels.is_allowed_channel("Any")
|
|
finally:
|
|
mesh.ALLOWED_CHANNELS = previous_allowed
|
|
|
|
|
|
def test_parse_hidden_channels_deduplicates_names(mesh_module):
|
|
"""Ensure hidden channel parsing strips blanks and deduplicates."""
|
|
|
|
mesh = mesh_module
|
|
previous_hidden = mesh.HIDDEN_CHANNELS
|
|
|
|
try:
|
|
parsed = mesh.config._parse_hidden_channels(" Chat , ,Secret ,chat")
|
|
mesh.HIDDEN_CHANNELS = parsed
|
|
|
|
assert parsed == ("Chat", "Secret")
|
|
assert mesh.channels.hidden_channel_names() == ("Chat", "Secret")
|
|
assert mesh.channels.is_hidden_channel(" chat ")
|
|
assert not mesh.channels.is_hidden_channel("unknown")
|
|
assert mesh.config._parse_hidden_channels("") == ()
|
|
finally:
|
|
mesh.HIDDEN_CHANNELS = previous_hidden
|
|
|
|
|
|
def test_subscribe_receive_topics_covers_all_handlers(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
daemon_mod = sys.modules["data.mesh_ingestor.daemon"]
|
|
|
|
recorded_calls: list[tuple[tuple[object, ...], dict[str, object]]] = []
|
|
|
|
class RecordingPub:
|
|
def subscribe(self, *args, **kwargs):
|
|
recorded_calls.append((args, kwargs))
|
|
|
|
monkeypatch.setattr(daemon_mod, "pub", RecordingPub())
|
|
|
|
subscribed_topics = mesh._subscribe_receive_topics()
|
|
|
|
expected_topics = list(mesh._RECEIVE_TOPICS)
|
|
assert subscribed_topics == expected_topics
|
|
assert len(recorded_calls) == len(expected_topics)
|
|
|
|
for (args, kwargs), topic in zip(recorded_calls, expected_topics):
|
|
assert not kwargs
|
|
assert args[0] is mesh.on_receive
|
|
assert args[1] == topic
|
|
|
|
|
|
def test_snapshot_interval_defaults_to_60_seconds(mesh_module):
|
|
mesh = mesh_module
|
|
assert mesh.SNAPSHOT_SECS == 60
|
|
|
|
|
|
def test_extract_host_node_id_prefers_my_info_fields(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
class DummyInterface:
|
|
def __init__(self):
|
|
self.myInfo = {"my_node_num": 0x9E95CF60}
|
|
|
|
iface = DummyInterface()
|
|
|
|
assert mesh._extract_host_node_id(iface) == "!9e95cf60"
|
|
|
|
|
|
def test_extract_host_node_id_from_nested_info(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
class DummyInterface:
|
|
def __init__(self):
|
|
self.myInfo = {"info": {"id": "!cafebabe"}}
|
|
|
|
iface = DummyInterface()
|
|
|
|
assert mesh._extract_host_node_id(iface) == "!cafebabe"
|
|
|
|
|
|
def test_extract_host_node_id_from_callable(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
class CallableNoDict:
|
|
__slots__ = ()
|
|
|
|
def __call__(self):
|
|
return {"id": "!f00ba4"}
|
|
|
|
class DummyInterface:
|
|
def __init__(self):
|
|
self.localNode = CallableNoDict()
|
|
|
|
iface = DummyInterface()
|
|
|
|
assert mesh._extract_host_node_id(iface) == "!00f00ba4"
|
|
|
|
|
|
def test_extract_host_node_id_from_my_node_num_attribute(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
class DummyInterface:
|
|
def __init__(self):
|
|
self.myNodeNum = 0xDEADBEEF
|
|
|
|
iface = DummyInterface()
|
|
|
|
assert mesh._extract_host_node_id(iface) == "!deadbeef"
|
|
|
|
|
|
@pytest.mark.parametrize("value", ["mock", "Mock", " disabled "])
|
|
def test_create_serial_interface_allows_mock(mesh_module, value):
|
|
mesh = mesh_module
|
|
|
|
iface, resolved = mesh._create_serial_interface(value)
|
|
|
|
assert resolved == "mock"
|
|
assert isinstance(iface.nodes, dict)
|
|
iface.close()
|
|
|
|
|
|
def test_create_serial_interface_uses_serial_module(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
created = {}
|
|
sentinel = object()
|
|
|
|
def fake_interface(*, devPath):
|
|
created["devPath"] = devPath
|
|
return SimpleNamespace(nodes={"!foo": sentinel}, close=lambda: None)
|
|
|
|
monkeypatch.setattr(mesh, "SerialInterface", fake_interface)
|
|
|
|
iface, resolved = mesh._create_serial_interface("/dev/ttyTEST")
|
|
|
|
assert created["devPath"] == "/dev/ttyTEST"
|
|
assert resolved == "/dev/ttyTEST"
|
|
assert iface.nodes == {"!foo": sentinel}
|
|
|
|
|
|
def test_create_serial_interface_uses_tcp_for_ip(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
created = {}
|
|
|
|
def fake_tcp_interface(*, hostname, portNumber, **_):
|
|
created["hostname"] = hostname
|
|
created["portNumber"] = portNumber
|
|
return SimpleNamespace(nodes={}, close=lambda: None)
|
|
|
|
monkeypatch.setattr(mesh, "TCPInterface", fake_tcp_interface)
|
|
|
|
iface, resolved = mesh._create_serial_interface("192.168.1.25:4500")
|
|
|
|
assert created == {"hostname": "192.168.1.25", "portNumber": 4500}
|
|
assert resolved == "tcp://192.168.1.25:4500"
|
|
assert iface.nodes == {}
|
|
|
|
|
|
def test_create_serial_interface_defaults_tcp_port(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
created = {}
|
|
|
|
def fake_tcp_interface(*, hostname, portNumber, **_):
|
|
created["hostname"] = hostname
|
|
created["portNumber"] = portNumber
|
|
return SimpleNamespace(nodes={}, close=lambda: None)
|
|
|
|
monkeypatch.setattr(mesh, "TCPInterface", fake_tcp_interface)
|
|
|
|
_, resolved = mesh._create_serial_interface("tcp://10.20.30.40")
|
|
|
|
assert created["hostname"] == "10.20.30.40"
|
|
assert created["portNumber"] == mesh._DEFAULT_TCP_PORT
|
|
assert resolved == "tcp://10.20.30.40:4403"
|
|
|
|
|
|
def test_create_serial_interface_plain_ip(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
created = {}
|
|
|
|
def fake_tcp_interface(*, hostname, portNumber, **_):
|
|
created["hostname"] = hostname
|
|
created["portNumber"] = portNumber
|
|
return SimpleNamespace(nodes={}, close=lambda: None)
|
|
|
|
monkeypatch.setattr(mesh, "TCPInterface", fake_tcp_interface)
|
|
|
|
_, resolved = mesh._create_serial_interface(" 192.168.50.10 ")
|
|
|
|
assert created["hostname"] == "192.168.50.10"
|
|
assert created["portNumber"] == mesh._DEFAULT_TCP_PORT
|
|
assert resolved == "tcp://192.168.50.10:4403"
|
|
|
|
|
|
def test_create_serial_interface_ble(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
created = {}
|
|
|
|
def fake_ble_interface(*, address=None, **_):
|
|
created["address"] = address
|
|
return SimpleNamespace(nodes={}, close=lambda: None)
|
|
|
|
monkeypatch.setattr(mesh, "BLEInterface", fake_ble_interface)
|
|
|
|
iface, resolved = mesh._create_serial_interface("ed:4d:9e:95:cf:60")
|
|
|
|
assert created["address"] == "ED:4D:9E:95:CF:60"
|
|
assert resolved == "ED:4D:9E:95:CF:60"
|
|
assert iface.nodes == {}
|
|
|
|
|
|
def test_ensure_radio_metadata_extracts_config(mesh_module, capsys):
|
|
mesh = mesh_module
|
|
|
|
class DummyEnumValue:
|
|
def __init__(self, name: str) -> None:
|
|
self.name = name
|
|
|
|
class DummyEnum:
|
|
def __init__(self, mapping: dict[int, str]) -> None:
|
|
self.values_by_number = {
|
|
number: DummyEnumValue(name) for number, name in mapping.items()
|
|
}
|
|
|
|
class DummyField:
|
|
def __init__(self, enum_type=None) -> None:
|
|
self.enum_type = enum_type
|
|
|
|
class DummyDescriptor:
|
|
def __init__(self, fields: dict[str, DummyField]) -> None:
|
|
self.fields_by_name = fields
|
|
|
|
def make_lora(
|
|
region_value: int,
|
|
region_name: str,
|
|
preset_value: int,
|
|
preset_name: str,
|
|
*,
|
|
preset_field: str = "modem_preset",
|
|
):
|
|
descriptor = DummyDescriptor(
|
|
{
|
|
"region": DummyField(DummyEnum({region_value: region_name})),
|
|
preset_field: DummyField(DummyEnum({preset_value: preset_name})),
|
|
}
|
|
)
|
|
|
|
class DummyLora:
|
|
DESCRIPTOR = descriptor
|
|
|
|
def __init__(self) -> None:
|
|
self.region = region_value
|
|
setattr(self, preset_field, preset_value)
|
|
|
|
def HasField(self, name: str) -> bool: # noqa: D401 - simple proxy
|
|
return hasattr(self, name)
|
|
|
|
return DummyLora()
|
|
|
|
class DummyRadio:
|
|
def __init__(self, lora) -> None:
|
|
self.lora = lora
|
|
|
|
def HasField(self, name: str) -> bool:
|
|
return hasattr(self, name)
|
|
|
|
class DummyConfig:
|
|
def __init__(self, lora, *, expose_direct: bool) -> None:
|
|
if expose_direct:
|
|
self.lora = lora
|
|
else:
|
|
self.radio = DummyRadio(lora)
|
|
|
|
def HasField(self, name: str) -> bool: # noqa: D401 - mimics protobuf API
|
|
return hasattr(self, name)
|
|
|
|
class DummyLocalNode:
|
|
def __init__(self, config) -> None:
|
|
self.localConfig = config
|
|
|
|
class DummyInterface:
|
|
def __init__(self, local_config) -> None:
|
|
self.localNode = DummyLocalNode(local_config)
|
|
self.wait_calls = 0
|
|
|
|
def waitForConfig(self) -> None: # noqa: D401 - matches Meshtastic API
|
|
self.wait_calls += 1
|
|
|
|
primary_lora = make_lora(3, "EU_868", 4, "MEDIUM_FAST")
|
|
iface = DummyInterface(DummyConfig(primary_lora, expose_direct=False))
|
|
|
|
mesh._ensure_radio_metadata(iface)
|
|
first_log = capsys.readouterr().out
|
|
|
|
assert iface.wait_calls == 1
|
|
assert mesh.config.LORA_FREQ == 868
|
|
assert mesh.config.MODEM_PRESET == "MediumFast"
|
|
assert "Captured LoRa radio metadata" in first_log
|
|
assert "lora_freq=868" in first_log
|
|
assert "modem_preset='MediumFast'" in first_log
|
|
|
|
secondary_lora = make_lora(7, "US_915", 2, "LONG_FAST", preset_field="preset")
|
|
second_iface = DummyInterface(DummyConfig(secondary_lora, expose_direct=True))
|
|
|
|
mesh._ensure_radio_metadata(second_iface)
|
|
second_log = capsys.readouterr().out
|
|
|
|
assert second_iface.wait_calls == 1
|
|
assert mesh.config.LORA_FREQ == 868
|
|
assert mesh.config.MODEM_PRESET == "MediumFast"
|
|
assert second_log == ""
|
|
|
|
|
|
def test_capture_channels_from_interface_records_metadata(mesh_module, capsys):
|
|
mesh = mesh_module
|
|
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
mesh.channels._reset_channel_cache()
|
|
|
|
class DummyInterface:
|
|
def __init__(self) -> None:
|
|
self.wait_calls = 0
|
|
primary = SimpleNamespace(
|
|
role=1, settings=SimpleNamespace(name=" radioamator ")
|
|
)
|
|
secondary = SimpleNamespace(
|
|
role="SECONDARY",
|
|
index="7",
|
|
settings=SimpleNamespace(name="TestChannel"),
|
|
)
|
|
self.localNode = SimpleNamespace(channels=[primary, secondary])
|
|
|
|
def waitForConfig(self) -> None: # noqa: D401 - matches interface contract
|
|
self.wait_calls += 1
|
|
|
|
iface = DummyInterface()
|
|
|
|
mesh.channels.capture_from_interface(iface)
|
|
log_output = capsys.readouterr().out
|
|
|
|
assert iface.wait_calls == 1
|
|
assert mesh.channels.channel_mappings() == ((0, "radioamator"), (7, "TestChannel"))
|
|
assert mesh.channels.channel_name(7) == "TestChannel"
|
|
assert "Captured channel metadata" in log_output
|
|
assert "channels=((0, 'radioamator'), (7, 'TestChannel'))" in log_output
|
|
|
|
mesh.channels.capture_from_interface(SimpleNamespace(localNode=None))
|
|
assert mesh.channels.channel_mappings() == ((0, "radioamator"), (7, "TestChannel"))
|
|
|
|
|
|
def test_capture_channels_primary_falls_back_to_env(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
|
|
mesh.config.MODEM_PRESET = None
|
|
mesh.channels._reset_channel_cache()
|
|
monkeypatch.setenv("CHANNEL", "FallbackName")
|
|
|
|
class DummyInterface:
|
|
def __init__(self) -> None:
|
|
self.localNode = SimpleNamespace(
|
|
channels={"primary": SimpleNamespace(role="PRIMARY")}
|
|
)
|
|
|
|
def waitForConfig(self) -> None: # noqa: D401 - placeholder
|
|
return None
|
|
|
|
mesh.channels._reset_channel_cache()
|
|
mesh.channels.capture_from_interface(DummyInterface())
|
|
log_output = capsys.readouterr().out
|
|
|
|
assert mesh.channels.channel_mappings() == ((0, "FallbackName"),)
|
|
assert mesh.channels.channel_name(0) == "FallbackName"
|
|
assert "FallbackName" in log_output
|
|
|
|
|
|
def test_capture_channels_primary_falls_back_to_preset(mesh_module, capsys):
|
|
mesh = mesh_module
|
|
|
|
mesh.config.MODEM_PRESET = " MediumFast "
|
|
mesh.channels._reset_channel_cache()
|
|
|
|
class DummyInterface:
|
|
def __init__(self) -> None:
|
|
self.localNode = SimpleNamespace(
|
|
channels=[SimpleNamespace(role="PRIMARY", settings=SimpleNamespace())]
|
|
)
|
|
|
|
def waitForConfig(self) -> None: # noqa: D401 - matches interface contract
|
|
return None
|
|
|
|
mesh.channels.capture_from_interface(DummyInterface())
|
|
log_output = capsys.readouterr().out
|
|
|
|
assert mesh.channels.channel_mappings() == ((0, "MediumFast"),)
|
|
assert mesh.channels.channel_name(0) == "MediumFast"
|
|
assert "MediumFast" in log_output
|
|
|
|
|
|
def test_create_default_interface_falls_back_to_tcp(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
attempts = []
|
|
|
|
def fake_targets():
|
|
return ["/dev/ttyFAIL"]
|
|
|
|
def fake_create(port):
|
|
attempts.append(port)
|
|
if port.startswith("/dev/tty"):
|
|
raise RuntimeError("missing serial device")
|
|
return SimpleNamespace(nodes={}, close=lambda: None), "tcp://127.0.0.1:4403"
|
|
|
|
monkeypatch.setattr(mesh, "_default_serial_targets", fake_targets)
|
|
monkeypatch.setattr(mesh, "_create_serial_interface", fake_create)
|
|
|
|
iface, resolved = mesh._create_default_interface()
|
|
|
|
assert attempts == ["/dev/ttyFAIL", mesh._DEFAULT_TCP_TARGET]
|
|
assert resolved == "tcp://127.0.0.1:4403"
|
|
assert iface.nodes == {}
|
|
|
|
|
|
def test_create_default_interface_raises_when_unavailable(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
monkeypatch.setattr(mesh, "_default_serial_targets", lambda: ["/dev/ttyFAIL"])
|
|
|
|
def always_fail(port):
|
|
raise RuntimeError(f"boom for {port}")
|
|
|
|
monkeypatch.setattr(mesh, "_create_serial_interface", always_fail)
|
|
|
|
with pytest.raises(mesh.NoAvailableMeshInterface) as exc_info:
|
|
mesh._create_default_interface()
|
|
|
|
assert "/dev/ttyFAIL" in str(exc_info.value)
|
|
|
|
|
|
def test_node_to_dict_handles_nested_structures(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
@dataclass
|
|
class Child:
|
|
number: int
|
|
|
|
class DummyProto(mesh.ProtoMessage):
|
|
def __init__(self, **payload):
|
|
self._payload = payload
|
|
|
|
def to_dict(self):
|
|
return self._payload
|
|
|
|
@dataclass
|
|
class Node:
|
|
info: Child
|
|
proto: DummyProto
|
|
payload: bytes
|
|
seq: list
|
|
|
|
node = Node(Child(5), DummyProto(value=7), b"hi", [Child(1), DummyProto(value=9)])
|
|
|
|
result = mesh._node_to_dict(node)
|
|
assert result["info"] == {"number": 5}
|
|
assert result["proto"] == {"value": 7}
|
|
assert result["payload"] == "hi"
|
|
assert result["seq"] == [{"number": 1}, {"value": 9}]
|
|
|
|
|
|
def test_store_packet_dict_posts_text_message(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
mesh.register_host_node_id("!f00dbabe")
|
|
|
|
packet = {
|
|
"id": 123,
|
|
"rxTime": 1_700_000_000,
|
|
"fromId": "!abc",
|
|
"toId": "!def",
|
|
"channel": "2",
|
|
"hopLimit": "3",
|
|
"snr": "1.25",
|
|
"rxRssi": "-70",
|
|
"decoded": {
|
|
"payload": {"text": "hello"},
|
|
"portnum": "TEXT_MESSAGE_APP",
|
|
"channel": 4,
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected POST to be triggered for text message"
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/messages"
|
|
assert payload["id"] == 123
|
|
assert payload["channel"] == 4
|
|
assert payload["from_id"] == "!abc"
|
|
assert payload["to_id"] == "!def"
|
|
assert payload["text"] == "hello"
|
|
assert payload["portnum"] == "TEXT_MESSAGE_APP"
|
|
assert payload["rx_time"] == 1_700_000_000
|
|
assert payload["rx_iso"] == mesh._iso(1_700_000_000)
|
|
assert payload["hop_limit"] == 3
|
|
assert payload["snr"] == pytest.approx(1.25)
|
|
assert payload["rssi"] == -70
|
|
assert payload["reply_id"] is None
|
|
assert payload["emoji"] is None
|
|
assert payload["ingestor"] == "!f00dbabe"
|
|
assert payload["lora_freq"] == 868
|
|
assert payload["modem_preset"] == "MediumFast"
|
|
assert priority == mesh._MESSAGE_POST_PRIORITY
|
|
|
|
|
|
def test_store_packet_dict_posts_reaction_message(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
packet = {
|
|
"id": 999,
|
|
"rxTime": 1_700_100_000,
|
|
"fromId": "!reply",
|
|
"toId": "!root",
|
|
"decoded": {
|
|
"portnum": "REACTION_APP",
|
|
"data": {
|
|
"reply_id": "123",
|
|
"emoji": " 👍 ",
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected POST to be triggered for reaction message"
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/messages"
|
|
assert payload["id"] == 999
|
|
assert payload["from_id"] == "!reply"
|
|
assert payload["to_id"] == "!root"
|
|
assert payload["portnum"] == "REACTION_APP"
|
|
assert payload["text"] is None
|
|
assert payload["reply_id"] == 123
|
|
assert payload["emoji"] == "👍"
|
|
assert payload["rx_time"] == 1_700_100_000
|
|
assert payload["rx_iso"] == mesh._iso(1_700_100_000)
|
|
assert priority == mesh._MESSAGE_POST_PRIORITY
|
|
|
|
|
|
def test_store_packet_dict_posts_position(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
mesh.register_host_node_id("!f00dbabe")
|
|
|
|
packet = {
|
|
"id": 200498337,
|
|
"rxTime": 1_758_624_186,
|
|
"fromId": "!b1fa2b07",
|
|
"toId": "^all",
|
|
"rxSnr": -9.5,
|
|
"rxRssi": -104,
|
|
"decoded": {
|
|
"portnum": "POSITION_APP",
|
|
"bitfield": 1,
|
|
"position": {
|
|
"latitudeI": int(52.518912 * 1e7),
|
|
"longitudeI": int(13.5512064 * 1e7),
|
|
"altitude": -16,
|
|
"time": 1_758_624_189,
|
|
"locationSource": "LOC_INTERNAL",
|
|
"precisionBits": 17,
|
|
"satsInView": 7,
|
|
"PDOP": 211,
|
|
"groundSpeed": 2,
|
|
"groundTrack": 0,
|
|
"raw": {
|
|
"latitude_i": int(52.518912 * 1e7),
|
|
"longitude_i": int(13.5512064 * 1e7),
|
|
"altitude": -16,
|
|
"time": 1_758_624_189,
|
|
},
|
|
},
|
|
"payload": {
|
|
"__bytes_b64__": "DQDATR8VAMATCBjw//////////8BJb150mgoAljTAXgCgAEAmAEHuAER",
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected POST to be triggered for position packet"
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/positions"
|
|
assert priority == mesh._POSITION_POST_PRIORITY
|
|
assert payload["id"] == 200498337
|
|
assert payload["node_id"] == "!b1fa2b07"
|
|
assert payload["node_num"] == int("b1fa2b07", 16)
|
|
assert payload["num"] == payload["node_num"]
|
|
assert payload["rx_time"] == 1_758_624_186
|
|
assert payload["rx_iso"] == mesh._iso(1_758_624_186)
|
|
assert payload["latitude"] == pytest.approx(52.518912)
|
|
assert payload["longitude"] == pytest.approx(13.5512064)
|
|
assert payload["altitude"] == pytest.approx(-16)
|
|
assert payload["position_time"] == 1_758_624_189
|
|
assert payload["location_source"] == "LOC_INTERNAL"
|
|
assert payload["precision_bits"] == 17
|
|
assert payload["sats_in_view"] == 7
|
|
assert payload["pdop"] == pytest.approx(211.0)
|
|
assert payload["ground_speed"] == pytest.approx(2.0)
|
|
assert payload["ground_track"] == pytest.approx(0.0)
|
|
assert payload["snr"] == pytest.approx(-9.5)
|
|
assert payload["rssi"] == -104
|
|
assert payload["hop_limit"] is None
|
|
assert payload["bitfield"] == 1
|
|
assert (
|
|
payload["payload_b64"]
|
|
== "DQDATR8VAMATCBjw//////////8BJb150mgoAljTAXgCgAEAmAEHuAER"
|
|
)
|
|
assert payload["lora_freq"] == 868
|
|
assert payload["modem_preset"] == "MediumFast"
|
|
assert payload["ingestor"] == "!f00dbabe"
|
|
assert payload["raw"]["time"] == 1_758_624_189
|
|
|
|
|
|
def test_store_packet_dict_posts_neighborinfo(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
mesh.register_host_node_id("!f00dbabe")
|
|
|
|
packet = {
|
|
"id": 2049886869,
|
|
"rxTime": 1_758_884_186,
|
|
"fromId": "!7c5b0920",
|
|
"decoded": {
|
|
"portnum": "NEIGHBORINFO_APP",
|
|
"neighborinfo": {
|
|
"nodeId": 0x7C5B0920,
|
|
"lastSentById": 0x9E3AA2F0,
|
|
"nodeBroadcastIntervalSecs": 1800,
|
|
"neighbors": [
|
|
{"nodeId": 0x2B2A4D51, "snr": -6.5},
|
|
{"nodeId": 0x437FE3E0, "snr": -2.75, "rxTime": 1_758_884_150},
|
|
{"nodeId": "!0badc0de", "snr": None},
|
|
],
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected POST to be triggered for neighbor info"
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/neighbors"
|
|
assert priority == mesh._NEIGHBOR_POST_PRIORITY
|
|
assert payload["node_id"] == "!7c5b0920"
|
|
assert payload["node_num"] == 0x7C5B0920
|
|
assert payload["rx_time"] == 1_758_884_186
|
|
assert payload["node_broadcast_interval_secs"] == 1800
|
|
assert payload["last_sent_by_id"] == "!9e3aa2f0"
|
|
neighbors = payload["neighbors"]
|
|
assert len(neighbors) == 3
|
|
assert neighbors[0]["neighbor_id"] == "!2b2a4d51"
|
|
assert neighbors[0]["neighbor_num"] == 0x2B2A4D51
|
|
assert neighbors[0]["rx_time"] == 1_758_884_186
|
|
assert neighbors[0]["snr"] == pytest.approx(-6.5)
|
|
assert neighbors[1]["neighbor_id"] == "!437fe3e0"
|
|
assert neighbors[1]["rx_time"] == 1_758_884_150
|
|
assert neighbors[1]["snr"] == pytest.approx(-2.75)
|
|
assert neighbors[2]["neighbor_id"] == "!0badc0de"
|
|
assert neighbors[2]["neighbor_num"] == 0x0BAD_C0DE
|
|
assert payload["lora_freq"] == 868
|
|
assert payload["modem_preset"] == "MediumFast"
|
|
assert payload["ingestor"] == "!f00dbabe"
|
|
|
|
|
|
def test_store_packet_dict_handles_nodeinfo_packet(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
from meshtastic.protobuf import config_pb2, mesh_pb2
|
|
|
|
node_info = mesh_pb2.NodeInfo()
|
|
node_info.num = 321
|
|
user = node_info.user
|
|
user.id = "!abcd1234"
|
|
user.short_name = "LoRa"
|
|
user.long_name = "LoRa Node"
|
|
user.role = config_pb2.Config.DeviceConfig.Role.Value("CLIENT")
|
|
user.hw_model = mesh_pb2.HardwareModel.Value("TBEAM")
|
|
node_info.device_metrics.battery_level = 87
|
|
node_info.device_metrics.voltage = 3.91
|
|
node_info.device_metrics.channel_utilization = 5.5
|
|
node_info.device_metrics.air_util_tx = 0.12
|
|
node_info.device_metrics.uptime_seconds = 4321
|
|
node_info.position.latitude_i = int(52.5 * 1e7)
|
|
node_info.position.longitude_i = int(13.4 * 1e7)
|
|
node_info.position.altitude = 48
|
|
node_info.position.time = 1_700_000_050
|
|
node_info.position.location_source = mesh_pb2.Position.LocSource.Value(
|
|
"LOC_INTERNAL"
|
|
)
|
|
node_info.snr = 9.5
|
|
node_info.last_heard = 1_700_000_040
|
|
node_info.hops_away = 2
|
|
node_info.is_favorite = True
|
|
|
|
payload_b64 = base64.b64encode(node_info.SerializeToString()).decode()
|
|
packet = {
|
|
"id": 999,
|
|
"rxTime": 1_700_000_200,
|
|
"from": int("abcd1234", 16),
|
|
"rxSnr": -5.5,
|
|
"decoded": {
|
|
"portnum": "NODEINFO_APP",
|
|
"payload": {"__bytes_b64__": payload_b64},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected nodeinfo packet to trigger POST"
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/nodes"
|
|
assert priority == mesh._NODE_POST_PRIORITY
|
|
assert "!abcd1234" in payload
|
|
node_entry = payload["!abcd1234"]
|
|
assert node_entry["num"] == 321
|
|
assert node_entry["lastHeard"] == 1_700_000_200
|
|
assert node_entry["snr"] == pytest.approx(9.5)
|
|
assert node_entry["hopsAway"] == 2
|
|
assert node_entry["isFavorite"] is True
|
|
assert node_entry["user"]["shortName"] == "LoRa"
|
|
assert node_entry["deviceMetrics"]["batteryLevel"] == pytest.approx(87)
|
|
assert node_entry["deviceMetrics"]["voltage"] == pytest.approx(3.91)
|
|
assert node_entry["deviceMetrics"]["uptimeSeconds"] == 4321
|
|
assert node_entry["position"]["latitude"] == pytest.approx(52.5)
|
|
assert node_entry["position"]["longitude"] == pytest.approx(13.4)
|
|
assert node_entry["position"]["time"] == 1_700_000_050
|
|
assert node_entry["lora_freq"] == 868
|
|
assert node_entry["modem_preset"] == "MediumFast"
|
|
|
|
|
|
def test_store_packet_dict_handles_user_only_nodeinfo(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
from meshtastic.protobuf import mesh_pb2
|
|
|
|
user_msg = mesh_pb2.User()
|
|
user_msg.id = "!11223344"
|
|
user_msg.short_name = "Test"
|
|
user_msg.long_name = "Test Node"
|
|
|
|
payload_b64 = base64.b64encode(user_msg.SerializeToString()).decode()
|
|
packet = {
|
|
"id": 42,
|
|
"rxTime": 1_234,
|
|
"from": int("11223344", 16),
|
|
"decoded": {
|
|
"portnum": "NODEINFO_APP",
|
|
"payload": {"__bytes_b64__": payload_b64},
|
|
"user": {
|
|
"id": "!11223344",
|
|
"shortName": "Test",
|
|
"longName": "Test Node",
|
|
"hwModel": "HELTEC_V3",
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
_, payload, _ = captured[0]
|
|
node_entry = payload["!11223344"]
|
|
assert node_entry["lastHeard"] == 1_234
|
|
assert node_entry["user"]["longName"] == "Test Node"
|
|
assert "deviceMetrics" not in node_entry
|
|
assert node_entry["lora_freq"] == 868
|
|
assert node_entry["modem_preset"] == "MediumFast"
|
|
|
|
|
|
def test_store_packet_dict_nodeinfo_merges_proto_user(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
from meshtastic.protobuf import mesh_pb2
|
|
|
|
user_msg = mesh_pb2.User()
|
|
user_msg.id = "!44556677"
|
|
user_msg.short_name = "Proto"
|
|
user_msg.long_name = "Proto User"
|
|
|
|
node_info = mesh_pb2.NodeInfo()
|
|
node_info.snr = 2.5
|
|
|
|
payload_b64 = base64.b64encode(node_info.SerializeToString()).decode()
|
|
packet = {
|
|
"id": 73,
|
|
"rxTime": 5_000,
|
|
"fromId": "!44556677",
|
|
"decoded": {
|
|
"portnum": "NODEINFO_APP",
|
|
"payload": {"__bytes_b64__": payload_b64},
|
|
"user": user_msg,
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
_, payload, _ = captured[0]
|
|
node_entry = payload["!44556677"]
|
|
assert node_entry["lastHeard"] == 5_000
|
|
assert node_entry["user"]["shortName"] == "Proto"
|
|
assert node_entry["user"]["longName"] == "Proto User"
|
|
assert node_entry["lora_freq"] == 868
|
|
assert node_entry["modem_preset"] == "MediumFast"
|
|
|
|
|
|
def test_store_packet_dict_nodeinfo_sanitizes_nested_proto(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
from meshtastic.protobuf import mesh_pb2
|
|
|
|
user_msg = mesh_pb2.User()
|
|
user_msg.id = "!55667788"
|
|
user_msg.short_name = "Nested"
|
|
|
|
node_info = mesh_pb2.NodeInfo()
|
|
node_info.hops_away = 1
|
|
|
|
payload_b64 = base64.b64encode(node_info.SerializeToString()).decode()
|
|
packet = {
|
|
"id": 74,
|
|
"rxTime": 6_000,
|
|
"fromId": "!55667788",
|
|
"decoded": {
|
|
"portnum": "NODEINFO_APP",
|
|
"payload": {"__bytes_b64__": payload_b64},
|
|
"user": {
|
|
"id": "!55667788",
|
|
"shortName": "Nested",
|
|
"raw": user_msg,
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
_, payload, _ = captured[0]
|
|
node_entry = payload["!55667788"]
|
|
assert node_entry["user"]["shortName"] == "Nested"
|
|
assert isinstance(node_entry["user"]["raw"], dict)
|
|
assert node_entry["user"]["raw"]["id"] == "!55667788"
|
|
assert node_entry["lora_freq"] == 868
|
|
assert node_entry["modem_preset"] == "MediumFast"
|
|
|
|
|
|
def test_store_packet_dict_nodeinfo_uses_from_id_when_user_missing(
|
|
mesh_module, monkeypatch
|
|
):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
from meshtastic.protobuf import mesh_pb2
|
|
|
|
node_info = mesh_pb2.NodeInfo()
|
|
node_info.snr = 1.5
|
|
node_info.last_heard = 100
|
|
|
|
payload_b64 = base64.b64encode(node_info.SerializeToString()).decode()
|
|
packet = {
|
|
"id": 7,
|
|
"rxTime": 200,
|
|
"from": 0x01020304,
|
|
"decoded": {"portnum": 5, "payload": {"__bytes_b64__": payload_b64}},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
_, payload, _ = captured[0]
|
|
assert "!01020304" in payload
|
|
|
|
|
|
def test_nodeinfo_wrapper_infers_missing_identifier(mesh_module, monkeypatch):
|
|
"""Ensure the Meshtastic nodeinfo hook derives canonical IDs from payloads."""
|
|
|
|
_ = mesh_module
|
|
import meshtastic
|
|
from data.mesh_ingestor import interfaces
|
|
|
|
captured_packets: list[dict] = []
|
|
|
|
def _original_handler(iface, packet):
|
|
captured_packets.append(packet)
|
|
return packet["id"]
|
|
|
|
monkeypatch.setattr(
|
|
meshtastic, "_onNodeInfoReceive", _original_handler, raising=False
|
|
)
|
|
interfaces._patch_meshtastic_nodeinfo_handler()
|
|
|
|
safe_handler = meshtastic._onNodeInfoReceive
|
|
|
|
class DummyUser:
|
|
def __init__(self) -> None:
|
|
self.num = 0x88776655
|
|
|
|
class DummyDecoded:
|
|
def __init__(self) -> None:
|
|
self.user = DummyUser()
|
|
|
|
class DummyPacket:
|
|
def __init__(self) -> None:
|
|
self.decoded = DummyDecoded()
|
|
|
|
iface = types.SimpleNamespace(nodes={})
|
|
|
|
safe_handler(iface, DummyPacket())
|
|
|
|
assert captured_packets, "Expected wrapper to call the original handler"
|
|
packet = captured_packets[0]
|
|
assert packet["id"] == "!88776655"
|
|
|
|
|
|
def test_nodeinfo_handler_wrapper_prevents_key_error(mesh_module):
|
|
"""The NodeInfo handler should operate safely when the ID field is absent."""
|
|
|
|
import meshtastic
|
|
from data.mesh_ingestor import interfaces
|
|
|
|
interfaces._patch_meshtastic_nodeinfo_handler()
|
|
|
|
assert getattr(
|
|
meshtastic.mesh_interface.NodeInfoHandler,
|
|
"_potato_mesh_safe_wrapper",
|
|
False,
|
|
), "Expected NodeInfoHandler to be replaced with a safe subclass"
|
|
|
|
handler = meshtastic.mesh_interface.NodeInfoHandler()
|
|
iface = types.SimpleNamespace(nodes={})
|
|
|
|
packet = {"decoded": {"user": {"id": "!01020304"}}}
|
|
|
|
result = handler.onReceive(iface, packet)
|
|
|
|
assert iface.nodes["!01020304"]["id"] == "!01020304"
|
|
assert result == "!01020304"
|
|
|
|
|
|
def test_interfaces_patch_handles_preimported_serial():
|
|
"""Regression: importing serial module before patch still updates handler."""
|
|
|
|
preserved_modules: dict[str, types.ModuleType | None] = {}
|
|
module_names = [
|
|
"data.mesh_ingestor.interfaces",
|
|
"data.mesh_ingestor",
|
|
"meshtastic.serial_interface",
|
|
"meshtastic.tcp_interface",
|
|
"meshtastic.mesh_interface",
|
|
"meshtastic",
|
|
]
|
|
for name in module_names:
|
|
preserved_modules[name] = sys.modules.pop(name, None)
|
|
|
|
try:
|
|
|
|
def _default_nodeinfo_callback(_iface, packet):
|
|
return packet["id"]
|
|
|
|
mesh_interface_mod = types.ModuleType("meshtastic.mesh_interface")
|
|
|
|
class DummyNodeInfoHandler:
|
|
"""Stub that mirrors Meshtastic's original handler semantics."""
|
|
|
|
def __init__(self) -> None:
|
|
self.callback = _default_nodeinfo_callback
|
|
|
|
def onReceive(self, iface, packet): # noqa: D401 - simple passthrough
|
|
return self.callback(iface, packet)
|
|
|
|
mesh_interface_mod.NodeInfoHandler = DummyNodeInfoHandler
|
|
|
|
serial_interface_mod = types.ModuleType("meshtastic.serial_interface")
|
|
|
|
class DummySerialInterface:
|
|
def __init__(self, *_, **__):
|
|
self.nodes = {}
|
|
|
|
def close(self): # noqa: D401 - mimic Meshtastic close API
|
|
self.nodes.clear()
|
|
|
|
serial_interface_mod.SerialInterface = DummySerialInterface
|
|
serial_interface_mod.NodeInfoHandler = DummyNodeInfoHandler
|
|
|
|
tcp_interface_mod = types.ModuleType("meshtastic.tcp_interface")
|
|
|
|
class DummyTCPInterface:
|
|
def __init__(self, *_, **__):
|
|
self.nodes = {}
|
|
|
|
def close(self): # noqa: D401 - mimic Meshtastic close API
|
|
self.nodes.clear()
|
|
|
|
tcp_interface_mod.TCPInterface = DummyTCPInterface
|
|
|
|
meshtastic_mod = types.ModuleType("meshtastic")
|
|
meshtastic_mod.__path__ = [] # mark as package for import machinery
|
|
meshtastic_mod._onNodeInfoReceive = _default_nodeinfo_callback
|
|
meshtastic_mod.mesh_interface = mesh_interface_mod
|
|
meshtastic_mod.serial_interface = serial_interface_mod
|
|
meshtastic_mod.tcp_interface = tcp_interface_mod
|
|
|
|
sys.modules["meshtastic"] = meshtastic_mod
|
|
sys.modules["meshtastic.mesh_interface"] = mesh_interface_mod
|
|
sys.modules["meshtastic.serial_interface"] = serial_interface_mod
|
|
sys.modules["meshtastic.tcp_interface"] = tcp_interface_mod
|
|
|
|
serial_module = importlib.import_module("meshtastic.serial_interface")
|
|
assert serial_module.NodeInfoHandler is DummyNodeInfoHandler
|
|
|
|
interfaces = importlib.import_module("data.mesh_ingestor.interfaces")
|
|
|
|
patched_handler = serial_module.NodeInfoHandler
|
|
assert patched_handler is not DummyNodeInfoHandler
|
|
assert getattr(patched_handler, "_potato_mesh_safe_wrapper", False)
|
|
|
|
handler = patched_handler()
|
|
iface = types.SimpleNamespace(nodes={})
|
|
|
|
assert handler.onReceive(iface, {}) is None
|
|
assert iface.nodes == {}
|
|
|
|
patched_callback = getattr(meshtastic_mod, "_onNodeInfoReceive")
|
|
assert getattr(patched_callback, "_potato_mesh_safe_wrapper", False)
|
|
|
|
assert interfaces.SerialInterface is DummySerialInterface
|
|
finally:
|
|
for name in module_names:
|
|
sys.modules.pop(name, None)
|
|
for name, module in preserved_modules.items():
|
|
if module is not None:
|
|
sys.modules[name] = module
|
|
|
|
|
|
def test_store_packet_dict_ignores_non_text(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda *args, **kwargs: captured.append((args, kwargs)),
|
|
)
|
|
|
|
packet = {
|
|
"id": 456,
|
|
"rxTime": 1_700_000_100,
|
|
"fromId": "!abc",
|
|
"toId": "!def",
|
|
"decoded": {
|
|
"payload": {"text": "ignored"},
|
|
"portnum": "ENVIRONMENTAL_MEASUREMENT",
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert not captured, "Non-text messages should not be queued"
|
|
|
|
|
|
def test_node_items_snapshot_handles_transient_runtime_error(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
class FlakyDict(dict):
|
|
def __init__(self):
|
|
super().__init__({"node": {"foo": "bar"}})
|
|
self.calls = 0
|
|
|
|
def items(self):
|
|
self.calls += 1
|
|
if self.calls == 1:
|
|
raise RuntimeError("dictionary changed size during iteration")
|
|
return super().items()
|
|
|
|
nodes = FlakyDict()
|
|
snapshot = mesh._node_items_snapshot(nodes, retries=3)
|
|
|
|
assert snapshot == [("node", {"foo": "bar"})]
|
|
assert nodes.calls == 2
|
|
|
|
|
|
def test_node_items_snapshot_returns_none_when_still_mutating(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
class AlwaysChanging(dict):
|
|
def __init__(self):
|
|
super().__init__({"node": {"foo": "bar"}})
|
|
|
|
def items(self):
|
|
raise RuntimeError("dictionary changed size during iteration")
|
|
|
|
nodes = AlwaysChanging()
|
|
snapshot = mesh._node_items_snapshot(nodes, retries=2)
|
|
|
|
assert snapshot is None
|
|
|
|
|
|
def test_get_handles_dicts_and_objects(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
class Dummy:
|
|
value = "obj"
|
|
|
|
assert mesh._get({"key": 1}, "key") == 1
|
|
assert mesh._get({"key": 1}, "missing", "fallback") == "fallback"
|
|
dummy = Dummy()
|
|
assert mesh._get(dummy, "value") == "obj"
|
|
assert mesh._get(dummy, "missing", "default") == "default"
|
|
|
|
|
|
def test_post_json_skips_without_instance(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
monkeypatch.setattr(mesh, "INSTANCE", "")
|
|
|
|
def fail_request(*_, **__):
|
|
raise AssertionError("Request should not be created when INSTANCE is empty")
|
|
|
|
monkeypatch.setattr(mesh.urllib.request, "Request", fail_request)
|
|
mesh._post_json("/ignored", {"foo": "bar"})
|
|
|
|
|
|
def test_post_json_sends_payload_with_token(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
monkeypatch.setattr(mesh, "INSTANCE", "https://example.test")
|
|
monkeypatch.setattr(mesh, "API_TOKEN", "secret")
|
|
|
|
captured = {}
|
|
|
|
def fake_urlopen(req, timeout=0):
|
|
captured["req"] = req
|
|
|
|
class DummyResponse:
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *exc):
|
|
return False
|
|
|
|
def read(self):
|
|
return b"ok"
|
|
|
|
return DummyResponse()
|
|
|
|
monkeypatch.setattr(mesh.urllib.request, "urlopen", fake_urlopen)
|
|
|
|
mesh._post_json("/api/test", {"hello": "world"})
|
|
|
|
req = captured["req"]
|
|
assert req.full_url == "https://example.test/api/test"
|
|
assert req.headers["Content-type"] == "application/json"
|
|
assert req.get_header("Authorization") == "Bearer secret"
|
|
assert mesh.json.loads(req.data.decode("utf-8")) == {"hello": "world"}
|
|
|
|
|
|
def test_node_to_dict_handles_non_utf8_bytes(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
@dataclass
|
|
class Node:
|
|
payload: bytes
|
|
other: object
|
|
|
|
class Custom:
|
|
def __str__(self):
|
|
return "custom!"
|
|
|
|
node = Node(b"\xff", Custom())
|
|
result = mesh._node_to_dict(node)
|
|
|
|
assert result["payload"] == "ff"
|
|
assert result["other"] == "custom!"
|
|
|
|
|
|
def test_first_prefers_first_non_empty_value(mesh_module):
|
|
mesh = mesh_module
|
|
data = {"primary": {"value": ""}, "secondary": {"value": "found"}}
|
|
|
|
assert mesh._first(data, "primary.value", "secondary.value") == "found"
|
|
assert mesh._first(data, "missing.path", default="fallback") == "fallback"
|
|
|
|
|
|
def test_first_handles_attribute_sources(mesh_module):
|
|
mesh = mesh_module
|
|
ns = SimpleNamespace(empty=None, value="attr")
|
|
|
|
assert mesh._first(ns, "empty", "value") == "attr"
|
|
|
|
|
|
def test_pkt_to_dict_handles_dict_and_proto(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
assert mesh._pkt_to_dict({"a": 1}) == {"a": 1}
|
|
|
|
class DummyProto(mesh.ProtoMessage):
|
|
def to_dict(self):
|
|
return {"value": 5}
|
|
|
|
assert mesh._pkt_to_dict(DummyProto()) == {"value": 5}
|
|
|
|
class Unknown:
|
|
pass
|
|
|
|
def broken_dumps(*_, **__):
|
|
raise TypeError("boom")
|
|
|
|
monkeypatch.setattr(mesh.json, "dumps", broken_dumps)
|
|
fallback = mesh._pkt_to_dict(Unknown())
|
|
assert set(fallback) == {"_unparsed"}
|
|
assert isinstance(fallback["_unparsed"], str)
|
|
|
|
|
|
def test_main_retries_interface_creation(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
attempts = []
|
|
|
|
class DummyEvent:
|
|
def __init__(self):
|
|
self.wait_calls = 0
|
|
|
|
def is_set(self):
|
|
return self.wait_calls >= 3
|
|
|
|
def set(self):
|
|
self.wait_calls = 3
|
|
|
|
def wait(self, timeout):
|
|
self.wait_calls += 1
|
|
return self.is_set()
|
|
|
|
class DummyInterface:
|
|
def __init__(self):
|
|
self.closed = False
|
|
self.nodes = {}
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
iface = DummyInterface()
|
|
|
|
def fake_create(port):
|
|
attempts.append(port)
|
|
if len(attempts) < 3:
|
|
raise RuntimeError("boom")
|
|
return iface, port
|
|
|
|
monkeypatch.setattr(mesh, "PORT", "/dev/ttyTEST")
|
|
monkeypatch.setattr(mesh, "_create_serial_interface", fake_create)
|
|
monkeypatch.setattr(mesh.threading, "Event", DummyEvent)
|
|
monkeypatch.setattr(mesh.signal, "signal", lambda *_, **__: None)
|
|
monkeypatch.setattr(mesh, "SNAPSHOT_SECS", 0)
|
|
monkeypatch.setattr(mesh, "_RECONNECT_INITIAL_DELAY_SECS", 0)
|
|
monkeypatch.setattr(mesh, "_RECONNECT_MAX_DELAY_SECS", 0)
|
|
|
|
mesh.main()
|
|
|
|
assert len(attempts) == 3
|
|
assert iface.closed is True
|
|
|
|
|
|
def test_connected_state_handles_threading_event(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
event = mesh.threading.Event()
|
|
assert mesh._connected_state(event) is False
|
|
|
|
event.set()
|
|
assert mesh._connected_state(event) is True
|
|
|
|
|
|
def test_main_reconnects_when_connection_event_clears(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
attempts = []
|
|
interfaces = []
|
|
current_iface = {"obj": None}
|
|
import threading as real_threading_module
|
|
|
|
real_event_cls = real_threading_module.Event
|
|
|
|
class DummyInterface:
|
|
def __init__(self):
|
|
self.nodes = {}
|
|
self.isConnected = real_event_cls()
|
|
self.isConnected.set()
|
|
self.close_calls = 0
|
|
|
|
def close(self):
|
|
self.close_calls += 1
|
|
|
|
def fake_create(port):
|
|
iface = DummyInterface()
|
|
attempts.append(port)
|
|
interfaces.append(iface)
|
|
current_iface["obj"] = iface
|
|
return iface, port
|
|
|
|
class DummyStopEvent:
|
|
def __init__(self):
|
|
self._flag = False
|
|
self.wait_calls = 0
|
|
|
|
def is_set(self):
|
|
return self._flag
|
|
|
|
def set(self):
|
|
self._flag = True
|
|
|
|
def wait(self, timeout):
|
|
self.wait_calls += 1
|
|
if self.wait_calls == 1:
|
|
iface = current_iface["obj"]
|
|
assert iface is not None, "interface should be available"
|
|
iface.isConnected.clear()
|
|
return self._flag
|
|
self._flag = True
|
|
return True
|
|
|
|
monkeypatch.setattr(mesh, "PORT", "/dev/ttyTEST")
|
|
monkeypatch.setattr(mesh, "_create_serial_interface", fake_create)
|
|
monkeypatch.setattr(mesh.threading, "Event", DummyStopEvent)
|
|
monkeypatch.setattr(mesh.signal, "signal", lambda *_, **__: None)
|
|
monkeypatch.setattr(mesh, "SNAPSHOT_SECS", 0)
|
|
monkeypatch.setattr(mesh, "_RECONNECT_INITIAL_DELAY_SECS", 0)
|
|
monkeypatch.setattr(mesh, "_RECONNECT_MAX_DELAY_SECS", 0)
|
|
monkeypatch.setattr(mesh, "_CLOSE_TIMEOUT_SECS", 0)
|
|
|
|
mesh.main()
|
|
|
|
assert len(attempts) == 2
|
|
assert len(interfaces) == 2
|
|
assert interfaces[0].close_calls >= 1
|
|
assert interfaces[1].close_calls >= 1
|
|
|
|
|
|
def test_main_recreates_interface_after_snapshot_error(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
class DummyEvent:
|
|
def __init__(self):
|
|
self.wait_calls = 0
|
|
|
|
def is_set(self):
|
|
return self.wait_calls >= 2
|
|
|
|
def set(self):
|
|
self.wait_calls = 2
|
|
|
|
def wait(self, timeout):
|
|
self.wait_calls += 1
|
|
return self.is_set()
|
|
|
|
interfaces = []
|
|
|
|
def fake_create(port):
|
|
fail_first = not interfaces
|
|
|
|
class FlakyInterface:
|
|
def __init__(self, should_fail):
|
|
self.closed = False
|
|
self._should_fail = should_fail
|
|
self._calls = 0
|
|
|
|
@property
|
|
def nodes(self):
|
|
self._calls += 1
|
|
if self._should_fail and self._calls == 1:
|
|
raise RuntimeError("temporary failure")
|
|
return {"!node": {"id": 1}}
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
interface = FlakyInterface(fail_first)
|
|
interfaces.append(interface)
|
|
return interface, port
|
|
|
|
upsert_calls = []
|
|
|
|
def record_upsert(node_id, node):
|
|
upsert_calls.append(node_id)
|
|
|
|
monkeypatch.setattr(mesh, "PORT", "/dev/ttyTEST")
|
|
monkeypatch.setattr(mesh, "_create_serial_interface", fake_create)
|
|
monkeypatch.setattr(mesh, "upsert_node", record_upsert)
|
|
monkeypatch.setattr(mesh.threading, "Event", DummyEvent)
|
|
monkeypatch.setattr(mesh.signal, "signal", lambda *_, **__: None)
|
|
monkeypatch.setattr(mesh, "SNAPSHOT_SECS", 0)
|
|
monkeypatch.setattr(mesh, "_RECONNECT_INITIAL_DELAY_SECS", 0)
|
|
monkeypatch.setattr(mesh, "_RECONNECT_MAX_DELAY_SECS", 0)
|
|
|
|
mesh.main()
|
|
|
|
assert len(interfaces) >= 2
|
|
assert interfaces[0].closed is True
|
|
assert upsert_calls == ["!node"]
|
|
|
|
|
|
def test_main_exits_when_defaults_unavailable(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
def fail_default():
|
|
raise mesh.NoAvailableMeshInterface("no interface available")
|
|
|
|
monkeypatch.setattr(mesh, "PORT", None)
|
|
monkeypatch.setattr(mesh, "_create_default_interface", fail_default)
|
|
monkeypatch.setattr(mesh.signal, "signal", lambda *_, **__: None)
|
|
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
mesh.main()
|
|
|
|
assert exc_info.value.code == 1
|
|
|
|
|
|
def test_store_packet_dict_uses_top_level_channel(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
packet = {
|
|
"id": "789",
|
|
"rxTime": 123456,
|
|
"from": "!abc",
|
|
"to": "!def",
|
|
"channel": "5",
|
|
"decoded": {"text": "hi", "portnum": 1},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected message to be stored"
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/messages"
|
|
assert payload["channel"] == 5
|
|
assert payload["portnum"] == "1"
|
|
assert payload["text"] == "hi"
|
|
assert payload["encrypted"] is None
|
|
assert payload["snr"] is None and payload["rssi"] is None
|
|
assert payload["lora_freq"] == 868
|
|
assert payload["modem_preset"] == "MediumFast"
|
|
assert priority == mesh._MESSAGE_POST_PRIORITY
|
|
|
|
|
|
def test_store_packet_dict_handles_invalid_channel(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
packet = {
|
|
"id": 321,
|
|
"rxTime": 999,
|
|
"fromId": "!abc",
|
|
"decoded": {
|
|
"payload": {"text": "hello"},
|
|
"portnum": "TEXT_MESSAGE_APP",
|
|
"channel": "not-a-number",
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/messages"
|
|
assert payload["channel"] == 0
|
|
assert payload["encrypted"] is None
|
|
assert payload["lora_freq"] == 868
|
|
assert payload["modem_preset"] == "MediumFast"
|
|
assert priority == mesh._MESSAGE_POST_PRIORITY
|
|
|
|
|
|
def test_store_packet_dict_skips_direct_message_on_primary_channel(
|
|
mesh_module, monkeypatch
|
|
):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
packet = {
|
|
"id": 111,
|
|
"rxTime": 777,
|
|
"fromId": "!sender",
|
|
"toId": "!recipient",
|
|
"channel": 0,
|
|
"decoded": {"text": "secret dm", "portnum": "TEXT_MESSAGE_APP"},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert not captured
|
|
|
|
|
|
def test_store_packet_dict_allows_primary_channel_broadcast(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 915
|
|
mesh.config.MODEM_PRESET = "LongSlow"
|
|
|
|
packet = {
|
|
"id": 222,
|
|
"rxTime": 888,
|
|
"from": "!relay",
|
|
"to": "^all",
|
|
"channel": 0,
|
|
"decoded": {"text": "announcement", "portnum": "TEXT_MESSAGE_APP"},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/messages"
|
|
assert payload["text"] == "announcement"
|
|
assert payload["to_id"] == "^all"
|
|
assert payload["channel"] == 0
|
|
assert payload["lora_freq"] == 915
|
|
assert payload["modem_preset"] == "LongSlow"
|
|
assert priority == mesh._MESSAGE_POST_PRIORITY
|
|
|
|
|
|
def test_store_packet_dict_accepts_routing_app_messages(mesh_module, monkeypatch):
|
|
"""Ensure routing app payloads are treated as message posts."""
|
|
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
packet = {
|
|
"id": 333,
|
|
"rxTime": 999,
|
|
"fromId": "!node",
|
|
"toId": "^all",
|
|
"channel": 0,
|
|
"decoded": {"payload": "GAA=", "portnum": "ROUTING_APP"},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected routing packet to be stored"
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/messages"
|
|
assert payload["portnum"] == "ROUTING_APP"
|
|
assert payload["text"] == "GAA="
|
|
assert payload["channel"] == 0
|
|
assert payload["encrypted"] is None
|
|
assert priority == mesh._MESSAGE_POST_PRIORITY
|
|
|
|
|
|
def test_store_packet_dict_serializes_routing_payloads(mesh_module, monkeypatch):
|
|
"""Ensure routing payloads are serialized when text is absent."""
|
|
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
packet = {
|
|
"id": 334,
|
|
"rxTime": 1000,
|
|
"fromId": "!node",
|
|
"toId": "^all",
|
|
"channel": 0,
|
|
"decoded": {
|
|
"payload": b"\x01\x02",
|
|
"portnum": "ROUTING_APP",
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected routing packet to be stored"
|
|
_, payload, _ = captured[0]
|
|
assert payload["text"] == "AQI="
|
|
|
|
captured.clear()
|
|
|
|
packet["decoded"]["payload"] = {"kind": "ack"}
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected routing packet to be stored"
|
|
_, payload, _ = captured[0]
|
|
assert payload["text"] == '{"kind": "ack"}'
|
|
|
|
captured.clear()
|
|
|
|
packet["decoded"]["portnum"] = 7
|
|
packet["decoded"]["payload"] = b"\x00"
|
|
packet["decoded"]["routing"] = {"errorReason": "NONE"}
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected numeric routing packet to be stored"
|
|
_, payload, _ = captured[0]
|
|
assert payload["text"] == "AA=="
|
|
|
|
|
|
def test_portnum_candidates_reads_enum_values(mesh_module, monkeypatch):
|
|
"""Ensure portnum candidates include enum and constants when available."""
|
|
|
|
mesh = mesh_module
|
|
module_name = "meshtastic.portnums_pb2"
|
|
|
|
class DummyPortNum:
|
|
@staticmethod
|
|
def Value(name):
|
|
if name == "ROUTING_APP":
|
|
return 7
|
|
raise KeyError(name)
|
|
|
|
dummy_module = types.SimpleNamespace(PortNum=DummyPortNum, ROUTING_APP=8)
|
|
monkeypatch.setitem(sys.modules, module_name, dummy_module)
|
|
|
|
candidates = mesh.handlers._portnum_candidates("ROUTING_APP")
|
|
|
|
assert 7 in candidates
|
|
assert 8 in candidates
|
|
|
|
|
|
def test_store_packet_dict_appends_channel_name(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
mesh.channels._reset_channel_cache()
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
class DummyInterface:
|
|
def __init__(self) -> None:
|
|
self.localNode = SimpleNamespace(
|
|
channels=[
|
|
SimpleNamespace(role=1, settings=SimpleNamespace()),
|
|
SimpleNamespace(
|
|
role=2,
|
|
index=5,
|
|
settings=SimpleNamespace(name="Chat"),
|
|
),
|
|
]
|
|
)
|
|
|
|
def waitForConfig(self) -> None: # noqa: D401 - matches interface contract
|
|
return None
|
|
|
|
mesh.channels.capture_from_interface(DummyInterface())
|
|
capsys.readouterr()
|
|
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
monkeypatch.setattr(mesh, "DEBUG", True)
|
|
|
|
packet = {
|
|
"id": "789",
|
|
"rxTime": 123456,
|
|
"from": "!abc",
|
|
"to": "!def",
|
|
"channel": 5,
|
|
"decoded": {"text": "hi", "portnum": 1},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured, "Expected message to be stored"
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/messages"
|
|
assert payload["channel_name"] == "Chat"
|
|
assert payload["channel"] == 5
|
|
assert payload["text"] == "hi"
|
|
assert payload["encrypted"] is None
|
|
assert payload["reply_id"] is None
|
|
assert payload["emoji"] is None
|
|
assert priority == mesh._MESSAGE_POST_PRIORITY
|
|
|
|
log_output = capsys.readouterr().out
|
|
assert "channel_name='Chat'" in log_output
|
|
assert "channel_display='Chat'" in log_output
|
|
|
|
|
|
def test_store_packet_dict_skips_hidden_channel(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
mesh.channels._reset_channel_cache()
|
|
mesh.config.MODEM_PRESET = None
|
|
|
|
class DummyInterface:
|
|
def __init__(self) -> None:
|
|
self.localNode = SimpleNamespace(
|
|
channels=[
|
|
SimpleNamespace(
|
|
role=1,
|
|
settings=SimpleNamespace(name="Primary"),
|
|
),
|
|
SimpleNamespace(
|
|
role=2,
|
|
index=5,
|
|
settings=SimpleNamespace(name="Chat"),
|
|
),
|
|
]
|
|
)
|
|
|
|
def waitForConfig(self):
|
|
return None
|
|
|
|
mesh.channels.capture_from_interface(DummyInterface())
|
|
capsys.readouterr()
|
|
|
|
captured: list[tuple[str, dict, int]] = []
|
|
ignored: list[str] = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
monkeypatch.setattr(
|
|
mesh.handlers,
|
|
"_record_ignored_packet",
|
|
lambda packet, *, reason: ignored.append(reason),
|
|
)
|
|
|
|
previous_debug = mesh.config.DEBUG
|
|
previous_hidden = mesh.HIDDEN_CHANNELS
|
|
previous_allowed = mesh.ALLOWED_CHANNELS
|
|
mesh.config.DEBUG = True
|
|
mesh.DEBUG = True
|
|
mesh.ALLOWED_CHANNELS = ("Chat",)
|
|
mesh.HIDDEN_CHANNELS = ("Chat",)
|
|
|
|
try:
|
|
packet = {
|
|
"id": "999",
|
|
"rxTime": 24_680,
|
|
"from": "!sender",
|
|
"to": "^all",
|
|
"channel": 5,
|
|
"decoded": {"text": "hidden msg", "portnum": 1},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured == []
|
|
assert ignored == ["hidden-channel"]
|
|
assert "Ignored packet on hidden channel" in capsys.readouterr().out
|
|
finally:
|
|
mesh.HIDDEN_CHANNELS = previous_hidden
|
|
mesh.ALLOWED_CHANNELS = previous_allowed
|
|
mesh.config.DEBUG = previous_debug
|
|
mesh.DEBUG = previous_debug
|
|
|
|
|
|
def test_store_packet_dict_skips_disallowed_channel(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
mesh.channels._reset_channel_cache()
|
|
mesh.config.MODEM_PRESET = None
|
|
|
|
class DummyInterface:
|
|
def __init__(self) -> None:
|
|
self.localNode = SimpleNamespace(
|
|
channels=[
|
|
SimpleNamespace(
|
|
role=1,
|
|
settings=SimpleNamespace(name="Primary"),
|
|
),
|
|
SimpleNamespace(
|
|
role=2,
|
|
index=5,
|
|
settings=SimpleNamespace(name="Chat"),
|
|
),
|
|
]
|
|
)
|
|
|
|
def waitForConfig(self):
|
|
return None
|
|
|
|
mesh.channels.capture_from_interface(DummyInterface())
|
|
capsys.readouterr()
|
|
|
|
captured: list[tuple[str, dict, int]] = []
|
|
ignored: list[str] = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
monkeypatch.setattr(
|
|
mesh.handlers,
|
|
"_record_ignored_packet",
|
|
lambda packet, *, reason: ignored.append(reason),
|
|
)
|
|
|
|
previous_debug = mesh.config.DEBUG
|
|
previous_allowed = mesh.ALLOWED_CHANNELS
|
|
previous_hidden = mesh.HIDDEN_CHANNELS
|
|
mesh.config.DEBUG = True
|
|
mesh.DEBUG = True
|
|
mesh.ALLOWED_CHANNELS = ("Primary",)
|
|
mesh.HIDDEN_CHANNELS = ()
|
|
|
|
try:
|
|
packet = {
|
|
"id": "1001",
|
|
"rxTime": 25_680,
|
|
"from": "!sender",
|
|
"to": "^all",
|
|
"channel": 5,
|
|
"decoded": {"text": "disallowed msg", "portnum": 1},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured == []
|
|
assert ignored == ["disallowed-channel"]
|
|
assert "Ignored packet on disallowed channel" in capsys.readouterr().out
|
|
finally:
|
|
mesh.ALLOWED_CHANNELS = previous_allowed
|
|
mesh.HIDDEN_CHANNELS = previous_hidden
|
|
mesh.config.DEBUG = previous_debug
|
|
mesh.DEBUG = previous_debug
|
|
|
|
|
|
def test_store_packet_dict_includes_encrypted_payload(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
packet = {
|
|
"id": 555,
|
|
"rxTime": 111,
|
|
"from": 2988082812,
|
|
"to": "!receiver",
|
|
"channel": 8,
|
|
"encrypted": "abc123==",
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/messages"
|
|
assert payload["encrypted"] == "abc123=="
|
|
assert payload["text"] is None
|
|
assert payload["from_id"] == 2988082812
|
|
assert payload["to_id"] == "!receiver"
|
|
assert payload["reply_id"] is None
|
|
assert payload["emoji"] is None
|
|
assert "channel_name" not in payload
|
|
assert payload["lora_freq"] == 868
|
|
assert payload["modem_preset"] == "MediumFast"
|
|
assert priority == mesh._MESSAGE_POST_PRIORITY
|
|
|
|
|
|
def test_store_packet_dict_handles_telemetry_packet(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
mesh.register_host_node_id("!f00dbabe")
|
|
|
|
packet = {
|
|
"id": 1_256_091_342,
|
|
"rxTime": 1_758_024_300,
|
|
"fromId": "!9e95cf60",
|
|
"toId": "^all",
|
|
"decoded": {
|
|
"portnum": "TELEMETRY_APP",
|
|
"bitfield": 1,
|
|
"telemetry": {
|
|
"time": 1_758_024_300,
|
|
"deviceMetrics": {
|
|
"batteryLevel": 101,
|
|
"voltage": 4.224,
|
|
"channelUtilization": 0.59666663,
|
|
"airUtilTx": 0.03908333,
|
|
"uptimeSeconds": 305044,
|
|
"current": 0.0715,
|
|
},
|
|
"localStats": {
|
|
"numPacketsTx": 1280,
|
|
"numPacketsRx": 1425,
|
|
},
|
|
},
|
|
"payload": {
|
|
"__bytes_b64__": "DTVr0mgSFQhlFQIrh0AdJb8YPyXYFSA9KJTPEg==",
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/telemetry"
|
|
assert priority == mesh._TELEMETRY_POST_PRIORITY
|
|
assert payload["id"] == 1_256_091_342
|
|
assert payload["node_id"] == "!9e95cf60"
|
|
assert payload["from_id"] == "!9e95cf60"
|
|
assert payload["rx_time"] == 1_758_024_300
|
|
assert payload["telemetry_time"] == 1_758_024_300
|
|
assert payload["channel"] == 0
|
|
assert payload["bitfield"] == 1
|
|
assert payload["payload_b64"] == "DTVr0mgSFQhlFQIrh0AdJb8YPyXYFSA9KJTPEg=="
|
|
assert payload["battery_level"] == pytest.approx(101.0)
|
|
assert payload["voltage"] == pytest.approx(4.224)
|
|
assert payload["channel_utilization"] == pytest.approx(0.59666663)
|
|
assert payload["air_util_tx"] == pytest.approx(0.03908333)
|
|
assert payload["uptime_seconds"] == 305044
|
|
assert payload["current"] == pytest.approx(0.0715)
|
|
assert payload["lora_freq"] == 868
|
|
assert payload["modem_preset"] == "MediumFast"
|
|
assert payload["ingestor"] == "!f00dbabe"
|
|
|
|
|
|
def test_store_packet_dict_handles_environment_telemetry(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
packet = {
|
|
"id": 2_817_720_548,
|
|
"rxTime": 1_758_024_400,
|
|
"from": 3_698_627_780,
|
|
"decoded": {
|
|
"portnum": "TELEMETRY_APP",
|
|
"telemetry": {
|
|
"time": 1_758_024_390,
|
|
"environmentMetrics": {
|
|
"temperature": 21.98,
|
|
"relativeHumidity": 39.475586,
|
|
"barometricPressure": 1017.8353,
|
|
"gasResistance": 1456.0,
|
|
"iaq": 83,
|
|
"distance": 12.5,
|
|
"lux": 100.25,
|
|
"whiteLux": 64.5,
|
|
"irLux": 12.75,
|
|
"uvLux": 1.6,
|
|
"windDirection": 270,
|
|
"windSpeed": 5.9,
|
|
"windGust": 7.4,
|
|
"windLull": 4.8,
|
|
"weight": 32.7,
|
|
"radiation": 0.45,
|
|
"rainfall1h": 0.18,
|
|
"rainfall24h": 1.42,
|
|
"soilMoisture": 3100,
|
|
"soilTemperature": 18.9,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/telemetry"
|
|
assert payload["id"] == 2_817_720_548
|
|
assert payload["node_id"] == "!dc7494c4"
|
|
assert payload["from_id"] == "!dc7494c4"
|
|
assert payload["telemetry_time"] == 1_758_024_390
|
|
assert payload["temperature"] == pytest.approx(21.98)
|
|
assert payload["relative_humidity"] == pytest.approx(39.475586)
|
|
assert payload["barometric_pressure"] == pytest.approx(1017.8353)
|
|
assert payload["gas_resistance"] == pytest.approx(1456.0)
|
|
assert payload["iaq"] == 83
|
|
assert payload["distance"] == pytest.approx(12.5)
|
|
assert payload["lux"] == pytest.approx(100.25)
|
|
assert payload["white_lux"] == pytest.approx(64.5)
|
|
assert payload["ir_lux"] == pytest.approx(12.75)
|
|
assert payload["uv_lux"] == pytest.approx(1.6)
|
|
assert payload["wind_direction"] == 270
|
|
assert payload["wind_speed"] == pytest.approx(5.9)
|
|
assert payload["wind_gust"] == pytest.approx(7.4)
|
|
assert payload["wind_lull"] == pytest.approx(4.8)
|
|
assert payload["weight"] == pytest.approx(32.7)
|
|
assert payload["radiation"] == pytest.approx(0.45)
|
|
assert payload["rainfall_1h"] == pytest.approx(0.18)
|
|
assert payload["rainfall_24h"] == pytest.approx(1.42)
|
|
assert payload["soil_moisture"] == 3100
|
|
assert payload["soil_temperature"] == pytest.approx(18.9)
|
|
assert payload["lora_freq"] == 868
|
|
assert payload["modem_preset"] == "MediumFast"
|
|
|
|
|
|
def test_store_packet_dict_throttles_host_telemetry(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
logs = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
monkeypatch.setattr(
|
|
mesh.config,
|
|
"_debug_log",
|
|
lambda message, **metadata: logs.append((message, metadata)),
|
|
)
|
|
|
|
mesh.register_host_node_id("!9e95cf60")
|
|
|
|
base_packet = {
|
|
"id": 1_234,
|
|
"fromId": "!9e95cf60",
|
|
"decoded": {
|
|
"portnum": "TELEMETRY_APP",
|
|
"telemetry": {
|
|
"time": 1_000,
|
|
"deviceMetrics": {
|
|
"batteryLevel": 50,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict({**base_packet, "rxTime": 1_000})
|
|
mesh.store_packet_dict({**base_packet, "id": 1_235, "rxTime": 1_300})
|
|
mesh.store_packet_dict({**base_packet, "id": 1_236, "rxTime": 4_700})
|
|
|
|
assert len(captured) == 2
|
|
first_path, first_payload, _ = captured[0]
|
|
second_path, second_payload, _ = captured[1]
|
|
assert first_path == "/api/telemetry"
|
|
assert second_path == "/api/telemetry"
|
|
assert first_payload["id"] == 1_234
|
|
assert second_payload["id"] == 1_236
|
|
|
|
suppression_logs = [
|
|
entry for entry in logs if entry[0] == "Suppressed host telemetry update"
|
|
]
|
|
assert suppression_logs
|
|
assert suppression_logs[0][1]["host_node_id"] == "!9e95cf60"
|
|
assert suppression_logs[0][1]["minutes_remaining"] == 55
|
|
|
|
|
|
def test_store_packet_dict_handles_traceroute_packet(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 915
|
|
mesh.config.MODEM_PRESET = "LongFast"
|
|
mesh.register_host_node_id("!f00dbabe")
|
|
|
|
packet = {
|
|
"id": 2_934_054_466,
|
|
"rxTime": 1_763_183_133,
|
|
"rssi": -70,
|
|
"snr": 10.25,
|
|
"fromId": "3664074452",
|
|
"decoded": {
|
|
"portnum": "PAXCOUNTER_APP",
|
|
"dest": "2660618080",
|
|
"traceroute": {
|
|
"requestId": 17,
|
|
"route": [3_663_643_096, "!beadf00d", "c0ffee99", 1_150_717_793],
|
|
"snrTowards": [42, -14, 41],
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/traces"
|
|
assert priority == mesh._TRACE_POST_PRIORITY
|
|
assert payload["id"] == packet["id"]
|
|
assert payload["request_id"] == 17
|
|
assert payload["src"] == 3_664_074_452
|
|
assert payload["dest"] == 2_660_618_080
|
|
assert payload["rx_time"] == 1_763_183_133
|
|
assert payload["rx_iso"] == "2025-11-15T05:05:33Z"
|
|
assert payload["hops"] == [
|
|
3_663_643_096,
|
|
3_199_070_221,
|
|
3_237_998_233,
|
|
1_150_717_793,
|
|
]
|
|
assert payload["rssi"] == -70
|
|
assert payload["snr"] == pytest.approx(10.25)
|
|
assert "elapsed_ms" in payload
|
|
assert payload["lora_freq"] == 915
|
|
assert payload["modem_preset"] == "LongFast"
|
|
assert payload["ingestor"] == "!f00dbabe"
|
|
|
|
|
|
def test_traceroute_hop_normalization_supports_mappings(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
packet = {
|
|
"id": 1_111,
|
|
"decoded": {
|
|
"portnum": "TRACEROUTE_APP",
|
|
"traceroute": {
|
|
"requestId": 42,
|
|
"route": [{"node_id": "!beadf00d"}, {"num": "0xc0ffee99"}, {"id": 123}],
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
_, payload, _ = captured[0]
|
|
assert payload["hops"] == [0xBEADF00D, 0xC0FFEE99, 123]
|
|
|
|
|
|
def test_traceroute_packet_without_identifiers_is_ignored(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
packet = {
|
|
"decoded": {
|
|
"portnum": "TRACEROUTE_APP",
|
|
"traceroute": {},
|
|
},
|
|
"rxTime": 123,
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured == []
|
|
|
|
|
|
def test_post_queue_prioritises_messages(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
mesh._clear_post_queue()
|
|
calls = []
|
|
|
|
def record(path, payload):
|
|
calls.append((path, payload))
|
|
|
|
monkeypatch.setattr(mesh, "_post_json", record)
|
|
|
|
mesh._enqueue_post_json("/api/messages", {"id": 1}, mesh._MESSAGE_POST_PRIORITY)
|
|
mesh._enqueue_post_json(
|
|
"/api/nodes", {"!node": {"foo": "bar"}}, mesh._NODE_POST_PRIORITY
|
|
)
|
|
|
|
mesh._drain_post_queue()
|
|
|
|
assert [path for path, _ in calls] == ["/api/messages", "/api/nodes"]
|
|
|
|
|
|
def test_drain_post_queue_handles_enqueued_items_during_send(mesh_module):
|
|
mesh = mesh_module
|
|
mesh._clear_post_queue()
|
|
|
|
first_send_started = threading.Event()
|
|
second_item_enqueued = threading.Event()
|
|
second_item_processed = threading.Event()
|
|
calls = []
|
|
|
|
def blocking_send(path, payload):
|
|
calls.append((path, payload))
|
|
if path == "/api/first":
|
|
first_send_started.set()
|
|
assert second_item_enqueued.wait(timeout=2), "Second item was not enqueued"
|
|
elif path == "/api/second":
|
|
second_item_processed.set()
|
|
|
|
mesh._enqueue_post_json(
|
|
"/api/first",
|
|
{"id": 1},
|
|
mesh._DEFAULT_POST_PRIORITY,
|
|
state=mesh.STATE,
|
|
)
|
|
|
|
mesh.STATE.active = True
|
|
drain_thread = threading.Thread(
|
|
target=mesh._drain_post_queue,
|
|
kwargs={"state": mesh.STATE, "send": blocking_send},
|
|
)
|
|
drain_thread.start()
|
|
|
|
assert first_send_started.wait(
|
|
timeout=2
|
|
), "Drain did not begin processing the first item"
|
|
|
|
mesh._queue_post_json(
|
|
"/api/second",
|
|
{"id": 2},
|
|
state=mesh.STATE,
|
|
send=blocking_send,
|
|
)
|
|
second_item_enqueued.set()
|
|
|
|
assert second_item_processed.wait(timeout=2), "Second item was not processed"
|
|
|
|
drain_thread.join(timeout=2)
|
|
assert not drain_thread.is_alive(), "Drain thread did not finish"
|
|
assert [path for path, _ in calls] == ["/api/first", "/api/second"]
|
|
assert not mesh.STATE.queue
|
|
assert mesh.STATE.active is False
|
|
|
|
|
|
def test_store_packet_dict_requires_id(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
def fail_post(*_, **__):
|
|
raise AssertionError("Should not post without an id")
|
|
|
|
monkeypatch.setattr(mesh, "_queue_post_json", fail_post)
|
|
|
|
packet = {"decoded": {"payload": {"text": "hello"}, "portnum": "TEXT_MESSAGE_APP"}}
|
|
mesh.store_packet_dict(packet)
|
|
|
|
|
|
def test_on_receive_logs_when_store_fails(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
monkeypatch.setattr(mesh, "_pkt_to_dict", lambda pkt: {"id": 1})
|
|
|
|
def boom(*_, **__):
|
|
raise ValueError("boom")
|
|
|
|
monkeypatch.setattr(mesh, "store_packet_dict", boom)
|
|
|
|
mesh.on_receive(object(), interface=None)
|
|
|
|
captured = capsys.readouterr()
|
|
assert "context=handlers.on_receive" in captured.out
|
|
assert "Failed to store packet" in captured.out
|
|
|
|
|
|
def test_node_items_snapshot_iterable_without_items(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
class Iterable:
|
|
def __init__(self):
|
|
self._data = {"node": {"foo": "bar"}}
|
|
|
|
def __iter__(self):
|
|
return iter(self._data)
|
|
|
|
def __getitem__(self, key):
|
|
return self._data[key]
|
|
|
|
snapshot = mesh._node_items_snapshot(Iterable(), retries=1)
|
|
assert snapshot == [("node", {"foo": "bar"})]
|
|
|
|
|
|
def test_node_items_snapshot_handles_empty_input(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
assert mesh._node_items_snapshot(None) == []
|
|
assert mesh._node_items_snapshot({}) == []
|
|
|
|
|
|
def test_debug_log_emits_when_enabled(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
|
|
monkeypatch.setattr(mesh, "DEBUG", True)
|
|
mesh._debug_log("hello world")
|
|
|
|
captured = capsys.readouterr()
|
|
lines = [line for line in captured.out.splitlines() if "hello world" in line]
|
|
assert lines, "expected debug log output"
|
|
log_line = lines[-1]
|
|
pattern = (
|
|
r"\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z\] \[potato-mesh\] \[debug\] "
|
|
)
|
|
assert re.match(pattern, log_line), f"unexpected log format: {log_line}"
|
|
assert log_line.endswith("hello world")
|
|
|
|
|
|
def test_event_wait_allows_default_timeout_handles_short_signature(
|
|
mesh_module, monkeypatch
|
|
):
|
|
mesh = mesh_module
|
|
|
|
def wait_without_timeout(self):
|
|
return True
|
|
|
|
monkeypatch.setattr(
|
|
mesh.threading.Event, "wait", wait_without_timeout, raising=False
|
|
)
|
|
|
|
assert mesh._event_wait_allows_default_timeout() is True
|
|
|
|
|
|
def test_event_wait_allows_default_timeout_handles_varargs(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
def wait_with_varargs(self, *args):
|
|
return False
|
|
|
|
monkeypatch.setattr(mesh.threading.Event, "wait", wait_with_varargs, raising=False)
|
|
|
|
assert mesh._event_wait_allows_default_timeout() is True
|
|
|
|
|
|
def test_parse_ble_target_rejects_invalid_values(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
assert mesh._parse_ble_target("") is None
|
|
assert mesh._parse_ble_target(" ") is None
|
|
assert mesh._parse_ble_target("zz:zz:zz:zz:zz:zz") is None
|
|
|
|
|
|
def test_parse_ble_target_accepts_mac_addresses(mesh_module):
|
|
"""Test that _parse_ble_target accepts valid MAC address format (Linux/Windows)."""
|
|
mesh = mesh_module
|
|
|
|
# Valid MAC addresses should be accepted and normalized to uppercase
|
|
assert mesh._parse_ble_target("ED:4D:9E:95:CF:60") == "ED:4D:9E:95:CF:60"
|
|
assert mesh._parse_ble_target("ed:4d:9e:95:cf:60") == "ED:4D:9E:95:CF:60"
|
|
assert mesh._parse_ble_target("AA:BB:CC:DD:EE:FF") == "AA:BB:CC:DD:EE:FF"
|
|
assert mesh._parse_ble_target("00:11:22:33:44:55") == "00:11:22:33:44:55"
|
|
|
|
# With whitespace
|
|
assert mesh._parse_ble_target(" ED:4D:9E:95:CF:60 ") == "ED:4D:9E:95:CF:60"
|
|
|
|
# Invalid MAC addresses should be rejected
|
|
assert mesh._parse_ble_target("ED:4D:9E:95:CF") is None # Too short
|
|
assert mesh._parse_ble_target("ED:4D:9E:95:CF:60:AB") is None # Too long
|
|
assert mesh._parse_ble_target("GG:HH:II:JJ:KK:LL") is None # Invalid hex
|
|
|
|
|
|
def test_parse_ble_target_accepts_uuids(mesh_module):
|
|
"""Test that _parse_ble_target accepts valid UUID format (macOS)."""
|
|
mesh = mesh_module
|
|
|
|
# Valid UUIDs should be accepted and normalized to uppercase
|
|
assert (
|
|
mesh._parse_ble_target("C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E")
|
|
== "C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E"
|
|
)
|
|
assert (
|
|
mesh._parse_ble_target("c0aea92f-045e-9b82-c9a6-a1fd822b3a9e")
|
|
== "C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E"
|
|
)
|
|
assert (
|
|
mesh._parse_ble_target("12345678-1234-5678-9ABC-DEF012345678")
|
|
== "12345678-1234-5678-9ABC-DEF012345678"
|
|
)
|
|
|
|
# With whitespace
|
|
assert (
|
|
mesh._parse_ble_target(" C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E ")
|
|
== "C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E"
|
|
)
|
|
|
|
# Invalid UUIDs should be rejected
|
|
assert mesh._parse_ble_target("C0AEA92F-045E-9B82-C9A6") is None # Too short
|
|
assert (
|
|
mesh._parse_ble_target("C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E-EXTRA") is None
|
|
) # Too long
|
|
assert (
|
|
mesh._parse_ble_target("GGGGGGGG-GGGG-GGGG-GGGG-GGGGGGGGGGGG") is None
|
|
) # Invalid hex
|
|
assert (
|
|
mesh._parse_ble_target("C0AEA92F:045E:9B82:C9A6:A1FD822B3A9E") is None
|
|
) # Wrong separator
|
|
|
|
|
|
def test_parse_network_target_additional_cases(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
assert mesh._parse_network_target("") is None
|
|
assert mesh._parse_network_target(" ") is None
|
|
assert mesh._parse_network_target("tcp://example.com") is None
|
|
|
|
host, port = mesh._parse_network_target("tcp://10.1.2.3:abc")
|
|
assert (host, port) == ("10.1.2.3", mesh._DEFAULT_TCP_PORT)
|
|
|
|
host, port = mesh._parse_network_target("10.1.2.3:9001")
|
|
assert (host, port) == ("10.1.2.3", 9001)
|
|
|
|
|
|
def test_load_ble_interface_sets_global(monkeypatch):
|
|
repo_root = Path(__file__).resolve().parents[1]
|
|
monkeypatch.syspath_prepend(str(repo_root))
|
|
|
|
serial_interface_mod = types.ModuleType("meshtastic.serial_interface")
|
|
|
|
class DummySerial:
|
|
def __init__(self, *_, **__):
|
|
pass
|
|
|
|
serial_interface_mod.SerialInterface = DummySerial
|
|
|
|
tcp_interface_mod = types.ModuleType("meshtastic.tcp_interface")
|
|
tcp_interface_mod.TCPInterface = DummySerial
|
|
|
|
ble_interface_mod = types.ModuleType("meshtastic.ble_interface")
|
|
|
|
class DummyBLE:
|
|
def __init__(self, *_, **__):
|
|
pass
|
|
|
|
ble_interface_mod.BLEInterface = DummyBLE
|
|
|
|
meshtastic_mod = types.ModuleType("meshtastic")
|
|
meshtastic_mod.serial_interface = serial_interface_mod
|
|
meshtastic_mod.tcp_interface = tcp_interface_mod
|
|
meshtastic_mod.ble_interface = ble_interface_mod
|
|
|
|
monkeypatch.setitem(sys.modules, "meshtastic", meshtastic_mod)
|
|
monkeypatch.setitem(
|
|
sys.modules, "meshtastic.serial_interface", serial_interface_mod
|
|
)
|
|
monkeypatch.setitem(sys.modules, "meshtastic.tcp_interface", tcp_interface_mod)
|
|
monkeypatch.setitem(sys.modules, "meshtastic.ble_interface", ble_interface_mod)
|
|
|
|
module_name = "data.mesh"
|
|
module = (
|
|
importlib.import_module(module_name)
|
|
if module_name not in sys.modules
|
|
else importlib.reload(sys.modules[module_name])
|
|
)
|
|
|
|
monkeypatch.setattr(module, "BLEInterface", None)
|
|
|
|
resolved = module._load_ble_interface()
|
|
|
|
assert resolved is ble_interface_mod.BLEInterface
|
|
assert module.BLEInterface is ble_interface_mod.BLEInterface
|
|
|
|
|
|
def test_default_serial_targets_deduplicates(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
def fake_glob(pattern):
|
|
if pattern == "/dev/ttyUSB*":
|
|
return ["/dev/ttyUSB0", "/dev/ttyUSB0"]
|
|
if pattern == "/dev/ttyACM*":
|
|
return ["/dev/ttyACM1"]
|
|
return []
|
|
|
|
monkeypatch.setattr(mesh.interfaces.glob, "glob", fake_glob)
|
|
|
|
targets = mesh._default_serial_targets()
|
|
|
|
assert targets.count("/dev/ttyUSB0") == 1
|
|
assert "/dev/ttyACM1" in targets
|
|
assert "/dev/ttyACM0" in targets
|
|
|
|
|
|
def test_post_json_logs_failures(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
|
|
monkeypatch.setattr(mesh, "INSTANCE", "https://example.invalid")
|
|
monkeypatch.setattr(mesh, "DEBUG", True)
|
|
|
|
def boom(*_, **__):
|
|
raise RuntimeError("offline")
|
|
|
|
monkeypatch.setattr(mesh.queue.urllib.request, "urlopen", boom)
|
|
|
|
mesh._post_json("/api/test", {"foo": "bar"})
|
|
|
|
captured = capsys.readouterr()
|
|
assert "context=queue.post_json" in captured.out
|
|
assert "POST request failed" in captured.out
|
|
|
|
|
|
def test_queue_post_json_logs_payload_details(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
|
|
mesh._clear_post_queue()
|
|
monkeypatch.setattr(mesh, "DEBUG", True)
|
|
|
|
mesh._queue_post_json(
|
|
"/api/test",
|
|
{"alpha": "beta", "count": 7},
|
|
send=lambda *_: None,
|
|
)
|
|
|
|
out = capsys.readouterr().out
|
|
assert "Forwarding payload to API" in out
|
|
assert 'alpha="beta"' in out
|
|
assert "count=7" in out
|
|
|
|
|
|
def test_queue_post_json_skips_when_active(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
mesh._clear_post_queue()
|
|
mesh.STATE.active = True
|
|
|
|
mesh._queue_post_json("/api/test", {"id": 1})
|
|
|
|
assert mesh.STATE.active is True
|
|
assert mesh.STATE.queue
|
|
mesh._clear_post_queue()
|
|
|
|
|
|
def test_process_ingestor_heartbeat_updates_flag(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
mesh.ingestors.STATE.last_heartbeat = None
|
|
mesh.ingestors.STATE.node_id = None
|
|
mesh.handlers.register_host_node_id(None)
|
|
recorded = {"force": None, "count": 0}
|
|
|
|
def fake_queue_ingestor_heartbeat(*, force):
|
|
recorded["force"] = force
|
|
recorded["count"] += 1
|
|
return True
|
|
|
|
monkeypatch.setattr(
|
|
mesh.ingestors, "queue_ingestor_heartbeat", fake_queue_ingestor_heartbeat
|
|
)
|
|
|
|
class DummyIface:
|
|
def __init__(self):
|
|
self.myNodeNum = 0xCAFEBABE
|
|
|
|
updated = mesh._process_ingestor_heartbeat(
|
|
DummyIface(), ingestor_announcement_sent=False
|
|
)
|
|
|
|
assert updated is True
|
|
assert recorded["force"] is True
|
|
assert recorded["count"] == 1
|
|
assert mesh.handlers.host_node_id() == "!cafebabe"
|
|
|
|
|
|
def test_process_ingestor_heartbeat_skips_without_host(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
mesh.handlers.register_host_node_id(None)
|
|
mesh.ingestors.STATE.node_id = None
|
|
mesh.ingestors.STATE.last_heartbeat = None
|
|
|
|
monkeypatch.setattr(mesh.ingestors, "queue_ingestor_heartbeat", lambda **_: False)
|
|
|
|
updated = mesh._process_ingestor_heartbeat(None, ingestor_announcement_sent=False)
|
|
|
|
assert updated is False
|
|
assert mesh.ingestors.STATE.node_id is None
|
|
assert mesh.ingestors.STATE.last_heartbeat is None
|
|
|
|
|
|
def test_ingestor_heartbeat_respects_interval_override(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
mesh.ingestors.STATE.start_time = 100
|
|
mesh.ingestors.STATE.last_heartbeat = 1_000
|
|
mesh.ingestors.STATE.node_id = "!abcd0001"
|
|
mesh._INGESTOR_HEARTBEAT_SECS = 10_000
|
|
monkeypatch.setattr(mesh.ingestors.time, "time", lambda: 2_000)
|
|
sent = mesh.ingestors.queue_ingestor_heartbeat()
|
|
assert sent is False
|
|
assert mesh.ingestors.STATE.last_heartbeat == 1_000
|
|
|
|
|
|
def test_setting_ingestor_attr_propagates(mesh_module):
|
|
mesh = mesh_module
|
|
mesh._INGESTOR_HEARTBEAT_SECS = 123
|
|
assert mesh.config._INGESTOR_HEARTBEAT_SECS == 123
|
|
|
|
|
|
def test_queue_ingestor_heartbeat_requires_node_id(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
|
|
monkeypatch.setattr(
|
|
mesh.queue,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority, send=None: captured.append(
|
|
(path, payload, priority)
|
|
),
|
|
)
|
|
|
|
mesh.ingestors.STATE.node_id = None
|
|
mesh.ingestors.STATE.last_heartbeat = None
|
|
|
|
queued = mesh.ingestors.queue_ingestor_heartbeat(force=True)
|
|
|
|
assert queued is False
|
|
assert captured == []
|
|
|
|
|
|
def test_queue_ingestor_heartbeat_enqueues_and_throttles(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
|
|
monkeypatch.setattr(
|
|
mesh.queue,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority, send=None: captured.append(
|
|
(path, payload, priority)
|
|
),
|
|
)
|
|
|
|
mesh.ingestors.STATE.start_time = 1_700_000_000
|
|
mesh.ingestors.STATE.last_heartbeat = None
|
|
mesh.ingestors.STATE.node_id = None
|
|
mesh.config.LORA_FREQ = 915
|
|
mesh.config.MODEM_PRESET = "LongFast"
|
|
|
|
mesh.ingestors.set_ingestor_node_id("!CAFEBABE")
|
|
first = mesh.ingestors.queue_ingestor_heartbeat(force=True)
|
|
second = mesh.ingestors.queue_ingestor_heartbeat()
|
|
|
|
assert first is True
|
|
assert second is False
|
|
assert len(captured) == 1
|
|
path, payload, priority = captured[0]
|
|
assert path == "/api/ingestors"
|
|
assert payload["node_id"] == "!cafebabe"
|
|
assert payload["start_time"] == 1_700_000_000
|
|
assert payload["last_seen_time"] >= payload["start_time"]
|
|
assert payload["version"] == mesh.VERSION
|
|
assert payload["lora_freq"] == 915
|
|
assert payload["modem_preset"] == "LongFast"
|
|
assert priority == mesh.queue._INGESTOR_POST_PRIORITY
|
|
|
|
|
|
def test_mesh_version_export_matches_package(mesh_module):
|
|
import data
|
|
|
|
mesh = mesh_module
|
|
assert mesh.VERSION == data.VERSION
|
|
|
|
|
|
def test_node_to_dict_handles_proto_fallback(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
class FailingProto(mesh.ProtoMessage):
|
|
def to_dict(self):
|
|
raise RuntimeError("boom")
|
|
|
|
def __str__(self):
|
|
return "proto"
|
|
|
|
def fail_message_to_dict(*_, **__):
|
|
raise RuntimeError("nope")
|
|
|
|
monkeypatch.setattr(mesh, "MessageToDict", fail_message_to_dict)
|
|
monkeypatch.setattr(
|
|
mesh.json, "dumps", lambda *_, **__: (_ for _ in ()).throw(TypeError())
|
|
)
|
|
|
|
converted = mesh._node_to_dict({"value": FailingProto()})
|
|
|
|
assert converted["value"] == "proto"
|
|
|
|
|
|
def test_upsert_node_logs_in_debug(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
|
|
monkeypatch.setattr(mesh, "DEBUG", True)
|
|
captured = []
|
|
|
|
def fake_queue(path, payload, *, priority):
|
|
captured.append((path, payload, priority))
|
|
|
|
monkeypatch.setattr(mesh, "_queue_post_json", fake_queue)
|
|
|
|
mesh.upsert_node("!node", {"user": {"shortName": "SN", "longName": "LN"}})
|
|
|
|
assert captured
|
|
out = capsys.readouterr().out
|
|
assert "context=handlers.upsert_node" in out
|
|
assert "Queued node upsert payload" in out
|
|
|
|
|
|
def test_store_packet_dict_records_ignored_packets(mesh_module, monkeypatch, tmp_path):
|
|
mesh = mesh_module
|
|
|
|
monkeypatch.setattr(mesh, "DEBUG", True)
|
|
ignored_path = tmp_path / "ignored.txt"
|
|
monkeypatch.setattr(mesh.handlers, "_IGNORED_PACKET_LOG_PATH", ignored_path)
|
|
monkeypatch.setattr(mesh.handlers, "_IGNORED_PACKET_LOCK", threading.Lock())
|
|
|
|
packet = {"decoded": {"portnum": "UNKNOWN"}}
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert ignored_path.exists()
|
|
lines = ignored_path.read_text(encoding="utf-8").strip().splitlines()
|
|
assert lines
|
|
payload = json.loads(lines[-1])
|
|
assert payload["reason"] == "unsupported-port"
|
|
assert payload["packet"]["decoded"]["portnum"] == "UNKNOWN"
|
|
|
|
|
|
def test_coerce_int_and_float_cover_edge_cases(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
assert mesh._coerce_int(None) is None
|
|
assert mesh._coerce_int(True) == 1
|
|
assert mesh._coerce_int(7) == 7
|
|
assert mesh._coerce_int(3.2) == 3
|
|
assert mesh._coerce_int(float("inf")) is None
|
|
assert mesh._coerce_int(" 0x10 ") == 16
|
|
assert mesh._coerce_int(" ") is None
|
|
assert mesh._coerce_int("7.0") == 7
|
|
assert mesh._coerce_int("nan") is None
|
|
|
|
class Intable:
|
|
def __int__(self):
|
|
return 9
|
|
|
|
class BadInt:
|
|
def __int__(self):
|
|
raise TypeError
|
|
|
|
assert mesh._coerce_int(Intable()) == 9
|
|
assert mesh._coerce_int(BadInt()) is None
|
|
|
|
assert mesh._coerce_float(None) is None
|
|
assert mesh._coerce_float(True) == 1.0
|
|
assert mesh._coerce_float(3) == 3.0
|
|
assert mesh._coerce_float(float("inf")) is None
|
|
assert mesh._coerce_float(" 1.5 ") == 1.5
|
|
assert mesh._coerce_float(" ") is None
|
|
assert mesh._coerce_float("nan") is None
|
|
|
|
class Floatable:
|
|
def __float__(self):
|
|
return 2.5
|
|
|
|
class BadFloat:
|
|
def __float__(self):
|
|
raise TypeError
|
|
|
|
assert mesh._coerce_float(Floatable()) == 2.5
|
|
assert mesh._coerce_float(BadFloat()) is None
|
|
|
|
|
|
def test_canonical_node_id_variants(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
assert mesh._canonical_node_id(None) is None
|
|
assert mesh._canonical_node_id(0x1234) == "!00001234"
|
|
assert mesh._canonical_node_id(" ") is None
|
|
assert mesh._canonical_node_id("!deadbeef") == "!deadbeef"
|
|
assert mesh._canonical_node_id("0xCAFEBABE") == "!cafebabe"
|
|
assert mesh._canonical_node_id("12345") == "!00003039"
|
|
assert mesh._canonical_node_id("nothex") is None
|
|
|
|
|
|
def test_node_num_from_id_variants(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
assert mesh._node_num_from_id(None) is None
|
|
assert mesh._node_num_from_id(42) == 42
|
|
assert mesh._node_num_from_id(-1) is None
|
|
assert mesh._node_num_from_id(" ") is None
|
|
assert mesh._node_num_from_id("!00ff") == 0xFF
|
|
assert mesh._node_num_from_id("0x10") == 16
|
|
assert mesh._node_num_from_id("123") == 0x123
|
|
assert mesh._node_num_from_id("bad") == int("bad", 16)
|
|
|
|
|
|
def test_merge_mappings_handles_non_mappings(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
@dataclass
|
|
class UserBase:
|
|
id: str
|
|
|
|
@dataclass
|
|
class UserExtra:
|
|
name: str
|
|
|
|
@dataclass
|
|
class Holder:
|
|
user: object
|
|
|
|
base = Holder(UserBase("!1"))
|
|
extra = Holder(UserExtra("Node"))
|
|
|
|
merged = mesh._merge_mappings(base, extra)
|
|
|
|
assert merged == {"user": {"id": "!1", "name": "Node"}}
|
|
|
|
|
|
def test_extract_payload_bytes_edge_cases(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
assert mesh._extract_payload_bytes(None) is None
|
|
assert (
|
|
mesh._extract_payload_bytes({"payload": {"__bytes_b64__": "invalid"}}) is None
|
|
)
|
|
assert mesh._extract_payload_bytes({"payload": b"data"}) == b"data"
|
|
assert mesh._extract_payload_bytes({"payload": "ZGF0YQ=="}) == b"data"
|
|
|
|
|
|
def test_decode_nodeinfo_payload_handles_user(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
from meshtastic.protobuf import mesh_pb2
|
|
|
|
user = mesh_pb2.User()
|
|
user.id = "!01020304"
|
|
payload = user.SerializeToString()
|
|
|
|
def raise_decode(self, *_):
|
|
raise mesh.DecodeError("fail")
|
|
|
|
monkeypatch.setattr(
|
|
mesh_pb2.NodeInfo, "ParseFromString", raise_decode, raising=False
|
|
)
|
|
|
|
node_info = mesh._decode_nodeinfo_payload(payload)
|
|
|
|
assert node_info is not None
|
|
assert node_info.user.id == "!01020304"
|
|
|
|
|
|
def test_nodeinfo_helpers_cover_fallbacks(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
|
|
from meshtastic.protobuf import mesh_pb2
|
|
|
|
node_info = mesh_pb2.NodeInfo()
|
|
node_info.device_metrics.battery_level = 50
|
|
node_info.position.latitude_i = int(1.23 * 1e7)
|
|
node_info.position.longitude_i = int(4.56 * 1e7)
|
|
node_info.position.location_source = 99
|
|
|
|
monkeypatch.setattr(
|
|
mesh_pb2.Position.LocSource,
|
|
"Name",
|
|
lambda value: (_ for _ in ()).throw(RuntimeError()),
|
|
raising=False,
|
|
)
|
|
|
|
metrics = mesh._nodeinfo_metrics_dict(node_info)
|
|
position = mesh._nodeinfo_position_dict(node_info)
|
|
|
|
assert metrics["batteryLevel"] == 50.0
|
|
assert position["locationSource"] == 99
|
|
|
|
class DummyProto(mesh.ProtoMessage):
|
|
def __init__(self):
|
|
self.id = "!11223344"
|
|
|
|
def __str__(self):
|
|
return "dummy-proto"
|
|
|
|
def to_dict(self):
|
|
return {"id": self.id}
|
|
|
|
def raise_message_to_dict(*_, **__):
|
|
raise RuntimeError()
|
|
|
|
monkeypatch.setattr(mesh, "MessageToDict", raise_message_to_dict)
|
|
|
|
user = mesh._nodeinfo_user_dict(node_info, DummyProto())
|
|
|
|
assert user["id"] == "!11223344"
|
|
|
|
|
|
def test_nodeinfo_user_role_falls_back_to_cli_enum(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
mesh._reset_cli_role_cache()
|
|
|
|
cli_module = types.ModuleType("meshtastic.cli")
|
|
cli_common = types.ModuleType("meshtastic.cli.common")
|
|
|
|
class DummyRole(enum.IntEnum):
|
|
CLIENT = 0
|
|
CLIENT_BASE = 12
|
|
|
|
cli_common.Role = DummyRole
|
|
cli_module.common = cli_common
|
|
|
|
monkeypatch.setitem(sys.modules, "meshtastic.cli", cli_module)
|
|
monkeypatch.setitem(sys.modules, "meshtastic.cli.common", cli_common)
|
|
|
|
user = mesh._nodeinfo_user_dict(None, {"id": "!11223344", "role": 12})
|
|
|
|
assert user["role"] == "CLIENT_BASE"
|
|
|
|
mesh._reset_cli_role_cache()
|
|
|
|
cli_dict_module = types.ModuleType("meshtastic.cli")
|
|
cli_dict_common = types.ModuleType("meshtastic.cli.common")
|
|
cli_dict_common.ClientRoles = {12: "client_hidden"}
|
|
cli_dict_module.common = cli_dict_common
|
|
|
|
monkeypatch.setitem(sys.modules, "meshtastic.cli", cli_dict_module)
|
|
monkeypatch.setitem(sys.modules, "meshtastic.cli.common", cli_dict_common)
|
|
|
|
user = mesh._nodeinfo_user_dict(None, {"id": "!11223344", "role": 12})
|
|
|
|
assert user["role"] == "CLIENT_HIDDEN"
|
|
|
|
mesh._reset_cli_role_cache()
|
|
|
|
|
|
def test_store_position_packet_defaults(mesh_module, monkeypatch):
|
|
mesh = mesh_module
|
|
captured = []
|
|
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append((path, payload, priority)),
|
|
)
|
|
|
|
mesh.config.LORA_FREQ = 868
|
|
mesh.config.MODEM_PRESET = "MediumFast"
|
|
|
|
packet = {"id": "7", "rxTime": "", "from": "!abcd", "to": "", "decoded": {}}
|
|
|
|
mesh.store_position_packet(packet, {})
|
|
|
|
assert captured
|
|
_, payload, _ = captured[0]
|
|
assert payload["node_id"] == "!0000abcd"
|
|
assert payload["node_num"] == int("abcd", 16)
|
|
assert payload["to_id"] is None
|
|
assert payload["latitude"] is None
|
|
assert payload["longitude"] is None
|
|
assert payload["lora_freq"] == 868
|
|
assert payload["modem_preset"] == "MediumFast"
|
|
|
|
|
|
def test_store_nodeinfo_packet_debug(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
|
|
monkeypatch.setattr(mesh, "DEBUG", True)
|
|
monkeypatch.setattr(mesh, "_queue_post_json", lambda *_, **__: None)
|
|
|
|
from meshtastic.protobuf import mesh_pb2
|
|
|
|
node_info = mesh_pb2.NodeInfo()
|
|
user = node_info.user
|
|
user.id = "!01020304"
|
|
user.short_name = "A"
|
|
user.long_name = "B"
|
|
node_info.channel = 1
|
|
node_info.via_mqtt = True
|
|
node_info.is_ignored = True
|
|
node_info.is_key_manually_verified = True
|
|
|
|
payload = {
|
|
"__bytes_b64__": base64.b64encode(node_info.SerializeToString()).decode()
|
|
}
|
|
|
|
packet = {
|
|
"id": 1,
|
|
"rxTime": 1,
|
|
"decoded": {"portnum": "NODEINFO_APP", "payload": payload},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
out = capsys.readouterr().out
|
|
assert "context=handlers.store_nodeinfo" in out
|
|
assert "Queued nodeinfo payload" in out
|
|
|
|
|
|
def test_store_neighborinfo_packet_debug(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
|
|
monkeypatch.setattr(mesh, "DEBUG", True)
|
|
captured = []
|
|
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append(payload),
|
|
)
|
|
|
|
packet = {
|
|
"id": 1,
|
|
"rxTime": 2,
|
|
"fromId": "!12345678",
|
|
"decoded": {
|
|
"portnum": "NEIGHBORINFO_APP",
|
|
"neighborinfo": {
|
|
"nodeId": 0x12345678,
|
|
"neighbors": [],
|
|
},
|
|
},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
out = capsys.readouterr().out
|
|
assert "context=handlers.store_neighborinfo" in out
|
|
assert "Queued neighborinfo payload" in out
|
|
|
|
|
|
def test_store_packet_dict_debug_message(mesh_module, monkeypatch, capsys):
|
|
mesh = mesh_module
|
|
|
|
monkeypatch.setattr(mesh, "DEBUG", True)
|
|
captured = []
|
|
|
|
monkeypatch.setattr(
|
|
mesh,
|
|
"_queue_post_json",
|
|
lambda path, payload, *, priority: captured.append(payload),
|
|
)
|
|
|
|
packet = {
|
|
"id": 2,
|
|
"rxTime": 10,
|
|
"fromId": "!abc",
|
|
"decoded": {"payload": {"text": "hi"}, "portnum": "TEXT_MESSAGE_APP"},
|
|
}
|
|
|
|
mesh.store_packet_dict(packet)
|
|
|
|
assert captured
|
|
out = capsys.readouterr().out
|
|
assert "context=handlers.store_packet_dict" in out
|
|
assert "Queued message payload" in out
|
|
assert "channel_display=0" in out
|
|
assert "channel_name=" not in out
|
|
|
|
|
|
def test_on_receive_skips_seen_packets(mesh_module):
|
|
mesh = mesh_module
|
|
|
|
packet = {"_potatomesh_seen": True}
|
|
mesh.on_receive(packet, interface=None)
|
|
|
|
assert packet["_potatomesh_seen"] is True
|