# Copyright © 2025-26 l5yth & contributors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import base64 import enum import importlib import json import re import sys import threading import types import time """End-to-end tests covering the mesh ingestion package.""" from dataclasses import dataclass from pathlib import Path from types import SimpleNamespace from meshtastic_protobuf_stub import build as build_protobuf_stub import pytest @pytest.fixture def mesh_module(monkeypatch): """Import :mod:`data.mesh` with stubbed dependencies.""" repo_root = Path(__file__).resolve().parents[1] monkeypatch.syspath_prepend(str(repo_root)) try: import meshtastic as real_meshtastic # type: ignore except Exception: # pragma: no cover - dependency may be unavailable in CI real_meshtastic = None real_protobuf = ( getattr(real_meshtastic, "protobuf", None) if real_meshtastic else None ) # Prefer real google.protobuf modules when available, otherwise provide stubs try: from google.protobuf import json_format as json_format_mod # type: ignore from google.protobuf import message as message_mod # type: ignore except Exception: # pragma: no cover - protobuf may be missing in CI json_format_mod = types.ModuleType("google.protobuf.json_format") def message_to_dict(obj, *_, **__): if hasattr(obj, "to_dict"): return obj.to_dict() if hasattr(obj, "__dict__"): return dict(obj.__dict__) return {} json_format_mod.MessageToDict = message_to_dict message_mod = types.ModuleType("google.protobuf.message") class DummyProtoMessage: pass class DummyDecodeError(Exception): pass message_mod.Message = DummyProtoMessage message_mod.DecodeError = DummyDecodeError protobuf_mod = types.ModuleType("google.protobuf") protobuf_mod.json_format = json_format_mod protobuf_mod.message = message_mod google_mod = types.ModuleType("google") google_mod.protobuf = protobuf_mod monkeypatch.setitem(sys.modules, "google", google_mod) monkeypatch.setitem(sys.modules, "google.protobuf", protobuf_mod) monkeypatch.setitem(sys.modules, "google.protobuf.json_format", json_format_mod) monkeypatch.setitem(sys.modules, "google.protobuf.message", message_mod) else: monkeypatch.setitem(sys.modules, "google.protobuf.json_format", json_format_mod) monkeypatch.setitem(sys.modules, "google.protobuf.message", message_mod) message_module = sys.modules.get("google.protobuf.message", message_mod) # Stub meshtastic.serial_interface.SerialInterface serial_interface_mod = types.ModuleType("meshtastic.serial_interface") class DummySerialInterface: def __init__(self, *_, **__): self.closed = False def close(self): self.closed = True serial_interface_mod.SerialInterface = DummySerialInterface tcp_interface_mod = types.ModuleType("meshtastic.tcp_interface") class DummyTCPInterface: def __init__(self, *_, **__): self.closed = False def close(self): self.closed = True tcp_interface_mod.TCPInterface = DummyTCPInterface ble_interface_mod = types.ModuleType("meshtastic.ble_interface") class DummyBLEInterface: def __init__(self, *_, **__): self.closed = False def close(self): self.closed = True ble_interface_mod.BLEInterface = DummyBLEInterface meshtastic_mod = types.ModuleType("meshtastic") meshtastic_mod.serial_interface = serial_interface_mod meshtastic_mod.tcp_interface = tcp_interface_mod meshtastic_mod.ble_interface = ble_interface_mod mesh_interface_mod = types.ModuleType("meshtastic.mesh_interface") def _default_nodeinfo_callback(iface, packet): iface.nodes[packet["id"]] = packet return packet["id"] class DummyNodeInfoHandler: """Stub that mimics Meshtastic's NodeInfo handler semantics.""" def __init__(self): self.callback = getattr( meshtastic_mod, "_onNodeInfoReceive", _default_nodeinfo_callback ) def onReceive(self, iface, packet): nodes = getattr(iface, "nodes", None) if isinstance(nodes, dict): nodes[packet["id"]] = packet return self.callback(iface, packet) mesh_interface_mod.NodeInfoHandler = DummyNodeInfoHandler meshtastic_mod.mesh_interface = mesh_interface_mod monkeypatch.setitem(sys.modules, "meshtastic.mesh_interface", mesh_interface_mod) meshtastic_mod._onNodeInfoReceive = _default_nodeinfo_callback if real_protobuf is not None: meshtastic_mod.protobuf = real_protobuf else: serialization_mod = sys.modules.get("data.mesh_ingestor.serialization") proto_base = getattr(serialization_mod, "ProtoMessage", message_module.Message) decode_error = getattr(message_module, "DecodeError", Exception) config_pb2_mod, mesh_pb2_mod = build_protobuf_stub( proto_base, decode_error, ) protobuf_pkg = types.ModuleType("meshtastic.protobuf") protobuf_pkg.config_pb2 = config_pb2_mod protobuf_pkg.mesh_pb2 = mesh_pb2_mod meshtastic_mod.protobuf = protobuf_pkg monkeypatch.setitem(sys.modules, "meshtastic.protobuf", protobuf_pkg) monkeypatch.setitem( sys.modules, "meshtastic.protobuf.config_pb2", config_pb2_mod ) monkeypatch.setitem(sys.modules, "meshtastic.protobuf.mesh_pb2", mesh_pb2_mod) monkeypatch.setitem(sys.modules, "meshtastic", meshtastic_mod) monkeypatch.setitem( sys.modules, "meshtastic.serial_interface", serial_interface_mod ) monkeypatch.setitem(sys.modules, "meshtastic.tcp_interface", tcp_interface_mod) monkeypatch.setitem(sys.modules, "meshtastic.ble_interface", ble_interface_mod) if real_protobuf is not None: monkeypatch.setitem(sys.modules, "meshtastic.protobuf", real_protobuf) # Stub pubsub.pub pubsub_mod = types.ModuleType("pubsub") class DummyPub: def __init__(self): self.subscriptions = [] def subscribe(self, *args, **kwargs): self.subscriptions.append((args, kwargs)) pubsub_mod.pub = DummyPub() monkeypatch.setitem(sys.modules, "pubsub", pubsub_mod) module_name = "data.mesh_ingestor" if module_name in sys.modules: module = importlib.reload(sys.modules[module_name]) else: module = importlib.import_module(module_name) if hasattr(module, "_clear_post_queue"): module._clear_post_queue() # Ensure radio metadata starts unset for each test run. module.config.LORA_FREQ = None module.config.MODEM_PRESET = None for attr in ("LORA_FREQ", "MODEM_PRESET"): if attr in module.__dict__: delattr(module, attr) module.channels._reset_channel_cache() module.ingestors.STATE.start_time = int(time.time()) module.ingestors.STATE.last_heartbeat = None module.ingestors.STATE.node_id = None yield module # Ensure a clean import for the next test if hasattr(module, "_clear_post_queue"): module._clear_post_queue() sys.modules.pop(module_name, None) def test_instance_domain_prefers_primary_env(mesh_module, monkeypatch): """Ensure the ingestor prefers ``INSTANCE_DOMAIN`` over the legacy variable.""" monkeypatch.setenv("INSTANCE_DOMAIN", "https://new.example") monkeypatch.setenv("POTATOMESH_INSTANCE", "https://legacy.example") try: refreshed_instance = mesh_module.config._resolve_instance_domain() mesh_module.config.INSTANCE = refreshed_instance mesh_module.INSTANCE = refreshed_instance assert refreshed_instance == "https://new.example" assert mesh_module.INSTANCE == "https://new.example" finally: monkeypatch.delenv("INSTANCE_DOMAIN", raising=False) monkeypatch.delenv("POTATOMESH_INSTANCE", raising=False) mesh_module.config.INSTANCE = mesh_module.config._resolve_instance_domain() mesh_module.INSTANCE = mesh_module.config.INSTANCE def test_instance_domain_falls_back_to_legacy(mesh_module, monkeypatch): """Verify ``POTATOMESH_INSTANCE`` is used when ``INSTANCE_DOMAIN`` is unset.""" monkeypatch.delenv("INSTANCE_DOMAIN", raising=False) monkeypatch.setenv("POTATOMESH_INSTANCE", "https://legacy-only.example") try: refreshed_instance = mesh_module.config._resolve_instance_domain() mesh_module.config.INSTANCE = refreshed_instance mesh_module.INSTANCE = refreshed_instance assert refreshed_instance == "https://legacy-only.example" assert mesh_module.INSTANCE == "https://legacy-only.example" finally: monkeypatch.delenv("POTATOMESH_INSTANCE", raising=False) mesh_module.config.INSTANCE = mesh_module.config._resolve_instance_domain() mesh_module.INSTANCE = mesh_module.config.INSTANCE def test_instance_domain_infers_scheme_for_hostnames(mesh_module, monkeypatch): """Ensure bare hostnames are promoted to HTTPS URLs for ingestion.""" monkeypatch.setenv("INSTANCE_DOMAIN", "mesh.example.org") monkeypatch.delenv("POTATOMESH_INSTANCE", raising=False) try: refreshed_instance = mesh_module.config._resolve_instance_domain() mesh_module.config.INSTANCE = refreshed_instance mesh_module.INSTANCE = refreshed_instance assert refreshed_instance == "https://mesh.example.org" assert mesh_module.INSTANCE == "https://mesh.example.org" finally: monkeypatch.delenv("INSTANCE_DOMAIN", raising=False) mesh_module.config.INSTANCE = mesh_module.config._resolve_instance_domain() mesh_module.INSTANCE = mesh_module.config.INSTANCE def test_parse_channel_names_applies_allowlist(mesh_module): """Ensure allowlists reuse the shared channel parser.""" mesh = mesh_module previous_allowed = mesh.ALLOWED_CHANNELS try: parsed = mesh.config._parse_channel_names(" Primary ,Chat ,primary , Ops ") mesh.ALLOWED_CHANNELS = parsed assert parsed == ("Primary", "Chat", "Ops") assert mesh.channels.allowed_channel_names() == ("Primary", "Chat", "Ops") assert mesh.channels.is_allowed_channel("chat") assert mesh.channels.is_allowed_channel(" ops ") assert not mesh.channels.is_allowed_channel("unknown") assert not mesh.channels.is_allowed_channel(None) assert mesh.config._parse_channel_names("") == () finally: mesh.ALLOWED_CHANNELS = previous_allowed def test_allowed_channel_defaults_allow_all(mesh_module): """Ensure unset allowlists do not block any channels.""" mesh = mesh_module previous_allowed = mesh.ALLOWED_CHANNELS try: mesh.ALLOWED_CHANNELS = () assert mesh.channels.is_allowed_channel("Any") finally: mesh.ALLOWED_CHANNELS = previous_allowed def test_parse_hidden_channels_deduplicates_names(mesh_module): """Ensure hidden channel parsing strips blanks and deduplicates.""" mesh = mesh_module previous_hidden = mesh.HIDDEN_CHANNELS try: parsed = mesh.config._parse_hidden_channels(" Chat , ,Secret ,chat") mesh.HIDDEN_CHANNELS = parsed assert parsed == ("Chat", "Secret") assert mesh.channels.hidden_channel_names() == ("Chat", "Secret") assert mesh.channels.is_hidden_channel(" chat ") assert not mesh.channels.is_hidden_channel("unknown") assert mesh.config._parse_hidden_channels("") == () finally: mesh.HIDDEN_CHANNELS = previous_hidden def test_subscribe_receive_topics_covers_all_handlers(mesh_module, monkeypatch): mesh = mesh_module daemon_mod = sys.modules["data.mesh_ingestor.daemon"] recorded_calls: list[tuple[tuple[object, ...], dict[str, object]]] = [] class RecordingPub: def subscribe(self, *args, **kwargs): recorded_calls.append((args, kwargs)) monkeypatch.setattr(daemon_mod, "pub", RecordingPub()) subscribed_topics = mesh._subscribe_receive_topics() expected_topics = list(mesh._RECEIVE_TOPICS) assert subscribed_topics == expected_topics assert len(recorded_calls) == len(expected_topics) for (args, kwargs), topic in zip(recorded_calls, expected_topics): assert not kwargs assert args[0] is mesh.on_receive assert args[1] == topic def test_snapshot_interval_defaults_to_60_seconds(mesh_module): mesh = mesh_module assert mesh.SNAPSHOT_SECS == 60 def test_extract_host_node_id_prefers_my_info_fields(mesh_module): mesh = mesh_module class DummyInterface: def __init__(self): self.myInfo = {"my_node_num": 0x9E95CF60} iface = DummyInterface() assert mesh._extract_host_node_id(iface) == "!9e95cf60" def test_extract_host_node_id_from_nested_info(mesh_module): mesh = mesh_module class DummyInterface: def __init__(self): self.myInfo = {"info": {"id": "!cafebabe"}} iface = DummyInterface() assert mesh._extract_host_node_id(iface) == "!cafebabe" def test_extract_host_node_id_from_callable(mesh_module): mesh = mesh_module class CallableNoDict: __slots__ = () def __call__(self): return {"id": "!f00ba4"} class DummyInterface: def __init__(self): self.localNode = CallableNoDict() iface = DummyInterface() assert mesh._extract_host_node_id(iface) == "!00f00ba4" def test_extract_host_node_id_from_my_node_num_attribute(mesh_module): mesh = mesh_module class DummyInterface: def __init__(self): self.myNodeNum = 0xDEADBEEF iface = DummyInterface() assert mesh._extract_host_node_id(iface) == "!deadbeef" @pytest.mark.parametrize("value", ["mock", "Mock", " disabled "]) def test_create_serial_interface_allows_mock(mesh_module, value): mesh = mesh_module iface, resolved = mesh._create_serial_interface(value) assert resolved == "mock" assert isinstance(iface.nodes, dict) iface.close() def test_create_serial_interface_uses_serial_module(mesh_module, monkeypatch): mesh = mesh_module created = {} sentinel = object() def fake_interface(*, devPath): created["devPath"] = devPath return SimpleNamespace(nodes={"!foo": sentinel}, close=lambda: None) monkeypatch.setattr(mesh, "SerialInterface", fake_interface) iface, resolved = mesh._create_serial_interface("/dev/ttyTEST") assert created["devPath"] == "/dev/ttyTEST" assert resolved == "/dev/ttyTEST" assert iface.nodes == {"!foo": sentinel} def test_create_serial_interface_uses_tcp_for_ip(mesh_module, monkeypatch): mesh = mesh_module created = {} def fake_tcp_interface(*, hostname, portNumber, **_): created["hostname"] = hostname created["portNumber"] = portNumber return SimpleNamespace(nodes={}, close=lambda: None) monkeypatch.setattr(mesh, "TCPInterface", fake_tcp_interface) iface, resolved = mesh._create_serial_interface("192.168.1.25:4500") assert created == {"hostname": "192.168.1.25", "portNumber": 4500} assert resolved == "tcp://192.168.1.25:4500" assert iface.nodes == {} def test_create_serial_interface_defaults_tcp_port(mesh_module, monkeypatch): mesh = mesh_module created = {} def fake_tcp_interface(*, hostname, portNumber, **_): created["hostname"] = hostname created["portNumber"] = portNumber return SimpleNamespace(nodes={}, close=lambda: None) monkeypatch.setattr(mesh, "TCPInterface", fake_tcp_interface) _, resolved = mesh._create_serial_interface("tcp://10.20.30.40") assert created["hostname"] == "10.20.30.40" assert created["portNumber"] == mesh._DEFAULT_TCP_PORT assert resolved == "tcp://10.20.30.40:4403" def test_create_serial_interface_plain_ip(mesh_module, monkeypatch): mesh = mesh_module created = {} def fake_tcp_interface(*, hostname, portNumber, **_): created["hostname"] = hostname created["portNumber"] = portNumber return SimpleNamespace(nodes={}, close=lambda: None) monkeypatch.setattr(mesh, "TCPInterface", fake_tcp_interface) _, resolved = mesh._create_serial_interface(" 192.168.50.10 ") assert created["hostname"] == "192.168.50.10" assert created["portNumber"] == mesh._DEFAULT_TCP_PORT assert resolved == "tcp://192.168.50.10:4403" def test_create_serial_interface_ble(mesh_module, monkeypatch): mesh = mesh_module created = {} def fake_ble_interface(*, address=None, **_): created["address"] = address return SimpleNamespace(nodes={}, close=lambda: None) monkeypatch.setattr(mesh, "BLEInterface", fake_ble_interface) iface, resolved = mesh._create_serial_interface("ed:4d:9e:95:cf:60") assert created["address"] == "ED:4D:9E:95:CF:60" assert resolved == "ED:4D:9E:95:CF:60" assert iface.nodes == {} def test_ensure_radio_metadata_extracts_config(mesh_module, capsys): mesh = mesh_module class DummyEnumValue: def __init__(self, name: str) -> None: self.name = name class DummyEnum: def __init__(self, mapping: dict[int, str]) -> None: self.values_by_number = { number: DummyEnumValue(name) for number, name in mapping.items() } class DummyField: def __init__(self, enum_type=None) -> None: self.enum_type = enum_type class DummyDescriptor: def __init__(self, fields: dict[str, DummyField]) -> None: self.fields_by_name = fields def make_lora( region_value: int, region_name: str, preset_value: int, preset_name: str, *, preset_field: str = "modem_preset", ): descriptor = DummyDescriptor( { "region": DummyField(DummyEnum({region_value: region_name})), preset_field: DummyField(DummyEnum({preset_value: preset_name})), } ) class DummyLora: DESCRIPTOR = descriptor def __init__(self) -> None: self.region = region_value setattr(self, preset_field, preset_value) def HasField(self, name: str) -> bool: # noqa: D401 - simple proxy return hasattr(self, name) return DummyLora() class DummyRadio: def __init__(self, lora) -> None: self.lora = lora def HasField(self, name: str) -> bool: return hasattr(self, name) class DummyConfig: def __init__(self, lora, *, expose_direct: bool) -> None: if expose_direct: self.lora = lora else: self.radio = DummyRadio(lora) def HasField(self, name: str) -> bool: # noqa: D401 - mimics protobuf API return hasattr(self, name) class DummyLocalNode: def __init__(self, config) -> None: self.localConfig = config class DummyInterface: def __init__(self, local_config) -> None: self.localNode = DummyLocalNode(local_config) self.wait_calls = 0 def waitForConfig(self) -> None: # noqa: D401 - matches Meshtastic API self.wait_calls += 1 primary_lora = make_lora(3, "EU_868", 4, "MEDIUM_FAST") iface = DummyInterface(DummyConfig(primary_lora, expose_direct=False)) mesh._ensure_radio_metadata(iface) first_log = capsys.readouterr().out assert iface.wait_calls == 1 assert mesh.config.LORA_FREQ == 868 assert mesh.config.MODEM_PRESET == "MediumFast" assert "Captured LoRa radio metadata" in first_log assert "lora_freq=868" in first_log assert "modem_preset='MediumFast'" in first_log secondary_lora = make_lora(7, "US_915", 2, "LONG_FAST", preset_field="preset") second_iface = DummyInterface(DummyConfig(secondary_lora, expose_direct=True)) mesh._ensure_radio_metadata(second_iface) second_log = capsys.readouterr().out assert second_iface.wait_calls == 1 assert mesh.config.LORA_FREQ == 868 assert mesh.config.MODEM_PRESET == "MediumFast" assert second_log == "" def test_capture_channels_from_interface_records_metadata(mesh_module, capsys): mesh = mesh_module mesh.config.MODEM_PRESET = "MediumFast" mesh.channels._reset_channel_cache() class DummyInterface: def __init__(self) -> None: self.wait_calls = 0 primary = SimpleNamespace( role=1, settings=SimpleNamespace(name=" radioamator ") ) secondary = SimpleNamespace( role="SECONDARY", index="7", settings=SimpleNamespace(name="TestChannel"), ) self.localNode = SimpleNamespace(channels=[primary, secondary]) def waitForConfig(self) -> None: # noqa: D401 - matches interface contract self.wait_calls += 1 iface = DummyInterface() mesh.channels.capture_from_interface(iface) log_output = capsys.readouterr().out assert iface.wait_calls == 1 assert mesh.channels.channel_mappings() == ((0, "radioamator"), (7, "TestChannel")) assert mesh.channels.channel_name(7) == "TestChannel" assert "Captured channel metadata" in log_output assert "channels=((0, 'radioamator'), (7, 'TestChannel'))" in log_output mesh.channels.capture_from_interface(SimpleNamespace(localNode=None)) assert mesh.channels.channel_mappings() == ((0, "radioamator"), (7, "TestChannel")) def test_capture_channels_primary_falls_back_to_env(mesh_module, monkeypatch, capsys): mesh = mesh_module mesh.config.MODEM_PRESET = None mesh.channels._reset_channel_cache() monkeypatch.setenv("CHANNEL", "FallbackName") class DummyInterface: def __init__(self) -> None: self.localNode = SimpleNamespace( channels={"primary": SimpleNamespace(role="PRIMARY")} ) def waitForConfig(self) -> None: # noqa: D401 - placeholder return None mesh.channels._reset_channel_cache() mesh.channels.capture_from_interface(DummyInterface()) log_output = capsys.readouterr().out assert mesh.channels.channel_mappings() == ((0, "FallbackName"),) assert mesh.channels.channel_name(0) == "FallbackName" assert "FallbackName" in log_output def test_capture_channels_primary_falls_back_to_preset(mesh_module, capsys): mesh = mesh_module mesh.config.MODEM_PRESET = " MediumFast " mesh.channels._reset_channel_cache() class DummyInterface: def __init__(self) -> None: self.localNode = SimpleNamespace( channels=[SimpleNamespace(role="PRIMARY", settings=SimpleNamespace())] ) def waitForConfig(self) -> None: # noqa: D401 - matches interface contract return None mesh.channels.capture_from_interface(DummyInterface()) log_output = capsys.readouterr().out assert mesh.channels.channel_mappings() == ((0, "MediumFast"),) assert mesh.channels.channel_name(0) == "MediumFast" assert "MediumFast" in log_output def test_create_default_interface_falls_back_to_tcp(mesh_module, monkeypatch): mesh = mesh_module attempts = [] def fake_targets(): return ["/dev/ttyFAIL"] def fake_create(port): attempts.append(port) if port.startswith("/dev/tty"): raise RuntimeError("missing serial device") return SimpleNamespace(nodes={}, close=lambda: None), "tcp://127.0.0.1:4403" monkeypatch.setattr(mesh, "_default_serial_targets", fake_targets) monkeypatch.setattr(mesh, "_create_serial_interface", fake_create) iface, resolved = mesh._create_default_interface() assert attempts == ["/dev/ttyFAIL", mesh._DEFAULT_TCP_TARGET] assert resolved == "tcp://127.0.0.1:4403" assert iface.nodes == {} def test_create_default_interface_raises_when_unavailable(mesh_module, monkeypatch): mesh = mesh_module monkeypatch.setattr(mesh, "_default_serial_targets", lambda: ["/dev/ttyFAIL"]) def always_fail(port): raise RuntimeError(f"boom for {port}") monkeypatch.setattr(mesh, "_create_serial_interface", always_fail) with pytest.raises(mesh.NoAvailableMeshInterface) as exc_info: mesh._create_default_interface() assert "/dev/ttyFAIL" in str(exc_info.value) def test_node_to_dict_handles_nested_structures(mesh_module): mesh = mesh_module @dataclass class Child: number: int class DummyProto(mesh.ProtoMessage): def __init__(self, **payload): self._payload = payload def to_dict(self): return self._payload @dataclass class Node: info: Child proto: DummyProto payload: bytes seq: list node = Node(Child(5), DummyProto(value=7), b"hi", [Child(1), DummyProto(value=9)]) result = mesh._node_to_dict(node) assert result["info"] == {"number": 5} assert result["proto"] == {"value": 7} assert result["payload"] == "hi" assert result["seq"] == [{"number": 1}, {"value": 9}] def test_store_packet_dict_posts_text_message(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" mesh.register_host_node_id("!f00dbabe") packet = { "id": 123, "rxTime": 1_700_000_000, "fromId": "!abc", "toId": "!def", "channel": "2", "hopLimit": "3", "snr": "1.25", "rxRssi": "-70", "decoded": { "payload": {"text": "hello"}, "portnum": "TEXT_MESSAGE_APP", "channel": 4, }, } mesh.store_packet_dict(packet) assert captured, "Expected POST to be triggered for text message" path, payload, priority = captured[0] assert path == "/api/messages" assert payload["id"] == 123 assert payload["channel"] == 4 assert payload["from_id"] == "!abc" assert payload["to_id"] == "!def" assert payload["text"] == "hello" assert payload["portnum"] == "TEXT_MESSAGE_APP" assert payload["rx_time"] == 1_700_000_000 assert payload["rx_iso"] == mesh._iso(1_700_000_000) assert payload["hop_limit"] == 3 assert payload["snr"] == pytest.approx(1.25) assert payload["rssi"] == -70 assert payload["reply_id"] is None assert payload["emoji"] is None assert payload["ingestor"] == "!f00dbabe" assert payload["lora_freq"] == 868 assert payload["modem_preset"] == "MediumFast" assert priority == mesh._MESSAGE_POST_PRIORITY def test_store_packet_dict_posts_reaction_message(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) packet = { "id": 999, "rxTime": 1_700_100_000, "fromId": "!reply", "toId": "!root", "decoded": { "portnum": "REACTION_APP", "data": { "reply_id": "123", "emoji": " 👍 ", }, }, } mesh.store_packet_dict(packet) assert captured, "Expected POST to be triggered for reaction message" path, payload, priority = captured[0] assert path == "/api/messages" assert payload["id"] == 999 assert payload["from_id"] == "!reply" assert payload["to_id"] == "!root" assert payload["portnum"] == "REACTION_APP" assert payload["text"] is None assert payload["reply_id"] == 123 assert payload["emoji"] == "👍" assert payload["rx_time"] == 1_700_100_000 assert payload["rx_iso"] == mesh._iso(1_700_100_000) assert priority == mesh._MESSAGE_POST_PRIORITY def test_store_packet_dict_posts_position(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" mesh.register_host_node_id("!f00dbabe") packet = { "id": 200498337, "rxTime": 1_758_624_186, "fromId": "!b1fa2b07", "toId": "^all", "rxSnr": -9.5, "rxRssi": -104, "decoded": { "portnum": "POSITION_APP", "bitfield": 1, "position": { "latitudeI": int(52.518912 * 1e7), "longitudeI": int(13.5512064 * 1e7), "altitude": -16, "time": 1_758_624_189, "locationSource": "LOC_INTERNAL", "precisionBits": 17, "satsInView": 7, "PDOP": 211, "groundSpeed": 2, "groundTrack": 0, "raw": { "latitude_i": int(52.518912 * 1e7), "longitude_i": int(13.5512064 * 1e7), "altitude": -16, "time": 1_758_624_189, }, }, "payload": { "__bytes_b64__": "DQDATR8VAMATCBjw//////////8BJb150mgoAljTAXgCgAEAmAEHuAER", }, }, } mesh.store_packet_dict(packet) assert captured, "Expected POST to be triggered for position packet" path, payload, priority = captured[0] assert path == "/api/positions" assert priority == mesh._POSITION_POST_PRIORITY assert payload["id"] == 200498337 assert payload["node_id"] == "!b1fa2b07" assert payload["node_num"] == int("b1fa2b07", 16) assert payload["num"] == payload["node_num"] assert payload["rx_time"] == 1_758_624_186 assert payload["rx_iso"] == mesh._iso(1_758_624_186) assert payload["latitude"] == pytest.approx(52.518912) assert payload["longitude"] == pytest.approx(13.5512064) assert payload["altitude"] == pytest.approx(-16) assert payload["position_time"] == 1_758_624_189 assert payload["location_source"] == "LOC_INTERNAL" assert payload["precision_bits"] == 17 assert payload["sats_in_view"] == 7 assert payload["pdop"] == pytest.approx(211.0) assert payload["ground_speed"] == pytest.approx(2.0) assert payload["ground_track"] == pytest.approx(0.0) assert payload["snr"] == pytest.approx(-9.5) assert payload["rssi"] == -104 assert payload["hop_limit"] is None assert payload["bitfield"] == 1 assert ( payload["payload_b64"] == "DQDATR8VAMATCBjw//////////8BJb150mgoAljTAXgCgAEAmAEHuAER" ) assert payload["lora_freq"] == 868 assert payload["modem_preset"] == "MediumFast" assert payload["ingestor"] == "!f00dbabe" assert payload["raw"]["time"] == 1_758_624_189 def test_store_packet_dict_posts_neighborinfo(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" mesh.register_host_node_id("!f00dbabe") packet = { "id": 2049886869, "rxTime": 1_758_884_186, "fromId": "!7c5b0920", "decoded": { "portnum": "NEIGHBORINFO_APP", "neighborinfo": { "nodeId": 0x7C5B0920, "lastSentById": 0x9E3AA2F0, "nodeBroadcastIntervalSecs": 1800, "neighbors": [ {"nodeId": 0x2B2A4D51, "snr": -6.5}, {"nodeId": 0x437FE3E0, "snr": -2.75, "rxTime": 1_758_884_150}, {"nodeId": "!0badc0de", "snr": None}, ], }, }, } mesh.store_packet_dict(packet) assert captured, "Expected POST to be triggered for neighbor info" path, payload, priority = captured[0] assert path == "/api/neighbors" assert priority == mesh._NEIGHBOR_POST_PRIORITY assert payload["node_id"] == "!7c5b0920" assert payload["node_num"] == 0x7C5B0920 assert payload["rx_time"] == 1_758_884_186 assert payload["node_broadcast_interval_secs"] == 1800 assert payload["last_sent_by_id"] == "!9e3aa2f0" neighbors = payload["neighbors"] assert len(neighbors) == 3 assert neighbors[0]["neighbor_id"] == "!2b2a4d51" assert neighbors[0]["neighbor_num"] == 0x2B2A4D51 assert neighbors[0]["rx_time"] == 1_758_884_186 assert neighbors[0]["snr"] == pytest.approx(-6.5) assert neighbors[1]["neighbor_id"] == "!437fe3e0" assert neighbors[1]["rx_time"] == 1_758_884_150 assert neighbors[1]["snr"] == pytest.approx(-2.75) assert neighbors[2]["neighbor_id"] == "!0badc0de" assert neighbors[2]["neighbor_num"] == 0x0BAD_C0DE assert payload["lora_freq"] == 868 assert payload["modem_preset"] == "MediumFast" assert payload["ingestor"] == "!f00dbabe" def test_store_packet_dict_handles_nodeinfo_packet(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" from meshtastic.protobuf import config_pb2, mesh_pb2 node_info = mesh_pb2.NodeInfo() node_info.num = 321 user = node_info.user user.id = "!abcd1234" user.short_name = "LoRa" user.long_name = "LoRa Node" user.role = config_pb2.Config.DeviceConfig.Role.Value("CLIENT") user.hw_model = mesh_pb2.HardwareModel.Value("TBEAM") node_info.device_metrics.battery_level = 87 node_info.device_metrics.voltage = 3.91 node_info.device_metrics.channel_utilization = 5.5 node_info.device_metrics.air_util_tx = 0.12 node_info.device_metrics.uptime_seconds = 4321 node_info.position.latitude_i = int(52.5 * 1e7) node_info.position.longitude_i = int(13.4 * 1e7) node_info.position.altitude = 48 node_info.position.time = 1_700_000_050 node_info.position.location_source = mesh_pb2.Position.LocSource.Value( "LOC_INTERNAL" ) node_info.snr = 9.5 node_info.last_heard = 1_700_000_040 node_info.hops_away = 2 node_info.is_favorite = True payload_b64 = base64.b64encode(node_info.SerializeToString()).decode() packet = { "id": 999, "rxTime": 1_700_000_200, "from": int("abcd1234", 16), "rxSnr": -5.5, "decoded": { "portnum": "NODEINFO_APP", "payload": {"__bytes_b64__": payload_b64}, }, } mesh.store_packet_dict(packet) assert captured, "Expected nodeinfo packet to trigger POST" path, payload, priority = captured[0] assert path == "/api/nodes" assert priority == mesh._NODE_POST_PRIORITY assert "!abcd1234" in payload node_entry = payload["!abcd1234"] assert node_entry["num"] == 321 assert node_entry["lastHeard"] == 1_700_000_200 assert node_entry["snr"] == pytest.approx(9.5) assert node_entry["hopsAway"] == 2 assert node_entry["isFavorite"] is True assert node_entry["user"]["shortName"] == "LoRa" assert node_entry["deviceMetrics"]["batteryLevel"] == pytest.approx(87) assert node_entry["deviceMetrics"]["voltage"] == pytest.approx(3.91) assert node_entry["deviceMetrics"]["uptimeSeconds"] == 4321 assert node_entry["position"]["latitude"] == pytest.approx(52.5) assert node_entry["position"]["longitude"] == pytest.approx(13.4) assert node_entry["position"]["time"] == 1_700_000_050 assert node_entry["lora_freq"] == 868 assert node_entry["modem_preset"] == "MediumFast" def test_store_packet_dict_handles_user_only_nodeinfo(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" from meshtastic.protobuf import mesh_pb2 user_msg = mesh_pb2.User() user_msg.id = "!11223344" user_msg.short_name = "Test" user_msg.long_name = "Test Node" payload_b64 = base64.b64encode(user_msg.SerializeToString()).decode() packet = { "id": 42, "rxTime": 1_234, "from": int("11223344", 16), "decoded": { "portnum": "NODEINFO_APP", "payload": {"__bytes_b64__": payload_b64}, "user": { "id": "!11223344", "shortName": "Test", "longName": "Test Node", "hwModel": "HELTEC_V3", }, }, } mesh.store_packet_dict(packet) assert captured _, payload, _ = captured[0] node_entry = payload["!11223344"] assert node_entry["lastHeard"] == 1_234 assert node_entry["user"]["longName"] == "Test Node" assert "deviceMetrics" not in node_entry assert node_entry["lora_freq"] == 868 assert node_entry["modem_preset"] == "MediumFast" def test_store_packet_dict_nodeinfo_merges_proto_user(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" from meshtastic.protobuf import mesh_pb2 user_msg = mesh_pb2.User() user_msg.id = "!44556677" user_msg.short_name = "Proto" user_msg.long_name = "Proto User" node_info = mesh_pb2.NodeInfo() node_info.snr = 2.5 payload_b64 = base64.b64encode(node_info.SerializeToString()).decode() packet = { "id": 73, "rxTime": 5_000, "fromId": "!44556677", "decoded": { "portnum": "NODEINFO_APP", "payload": {"__bytes_b64__": payload_b64}, "user": user_msg, }, } mesh.store_packet_dict(packet) assert captured _, payload, _ = captured[0] node_entry = payload["!44556677"] assert node_entry["lastHeard"] == 5_000 assert node_entry["user"]["shortName"] == "Proto" assert node_entry["user"]["longName"] == "Proto User" assert node_entry["lora_freq"] == 868 assert node_entry["modem_preset"] == "MediumFast" def test_store_packet_dict_nodeinfo_sanitizes_nested_proto(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" from meshtastic.protobuf import mesh_pb2 user_msg = mesh_pb2.User() user_msg.id = "!55667788" user_msg.short_name = "Nested" node_info = mesh_pb2.NodeInfo() node_info.hops_away = 1 payload_b64 = base64.b64encode(node_info.SerializeToString()).decode() packet = { "id": 74, "rxTime": 6_000, "fromId": "!55667788", "decoded": { "portnum": "NODEINFO_APP", "payload": {"__bytes_b64__": payload_b64}, "user": { "id": "!55667788", "shortName": "Nested", "raw": user_msg, }, }, } mesh.store_packet_dict(packet) assert captured _, payload, _ = captured[0] node_entry = payload["!55667788"] assert node_entry["user"]["shortName"] == "Nested" assert isinstance(node_entry["user"]["raw"], dict) assert node_entry["user"]["raw"]["id"] == "!55667788" assert node_entry["lora_freq"] == 868 assert node_entry["modem_preset"] == "MediumFast" def test_store_packet_dict_nodeinfo_uses_from_id_when_user_missing( mesh_module, monkeypatch ): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" from meshtastic.protobuf import mesh_pb2 node_info = mesh_pb2.NodeInfo() node_info.snr = 1.5 node_info.last_heard = 100 payload_b64 = base64.b64encode(node_info.SerializeToString()).decode() packet = { "id": 7, "rxTime": 200, "from": 0x01020304, "decoded": {"portnum": 5, "payload": {"__bytes_b64__": payload_b64}}, } mesh.store_packet_dict(packet) assert captured _, payload, _ = captured[0] assert "!01020304" in payload def test_nodeinfo_wrapper_infers_missing_identifier(mesh_module, monkeypatch): """Ensure the Meshtastic nodeinfo hook derives canonical IDs from payloads.""" _ = mesh_module import meshtastic from data.mesh_ingestor import interfaces captured_packets: list[dict] = [] def _original_handler(iface, packet): captured_packets.append(packet) return packet["id"] monkeypatch.setattr( meshtastic, "_onNodeInfoReceive", _original_handler, raising=False ) interfaces._patch_meshtastic_nodeinfo_handler() safe_handler = meshtastic._onNodeInfoReceive class DummyUser: def __init__(self) -> None: self.num = 0x88776655 class DummyDecoded: def __init__(self) -> None: self.user = DummyUser() class DummyPacket: def __init__(self) -> None: self.decoded = DummyDecoded() iface = types.SimpleNamespace(nodes={}) safe_handler(iface, DummyPacket()) assert captured_packets, "Expected wrapper to call the original handler" packet = captured_packets[0] assert packet["id"] == "!88776655" def test_nodeinfo_handler_wrapper_prevents_key_error(mesh_module): """The NodeInfo handler should operate safely when the ID field is absent.""" import meshtastic from data.mesh_ingestor import interfaces interfaces._patch_meshtastic_nodeinfo_handler() assert getattr( meshtastic.mesh_interface.NodeInfoHandler, "_potato_mesh_safe_wrapper", False, ), "Expected NodeInfoHandler to be replaced with a safe subclass" handler = meshtastic.mesh_interface.NodeInfoHandler() iface = types.SimpleNamespace(nodes={}) packet = {"decoded": {"user": {"id": "!01020304"}}} result = handler.onReceive(iface, packet) assert iface.nodes["!01020304"]["id"] == "!01020304" assert result == "!01020304" def test_interfaces_patch_handles_preimported_serial(): """Regression: importing serial module before patch still updates handler.""" preserved_modules: dict[str, types.ModuleType | None] = {} module_names = [ "data.mesh_ingestor.interfaces", "data.mesh_ingestor", "meshtastic.serial_interface", "meshtastic.tcp_interface", "meshtastic.mesh_interface", "meshtastic", ] for name in module_names: preserved_modules[name] = sys.modules.pop(name, None) try: def _default_nodeinfo_callback(_iface, packet): return packet["id"] mesh_interface_mod = types.ModuleType("meshtastic.mesh_interface") class DummyNodeInfoHandler: """Stub that mirrors Meshtastic's original handler semantics.""" def __init__(self) -> None: self.callback = _default_nodeinfo_callback def onReceive(self, iface, packet): # noqa: D401 - simple passthrough return self.callback(iface, packet) mesh_interface_mod.NodeInfoHandler = DummyNodeInfoHandler serial_interface_mod = types.ModuleType("meshtastic.serial_interface") class DummySerialInterface: def __init__(self, *_, **__): self.nodes = {} def close(self): # noqa: D401 - mimic Meshtastic close API self.nodes.clear() serial_interface_mod.SerialInterface = DummySerialInterface serial_interface_mod.NodeInfoHandler = DummyNodeInfoHandler tcp_interface_mod = types.ModuleType("meshtastic.tcp_interface") class DummyTCPInterface: def __init__(self, *_, **__): self.nodes = {} def close(self): # noqa: D401 - mimic Meshtastic close API self.nodes.clear() tcp_interface_mod.TCPInterface = DummyTCPInterface meshtastic_mod = types.ModuleType("meshtastic") meshtastic_mod.__path__ = [] # mark as package for import machinery meshtastic_mod._onNodeInfoReceive = _default_nodeinfo_callback meshtastic_mod.mesh_interface = mesh_interface_mod meshtastic_mod.serial_interface = serial_interface_mod meshtastic_mod.tcp_interface = tcp_interface_mod sys.modules["meshtastic"] = meshtastic_mod sys.modules["meshtastic.mesh_interface"] = mesh_interface_mod sys.modules["meshtastic.serial_interface"] = serial_interface_mod sys.modules["meshtastic.tcp_interface"] = tcp_interface_mod serial_module = importlib.import_module("meshtastic.serial_interface") assert serial_module.NodeInfoHandler is DummyNodeInfoHandler interfaces = importlib.import_module("data.mesh_ingestor.interfaces") patched_handler = serial_module.NodeInfoHandler assert patched_handler is not DummyNodeInfoHandler assert getattr(patched_handler, "_potato_mesh_safe_wrapper", False) handler = patched_handler() iface = types.SimpleNamespace(nodes={}) assert handler.onReceive(iface, {}) is None assert iface.nodes == {} patched_callback = getattr(meshtastic_mod, "_onNodeInfoReceive") assert getattr(patched_callback, "_potato_mesh_safe_wrapper", False) assert interfaces.SerialInterface is DummySerialInterface finally: for name in module_names: sys.modules.pop(name, None) for name, module in preserved_modules.items(): if module is not None: sys.modules[name] = module def test_store_packet_dict_ignores_non_text(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda *args, **kwargs: captured.append((args, kwargs)), ) packet = { "id": 456, "rxTime": 1_700_000_100, "fromId": "!abc", "toId": "!def", "decoded": { "payload": {"text": "ignored"}, "portnum": "ENVIRONMENTAL_MEASUREMENT", }, } mesh.store_packet_dict(packet) assert not captured, "Non-text messages should not be queued" def test_node_items_snapshot_handles_transient_runtime_error(mesh_module): mesh = mesh_module class FlakyDict(dict): def __init__(self): super().__init__({"node": {"foo": "bar"}}) self.calls = 0 def items(self): self.calls += 1 if self.calls == 1: raise RuntimeError("dictionary changed size during iteration") return super().items() nodes = FlakyDict() snapshot = mesh._node_items_snapshot(nodes, retries=3) assert snapshot == [("node", {"foo": "bar"})] assert nodes.calls == 2 def test_node_items_snapshot_returns_none_when_still_mutating(mesh_module): mesh = mesh_module class AlwaysChanging(dict): def __init__(self): super().__init__({"node": {"foo": "bar"}}) def items(self): raise RuntimeError("dictionary changed size during iteration") nodes = AlwaysChanging() snapshot = mesh._node_items_snapshot(nodes, retries=2) assert snapshot is None def test_get_handles_dicts_and_objects(mesh_module): mesh = mesh_module class Dummy: value = "obj" assert mesh._get({"key": 1}, "key") == 1 assert mesh._get({"key": 1}, "missing", "fallback") == "fallback" dummy = Dummy() assert mesh._get(dummy, "value") == "obj" assert mesh._get(dummy, "missing", "default") == "default" def test_post_json_skips_without_instance(mesh_module, monkeypatch): mesh = mesh_module monkeypatch.setattr(mesh, "INSTANCE", "") def fail_request(*_, **__): raise AssertionError("Request should not be created when INSTANCE is empty") monkeypatch.setattr(mesh.urllib.request, "Request", fail_request) mesh._post_json("/ignored", {"foo": "bar"}) def test_post_json_sends_payload_with_token(mesh_module, monkeypatch): mesh = mesh_module monkeypatch.setattr(mesh, "INSTANCE", "https://example.test") monkeypatch.setattr(mesh, "API_TOKEN", "secret") captured = {} def fake_urlopen(req, timeout=0): captured["req"] = req class DummyResponse: def __enter__(self): return self def __exit__(self, *exc): return False def read(self): return b"ok" return DummyResponse() monkeypatch.setattr(mesh.urllib.request, "urlopen", fake_urlopen) mesh._post_json("/api/test", {"hello": "world"}) req = captured["req"] assert req.full_url == "https://example.test/api/test" assert req.headers["Content-type"] == "application/json" assert req.get_header("Authorization") == "Bearer secret" assert mesh.json.loads(req.data.decode("utf-8")) == {"hello": "world"} def test_node_to_dict_handles_non_utf8_bytes(mesh_module): mesh = mesh_module @dataclass class Node: payload: bytes other: object class Custom: def __str__(self): return "custom!" node = Node(b"\xff", Custom()) result = mesh._node_to_dict(node) assert result["payload"] == "ff" assert result["other"] == "custom!" def test_first_prefers_first_non_empty_value(mesh_module): mesh = mesh_module data = {"primary": {"value": ""}, "secondary": {"value": "found"}} assert mesh._first(data, "primary.value", "secondary.value") == "found" assert mesh._first(data, "missing.path", default="fallback") == "fallback" def test_first_handles_attribute_sources(mesh_module): mesh = mesh_module ns = SimpleNamespace(empty=None, value="attr") assert mesh._first(ns, "empty", "value") == "attr" def test_pkt_to_dict_handles_dict_and_proto(mesh_module, monkeypatch): mesh = mesh_module assert mesh._pkt_to_dict({"a": 1}) == {"a": 1} class DummyProto(mesh.ProtoMessage): def to_dict(self): return {"value": 5} assert mesh._pkt_to_dict(DummyProto()) == {"value": 5} class Unknown: pass def broken_dumps(*_, **__): raise TypeError("boom") monkeypatch.setattr(mesh.json, "dumps", broken_dumps) fallback = mesh._pkt_to_dict(Unknown()) assert set(fallback) == {"_unparsed"} assert isinstance(fallback["_unparsed"], str) def test_main_retries_interface_creation(mesh_module, monkeypatch): mesh = mesh_module attempts = [] class DummyEvent: def __init__(self): self.wait_calls = 0 def is_set(self): return self.wait_calls >= 3 def set(self): self.wait_calls = 3 def wait(self, timeout): self.wait_calls += 1 return self.is_set() class DummyInterface: def __init__(self): self.closed = False self.nodes = {} def close(self): self.closed = True iface = DummyInterface() def fake_create(port): attempts.append(port) if len(attempts) < 3: raise RuntimeError("boom") return iface, port monkeypatch.setattr(mesh, "PORT", "/dev/ttyTEST") monkeypatch.setattr(mesh, "_create_serial_interface", fake_create) monkeypatch.setattr(mesh.threading, "Event", DummyEvent) monkeypatch.setattr(mesh.signal, "signal", lambda *_, **__: None) monkeypatch.setattr(mesh, "SNAPSHOT_SECS", 0) monkeypatch.setattr(mesh, "_RECONNECT_INITIAL_DELAY_SECS", 0) monkeypatch.setattr(mesh, "_RECONNECT_MAX_DELAY_SECS", 0) mesh.main() assert len(attempts) == 3 assert iface.closed is True def test_connected_state_handles_threading_event(mesh_module): mesh = mesh_module event = mesh.threading.Event() assert mesh._connected_state(event) is False event.set() assert mesh._connected_state(event) is True def test_main_reconnects_when_connection_event_clears(mesh_module, monkeypatch): mesh = mesh_module attempts = [] interfaces = [] current_iface = {"obj": None} import threading as real_threading_module real_event_cls = real_threading_module.Event class DummyInterface: def __init__(self): self.nodes = {} self.isConnected = real_event_cls() self.isConnected.set() self.close_calls = 0 def close(self): self.close_calls += 1 def fake_create(port): iface = DummyInterface() attempts.append(port) interfaces.append(iface) current_iface["obj"] = iface return iface, port class DummyStopEvent: def __init__(self): self._flag = False self.wait_calls = 0 def is_set(self): return self._flag def set(self): self._flag = True def wait(self, timeout): self.wait_calls += 1 if self.wait_calls == 1: iface = current_iface["obj"] assert iface is not None, "interface should be available" iface.isConnected.clear() return self._flag self._flag = True return True monkeypatch.setattr(mesh, "PORT", "/dev/ttyTEST") monkeypatch.setattr(mesh, "_create_serial_interface", fake_create) monkeypatch.setattr(mesh.threading, "Event", DummyStopEvent) monkeypatch.setattr(mesh.signal, "signal", lambda *_, **__: None) monkeypatch.setattr(mesh, "SNAPSHOT_SECS", 0) monkeypatch.setattr(mesh, "_RECONNECT_INITIAL_DELAY_SECS", 0) monkeypatch.setattr(mesh, "_RECONNECT_MAX_DELAY_SECS", 0) monkeypatch.setattr(mesh, "_CLOSE_TIMEOUT_SECS", 0) mesh.main() assert len(attempts) == 2 assert len(interfaces) == 2 assert interfaces[0].close_calls >= 1 assert interfaces[1].close_calls >= 1 def test_main_recreates_interface_after_snapshot_error(mesh_module, monkeypatch): mesh = mesh_module class DummyEvent: def __init__(self): self.wait_calls = 0 def is_set(self): return self.wait_calls >= 2 def set(self): self.wait_calls = 2 def wait(self, timeout): self.wait_calls += 1 return self.is_set() interfaces = [] def fake_create(port): fail_first = not interfaces class FlakyInterface: def __init__(self, should_fail): self.closed = False self._should_fail = should_fail self._calls = 0 @property def nodes(self): self._calls += 1 if self._should_fail and self._calls == 1: raise RuntimeError("temporary failure") return {"!node": {"id": 1}} def close(self): self.closed = True interface = FlakyInterface(fail_first) interfaces.append(interface) return interface, port upsert_calls = [] def record_upsert(node_id, node): upsert_calls.append(node_id) monkeypatch.setattr(mesh, "PORT", "/dev/ttyTEST") monkeypatch.setattr(mesh, "_create_serial_interface", fake_create) monkeypatch.setattr(mesh, "upsert_node", record_upsert) monkeypatch.setattr(mesh.threading, "Event", DummyEvent) monkeypatch.setattr(mesh.signal, "signal", lambda *_, **__: None) monkeypatch.setattr(mesh, "SNAPSHOT_SECS", 0) monkeypatch.setattr(mesh, "_RECONNECT_INITIAL_DELAY_SECS", 0) monkeypatch.setattr(mesh, "_RECONNECT_MAX_DELAY_SECS", 0) mesh.main() assert len(interfaces) >= 2 assert interfaces[0].closed is True assert upsert_calls == ["!node"] def test_main_exits_when_defaults_unavailable(mesh_module, monkeypatch): mesh = mesh_module def fail_default(): raise mesh.NoAvailableMeshInterface("no interface available") monkeypatch.setattr(mesh, "PORT", None) monkeypatch.setattr(mesh, "_create_default_interface", fail_default) monkeypatch.setattr(mesh.signal, "signal", lambda *_, **__: None) with pytest.raises(SystemExit) as exc_info: mesh.main() assert exc_info.value.code == 1 def test_store_packet_dict_uses_top_level_channel(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" packet = { "id": "789", "rxTime": 123456, "from": "!abc", "to": "!def", "channel": "5", "decoded": {"text": "hi", "portnum": 1}, } mesh.store_packet_dict(packet) assert captured, "Expected message to be stored" path, payload, priority = captured[0] assert path == "/api/messages" assert payload["channel"] == 5 assert payload["portnum"] == "1" assert payload["text"] == "hi" assert payload["encrypted"] is None assert payload["snr"] is None and payload["rssi"] is None assert payload["lora_freq"] == 868 assert payload["modem_preset"] == "MediumFast" assert priority == mesh._MESSAGE_POST_PRIORITY def test_store_packet_dict_handles_invalid_channel(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" packet = { "id": 321, "rxTime": 999, "fromId": "!abc", "decoded": { "payload": {"text": "hello"}, "portnum": "TEXT_MESSAGE_APP", "channel": "not-a-number", }, } mesh.store_packet_dict(packet) assert captured path, payload, priority = captured[0] assert path == "/api/messages" assert payload["channel"] == 0 assert payload["encrypted"] is None assert payload["lora_freq"] == 868 assert payload["modem_preset"] == "MediumFast" assert priority == mesh._MESSAGE_POST_PRIORITY def test_store_packet_dict_skips_direct_message_on_primary_channel( mesh_module, monkeypatch ): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) packet = { "id": 111, "rxTime": 777, "fromId": "!sender", "toId": "!recipient", "channel": 0, "decoded": {"text": "secret dm", "portnum": "TEXT_MESSAGE_APP"}, } mesh.store_packet_dict(packet) assert not captured def test_store_packet_dict_allows_primary_channel_broadcast(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 915 mesh.config.MODEM_PRESET = "LongSlow" packet = { "id": 222, "rxTime": 888, "from": "!relay", "to": "^all", "channel": 0, "decoded": {"text": "announcement", "portnum": "TEXT_MESSAGE_APP"}, } mesh.store_packet_dict(packet) assert captured path, payload, priority = captured[0] assert path == "/api/messages" assert payload["text"] == "announcement" assert payload["to_id"] == "^all" assert payload["channel"] == 0 assert payload["lora_freq"] == 915 assert payload["modem_preset"] == "LongSlow" assert priority == mesh._MESSAGE_POST_PRIORITY def test_store_packet_dict_accepts_routing_app_messages(mesh_module, monkeypatch): """Ensure routing app payloads are treated as message posts.""" mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) packet = { "id": 333, "rxTime": 999, "fromId": "!node", "toId": "^all", "channel": 0, "decoded": {"payload": "GAA=", "portnum": "ROUTING_APP"}, } mesh.store_packet_dict(packet) assert captured, "Expected routing packet to be stored" path, payload, priority = captured[0] assert path == "/api/messages" assert payload["portnum"] == "ROUTING_APP" assert payload["text"] == "GAA=" assert payload["channel"] == 0 assert payload["encrypted"] is None assert priority == mesh._MESSAGE_POST_PRIORITY def test_store_packet_dict_serializes_routing_payloads(mesh_module, monkeypatch): """Ensure routing payloads are serialized when text is absent.""" mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) packet = { "id": 334, "rxTime": 1000, "fromId": "!node", "toId": "^all", "channel": 0, "decoded": { "payload": b"\x01\x02", "portnum": "ROUTING_APP", }, } mesh.store_packet_dict(packet) assert captured, "Expected routing packet to be stored" _, payload, _ = captured[0] assert payload["text"] == "AQI=" captured.clear() packet["decoded"]["payload"] = {"kind": "ack"} mesh.store_packet_dict(packet) assert captured, "Expected routing packet to be stored" _, payload, _ = captured[0] assert payload["text"] == '{"kind": "ack"}' captured.clear() packet["decoded"]["portnum"] = 7 packet["decoded"]["payload"] = b"\x00" packet["decoded"]["routing"] = {"errorReason": "NONE"} mesh.store_packet_dict(packet) assert captured, "Expected numeric routing packet to be stored" _, payload, _ = captured[0] assert payload["text"] == "AA==" def test_portnum_candidates_reads_enum_values(mesh_module, monkeypatch): """Ensure portnum candidates include enum and constants when available.""" mesh = mesh_module module_name = "meshtastic.portnums_pb2" class DummyPortNum: @staticmethod def Value(name): if name == "ROUTING_APP": return 7 raise KeyError(name) dummy_module = types.SimpleNamespace(PortNum=DummyPortNum, ROUTING_APP=8) monkeypatch.setitem(sys.modules, module_name, dummy_module) candidates = mesh.handlers._portnum_candidates("ROUTING_APP") assert 7 in candidates assert 8 in candidates def test_store_packet_dict_appends_channel_name(mesh_module, monkeypatch, capsys): mesh = mesh_module mesh.channels._reset_channel_cache() mesh.config.MODEM_PRESET = "MediumFast" class DummyInterface: def __init__(self) -> None: self.localNode = SimpleNamespace( channels=[ SimpleNamespace(role=1, settings=SimpleNamespace()), SimpleNamespace( role=2, index=5, settings=SimpleNamespace(name="Chat"), ), ] ) def waitForConfig(self) -> None: # noqa: D401 - matches interface contract return None mesh.channels.capture_from_interface(DummyInterface()) capsys.readouterr() captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) monkeypatch.setattr(mesh, "DEBUG", True) packet = { "id": "789", "rxTime": 123456, "from": "!abc", "to": "!def", "channel": 5, "decoded": {"text": "hi", "portnum": 1}, } mesh.store_packet_dict(packet) assert captured, "Expected message to be stored" path, payload, priority = captured[0] assert path == "/api/messages" assert payload["channel_name"] == "Chat" assert payload["channel"] == 5 assert payload["text"] == "hi" assert payload["encrypted"] is None assert payload["reply_id"] is None assert payload["emoji"] is None assert priority == mesh._MESSAGE_POST_PRIORITY log_output = capsys.readouterr().out assert "channel_name='Chat'" in log_output assert "channel_display='Chat'" in log_output def test_store_packet_dict_skips_hidden_channel(mesh_module, monkeypatch, capsys): mesh = mesh_module mesh.channels._reset_channel_cache() mesh.config.MODEM_PRESET = None class DummyInterface: def __init__(self) -> None: self.localNode = SimpleNamespace( channels=[ SimpleNamespace( role=1, settings=SimpleNamespace(name="Primary"), ), SimpleNamespace( role=2, index=5, settings=SimpleNamespace(name="Chat"), ), ] ) def waitForConfig(self): return None mesh.channels.capture_from_interface(DummyInterface()) capsys.readouterr() captured: list[tuple[str, dict, int]] = [] ignored: list[str] = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) monkeypatch.setattr( mesh.handlers, "_record_ignored_packet", lambda packet, *, reason: ignored.append(reason), ) previous_debug = mesh.config.DEBUG previous_hidden = mesh.HIDDEN_CHANNELS previous_allowed = mesh.ALLOWED_CHANNELS mesh.config.DEBUG = True mesh.DEBUG = True mesh.ALLOWED_CHANNELS = ("Chat",) mesh.HIDDEN_CHANNELS = ("Chat",) try: packet = { "id": "999", "rxTime": 24_680, "from": "!sender", "to": "^all", "channel": 5, "decoded": {"text": "hidden msg", "portnum": 1}, } mesh.store_packet_dict(packet) assert captured == [] assert ignored == ["hidden-channel"] assert "Ignored packet on hidden channel" in capsys.readouterr().out finally: mesh.HIDDEN_CHANNELS = previous_hidden mesh.ALLOWED_CHANNELS = previous_allowed mesh.config.DEBUG = previous_debug mesh.DEBUG = previous_debug def test_store_packet_dict_skips_disallowed_channel(mesh_module, monkeypatch, capsys): mesh = mesh_module mesh.channels._reset_channel_cache() mesh.config.MODEM_PRESET = None class DummyInterface: def __init__(self) -> None: self.localNode = SimpleNamespace( channels=[ SimpleNamespace( role=1, settings=SimpleNamespace(name="Primary"), ), SimpleNamespace( role=2, index=5, settings=SimpleNamespace(name="Chat"), ), ] ) def waitForConfig(self): return None mesh.channels.capture_from_interface(DummyInterface()) capsys.readouterr() captured: list[tuple[str, dict, int]] = [] ignored: list[str] = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) monkeypatch.setattr( mesh.handlers, "_record_ignored_packet", lambda packet, *, reason: ignored.append(reason), ) previous_debug = mesh.config.DEBUG previous_allowed = mesh.ALLOWED_CHANNELS previous_hidden = mesh.HIDDEN_CHANNELS mesh.config.DEBUG = True mesh.DEBUG = True mesh.ALLOWED_CHANNELS = ("Primary",) mesh.HIDDEN_CHANNELS = () try: packet = { "id": "1001", "rxTime": 25_680, "from": "!sender", "to": "^all", "channel": 5, "decoded": {"text": "disallowed msg", "portnum": 1}, } mesh.store_packet_dict(packet) assert captured == [] assert ignored == ["disallowed-channel"] assert "Ignored packet on disallowed channel" in capsys.readouterr().out finally: mesh.ALLOWED_CHANNELS = previous_allowed mesh.HIDDEN_CHANNELS = previous_hidden mesh.config.DEBUG = previous_debug mesh.DEBUG = previous_debug def test_store_packet_dict_includes_encrypted_payload(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" packet = { "id": 555, "rxTime": 111, "from": 2988082812, "to": "!receiver", "channel": 8, "encrypted": "abc123==", } mesh.store_packet_dict(packet) assert captured path, payload, priority = captured[0] assert path == "/api/messages" assert payload["encrypted"] == "abc123==" assert payload["text"] is None assert payload["from_id"] == 2988082812 assert payload["to_id"] == "!receiver" assert payload["reply_id"] is None assert payload["emoji"] is None assert "channel_name" not in payload assert payload["lora_freq"] == 868 assert payload["modem_preset"] == "MediumFast" assert priority == mesh._MESSAGE_POST_PRIORITY def test_store_packet_dict_handles_telemetry_packet(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" mesh.register_host_node_id("!f00dbabe") packet = { "id": 1_256_091_342, "rxTime": 1_758_024_300, "fromId": "!9e95cf60", "toId": "^all", "decoded": { "portnum": "TELEMETRY_APP", "bitfield": 1, "telemetry": { "time": 1_758_024_300, "deviceMetrics": { "batteryLevel": 101, "voltage": 4.224, "channelUtilization": 0.59666663, "airUtilTx": 0.03908333, "uptimeSeconds": 305044, "current": 0.0715, }, "localStats": { "numPacketsTx": 1280, "numPacketsRx": 1425, }, }, "payload": { "__bytes_b64__": "DTVr0mgSFQhlFQIrh0AdJb8YPyXYFSA9KJTPEg==", }, }, } mesh.store_packet_dict(packet) assert captured path, payload, priority = captured[0] assert path == "/api/telemetry" assert priority == mesh._TELEMETRY_POST_PRIORITY assert payload["id"] == 1_256_091_342 assert payload["node_id"] == "!9e95cf60" assert payload["from_id"] == "!9e95cf60" assert payload["rx_time"] == 1_758_024_300 assert payload["telemetry_time"] == 1_758_024_300 assert payload["channel"] == 0 assert payload["bitfield"] == 1 assert payload["payload_b64"] == "DTVr0mgSFQhlFQIrh0AdJb8YPyXYFSA9KJTPEg==" assert payload["battery_level"] == pytest.approx(101.0) assert payload["voltage"] == pytest.approx(4.224) assert payload["channel_utilization"] == pytest.approx(0.59666663) assert payload["air_util_tx"] == pytest.approx(0.03908333) assert payload["uptime_seconds"] == 305044 assert payload["current"] == pytest.approx(0.0715) assert payload["lora_freq"] == 868 assert payload["modem_preset"] == "MediumFast" assert payload["ingestor"] == "!f00dbabe" def test_store_packet_dict_handles_environment_telemetry(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" packet = { "id": 2_817_720_548, "rxTime": 1_758_024_400, "from": 3_698_627_780, "decoded": { "portnum": "TELEMETRY_APP", "telemetry": { "time": 1_758_024_390, "environmentMetrics": { "temperature": 21.98, "relativeHumidity": 39.475586, "barometricPressure": 1017.8353, "gasResistance": 1456.0, "iaq": 83, "distance": 12.5, "lux": 100.25, "whiteLux": 64.5, "irLux": 12.75, "uvLux": 1.6, "windDirection": 270, "windSpeed": 5.9, "windGust": 7.4, "windLull": 4.8, "weight": 32.7, "radiation": 0.45, "rainfall1h": 0.18, "rainfall24h": 1.42, "soilMoisture": 3100, "soilTemperature": 18.9, }, }, }, } mesh.store_packet_dict(packet) assert captured path, payload, priority = captured[0] assert path == "/api/telemetry" assert payload["id"] == 2_817_720_548 assert payload["node_id"] == "!dc7494c4" assert payload["from_id"] == "!dc7494c4" assert payload["telemetry_time"] == 1_758_024_390 assert payload["temperature"] == pytest.approx(21.98) assert payload["relative_humidity"] == pytest.approx(39.475586) assert payload["barometric_pressure"] == pytest.approx(1017.8353) assert payload["gas_resistance"] == pytest.approx(1456.0) assert payload["iaq"] == 83 assert payload["distance"] == pytest.approx(12.5) assert payload["lux"] == pytest.approx(100.25) assert payload["white_lux"] == pytest.approx(64.5) assert payload["ir_lux"] == pytest.approx(12.75) assert payload["uv_lux"] == pytest.approx(1.6) assert payload["wind_direction"] == 270 assert payload["wind_speed"] == pytest.approx(5.9) assert payload["wind_gust"] == pytest.approx(7.4) assert payload["wind_lull"] == pytest.approx(4.8) assert payload["weight"] == pytest.approx(32.7) assert payload["radiation"] == pytest.approx(0.45) assert payload["rainfall_1h"] == pytest.approx(0.18) assert payload["rainfall_24h"] == pytest.approx(1.42) assert payload["soil_moisture"] == 3100 assert payload["soil_temperature"] == pytest.approx(18.9) assert payload["lora_freq"] == 868 assert payload["modem_preset"] == "MediumFast" def test_store_packet_dict_throttles_host_telemetry(mesh_module, monkeypatch): mesh = mesh_module captured = [] logs = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) monkeypatch.setattr( mesh.config, "_debug_log", lambda message, **metadata: logs.append((message, metadata)), ) mesh.register_host_node_id("!9e95cf60") base_packet = { "id": 1_234, "fromId": "!9e95cf60", "decoded": { "portnum": "TELEMETRY_APP", "telemetry": { "time": 1_000, "deviceMetrics": { "batteryLevel": 50, }, }, }, } mesh.store_packet_dict({**base_packet, "rxTime": 1_000}) mesh.store_packet_dict({**base_packet, "id": 1_235, "rxTime": 1_300}) mesh.store_packet_dict({**base_packet, "id": 1_236, "rxTime": 4_700}) assert len(captured) == 2 first_path, first_payload, _ = captured[0] second_path, second_payload, _ = captured[1] assert first_path == "/api/telemetry" assert second_path == "/api/telemetry" assert first_payload["id"] == 1_234 assert second_payload["id"] == 1_236 suppression_logs = [ entry for entry in logs if entry[0] == "Suppressed host telemetry update" ] assert suppression_logs assert suppression_logs[0][1]["host_node_id"] == "!9e95cf60" assert suppression_logs[0][1]["minutes_remaining"] == 55 def test_store_packet_dict_handles_traceroute_packet(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 915 mesh.config.MODEM_PRESET = "LongFast" mesh.register_host_node_id("!f00dbabe") packet = { "id": 2_934_054_466, "rxTime": 1_763_183_133, "rssi": -70, "snr": 10.25, "fromId": "3664074452", "decoded": { "portnum": "PAXCOUNTER_APP", "dest": "2660618080", "traceroute": { "requestId": 17, "route": [3_663_643_096, "!beadf00d", "c0ffee99", 1_150_717_793], "snrTowards": [42, -14, 41], }, }, } mesh.store_packet_dict(packet) assert captured path, payload, priority = captured[0] assert path == "/api/traces" assert priority == mesh._TRACE_POST_PRIORITY assert payload["id"] == packet["id"] assert payload["request_id"] == 17 assert payload["src"] == 3_664_074_452 assert payload["dest"] == 2_660_618_080 assert payload["rx_time"] == 1_763_183_133 assert payload["rx_iso"] == "2025-11-15T05:05:33Z" assert payload["hops"] == [ 3_663_643_096, 3_199_070_221, 3_237_998_233, 1_150_717_793, ] assert payload["rssi"] == -70 assert payload["snr"] == pytest.approx(10.25) assert "elapsed_ms" in payload assert payload["lora_freq"] == 915 assert payload["modem_preset"] == "LongFast" assert payload["ingestor"] == "!f00dbabe" def test_traceroute_hop_normalization_supports_mappings(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) packet = { "id": 1_111, "decoded": { "portnum": "TRACEROUTE_APP", "traceroute": { "requestId": 42, "route": [{"node_id": "!beadf00d"}, {"num": "0xc0ffee99"}, {"id": 123}], }, }, } mesh.store_packet_dict(packet) assert captured _, payload, _ = captured[0] assert payload["hops"] == [0xBEADF00D, 0xC0FFEE99, 123] def test_traceroute_packet_without_identifiers_is_ignored(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) packet = { "decoded": { "portnum": "TRACEROUTE_APP", "traceroute": {}, }, "rxTime": 123, } mesh.store_packet_dict(packet) assert captured == [] def test_post_queue_prioritises_messages(mesh_module, monkeypatch): mesh = mesh_module mesh._clear_post_queue() calls = [] def record(path, payload): calls.append((path, payload)) monkeypatch.setattr(mesh, "_post_json", record) mesh._enqueue_post_json("/api/messages", {"id": 1}, mesh._MESSAGE_POST_PRIORITY) mesh._enqueue_post_json( "/api/nodes", {"!node": {"foo": "bar"}}, mesh._NODE_POST_PRIORITY ) mesh._drain_post_queue() assert [path for path, _ in calls] == ["/api/messages", "/api/nodes"] def test_drain_post_queue_handles_enqueued_items_during_send(mesh_module): mesh = mesh_module mesh._clear_post_queue() first_send_started = threading.Event() second_item_enqueued = threading.Event() second_item_processed = threading.Event() calls = [] def blocking_send(path, payload): calls.append((path, payload)) if path == "/api/first": first_send_started.set() assert second_item_enqueued.wait(timeout=2), "Second item was not enqueued" elif path == "/api/second": second_item_processed.set() mesh._enqueue_post_json( "/api/first", {"id": 1}, mesh._DEFAULT_POST_PRIORITY, state=mesh.STATE, ) mesh.STATE.active = True drain_thread = threading.Thread( target=mesh._drain_post_queue, kwargs={"state": mesh.STATE, "send": blocking_send}, ) drain_thread.start() assert first_send_started.wait( timeout=2 ), "Drain did not begin processing the first item" mesh._queue_post_json( "/api/second", {"id": 2}, state=mesh.STATE, send=blocking_send, ) second_item_enqueued.set() assert second_item_processed.wait(timeout=2), "Second item was not processed" drain_thread.join(timeout=2) assert not drain_thread.is_alive(), "Drain thread did not finish" assert [path for path, _ in calls] == ["/api/first", "/api/second"] assert not mesh.STATE.queue assert mesh.STATE.active is False def test_store_packet_dict_requires_id(mesh_module, monkeypatch): mesh = mesh_module def fail_post(*_, **__): raise AssertionError("Should not post without an id") monkeypatch.setattr(mesh, "_queue_post_json", fail_post) packet = {"decoded": {"payload": {"text": "hello"}, "portnum": "TEXT_MESSAGE_APP"}} mesh.store_packet_dict(packet) def test_on_receive_logs_when_store_fails(mesh_module, monkeypatch, capsys): mesh = mesh_module monkeypatch.setattr(mesh, "_pkt_to_dict", lambda pkt: {"id": 1}) def boom(*_, **__): raise ValueError("boom") monkeypatch.setattr(mesh, "store_packet_dict", boom) mesh.on_receive(object(), interface=None) captured = capsys.readouterr() assert "context=handlers.on_receive" in captured.out assert "Failed to store packet" in captured.out def test_node_items_snapshot_iterable_without_items(mesh_module): mesh = mesh_module class Iterable: def __init__(self): self._data = {"node": {"foo": "bar"}} def __iter__(self): return iter(self._data) def __getitem__(self, key): return self._data[key] snapshot = mesh._node_items_snapshot(Iterable(), retries=1) assert snapshot == [("node", {"foo": "bar"})] def test_node_items_snapshot_handles_empty_input(mesh_module): mesh = mesh_module assert mesh._node_items_snapshot(None) == [] assert mesh._node_items_snapshot({}) == [] def test_debug_log_emits_when_enabled(mesh_module, monkeypatch, capsys): mesh = mesh_module monkeypatch.setattr(mesh, "DEBUG", True) mesh._debug_log("hello world") captured = capsys.readouterr() lines = [line for line in captured.out.splitlines() if "hello world" in line] assert lines, "expected debug log output" log_line = lines[-1] pattern = ( r"\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z\] \[potato-mesh\] \[debug\] " ) assert re.match(pattern, log_line), f"unexpected log format: {log_line}" assert log_line.endswith("hello world") def test_event_wait_allows_default_timeout_handles_short_signature( mesh_module, monkeypatch ): mesh = mesh_module def wait_without_timeout(self): return True monkeypatch.setattr( mesh.threading.Event, "wait", wait_without_timeout, raising=False ) assert mesh._event_wait_allows_default_timeout() is True def test_event_wait_allows_default_timeout_handles_varargs(mesh_module, monkeypatch): mesh = mesh_module def wait_with_varargs(self, *args): return False monkeypatch.setattr(mesh.threading.Event, "wait", wait_with_varargs, raising=False) assert mesh._event_wait_allows_default_timeout() is True def test_parse_ble_target_rejects_invalid_values(mesh_module): mesh = mesh_module assert mesh._parse_ble_target("") is None assert mesh._parse_ble_target(" ") is None assert mesh._parse_ble_target("zz:zz:zz:zz:zz:zz") is None def test_parse_ble_target_accepts_mac_addresses(mesh_module): """Test that _parse_ble_target accepts valid MAC address format (Linux/Windows).""" mesh = mesh_module # Valid MAC addresses should be accepted and normalized to uppercase assert mesh._parse_ble_target("ED:4D:9E:95:CF:60") == "ED:4D:9E:95:CF:60" assert mesh._parse_ble_target("ed:4d:9e:95:cf:60") == "ED:4D:9E:95:CF:60" assert mesh._parse_ble_target("AA:BB:CC:DD:EE:FF") == "AA:BB:CC:DD:EE:FF" assert mesh._parse_ble_target("00:11:22:33:44:55") == "00:11:22:33:44:55" # With whitespace assert mesh._parse_ble_target(" ED:4D:9E:95:CF:60 ") == "ED:4D:9E:95:CF:60" # Invalid MAC addresses should be rejected assert mesh._parse_ble_target("ED:4D:9E:95:CF") is None # Too short assert mesh._parse_ble_target("ED:4D:9E:95:CF:60:AB") is None # Too long assert mesh._parse_ble_target("GG:HH:II:JJ:KK:LL") is None # Invalid hex def test_parse_ble_target_accepts_uuids(mesh_module): """Test that _parse_ble_target accepts valid UUID format (macOS).""" mesh = mesh_module # Valid UUIDs should be accepted and normalized to uppercase assert ( mesh._parse_ble_target("C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E") == "C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E" ) assert ( mesh._parse_ble_target("c0aea92f-045e-9b82-c9a6-a1fd822b3a9e") == "C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E" ) assert ( mesh._parse_ble_target("12345678-1234-5678-9ABC-DEF012345678") == "12345678-1234-5678-9ABC-DEF012345678" ) # With whitespace assert ( mesh._parse_ble_target(" C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E ") == "C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E" ) # Invalid UUIDs should be rejected assert mesh._parse_ble_target("C0AEA92F-045E-9B82-C9A6") is None # Too short assert ( mesh._parse_ble_target("C0AEA92F-045E-9B82-C9A6-A1FD822B3A9E-EXTRA") is None ) # Too long assert ( mesh._parse_ble_target("GGGGGGGG-GGGG-GGGG-GGGG-GGGGGGGGGGGG") is None ) # Invalid hex assert ( mesh._parse_ble_target("C0AEA92F:045E:9B82:C9A6:A1FD822B3A9E") is None ) # Wrong separator def test_parse_network_target_additional_cases(mesh_module): mesh = mesh_module assert mesh._parse_network_target("") is None assert mesh._parse_network_target(" ") is None assert mesh._parse_network_target("tcp://example.com") is None host, port = mesh._parse_network_target("tcp://10.1.2.3:abc") assert (host, port) == ("10.1.2.3", mesh._DEFAULT_TCP_PORT) host, port = mesh._parse_network_target("10.1.2.3:9001") assert (host, port) == ("10.1.2.3", 9001) def test_load_ble_interface_sets_global(monkeypatch): repo_root = Path(__file__).resolve().parents[1] monkeypatch.syspath_prepend(str(repo_root)) serial_interface_mod = types.ModuleType("meshtastic.serial_interface") class DummySerial: def __init__(self, *_, **__): pass serial_interface_mod.SerialInterface = DummySerial tcp_interface_mod = types.ModuleType("meshtastic.tcp_interface") tcp_interface_mod.TCPInterface = DummySerial ble_interface_mod = types.ModuleType("meshtastic.ble_interface") class DummyBLE: def __init__(self, *_, **__): pass ble_interface_mod.BLEInterface = DummyBLE meshtastic_mod = types.ModuleType("meshtastic") meshtastic_mod.serial_interface = serial_interface_mod meshtastic_mod.tcp_interface = tcp_interface_mod meshtastic_mod.ble_interface = ble_interface_mod monkeypatch.setitem(sys.modules, "meshtastic", meshtastic_mod) monkeypatch.setitem( sys.modules, "meshtastic.serial_interface", serial_interface_mod ) monkeypatch.setitem(sys.modules, "meshtastic.tcp_interface", tcp_interface_mod) monkeypatch.setitem(sys.modules, "meshtastic.ble_interface", ble_interface_mod) module_name = "data.mesh" module = ( importlib.import_module(module_name) if module_name not in sys.modules else importlib.reload(sys.modules[module_name]) ) monkeypatch.setattr(module, "BLEInterface", None) resolved = module._load_ble_interface() assert resolved is ble_interface_mod.BLEInterface assert module.BLEInterface is ble_interface_mod.BLEInterface def test_default_serial_targets_deduplicates(mesh_module, monkeypatch): mesh = mesh_module def fake_glob(pattern): if pattern == "/dev/ttyUSB*": return ["/dev/ttyUSB0", "/dev/ttyUSB0"] if pattern == "/dev/ttyACM*": return ["/dev/ttyACM1"] return [] monkeypatch.setattr(mesh.interfaces.glob, "glob", fake_glob) targets = mesh._default_serial_targets() assert targets.count("/dev/ttyUSB0") == 1 assert "/dev/ttyACM1" in targets assert "/dev/ttyACM0" in targets def test_post_json_logs_failures(mesh_module, monkeypatch, capsys): mesh = mesh_module monkeypatch.setattr(mesh, "INSTANCE", "https://example.invalid") monkeypatch.setattr(mesh, "DEBUG", True) def boom(*_, **__): raise RuntimeError("offline") monkeypatch.setattr(mesh.queue.urllib.request, "urlopen", boom) mesh._post_json("/api/test", {"foo": "bar"}) captured = capsys.readouterr() assert "context=queue.post_json" in captured.out assert "POST request failed" in captured.out def test_queue_post_json_logs_payload_details(mesh_module, monkeypatch, capsys): mesh = mesh_module mesh._clear_post_queue() monkeypatch.setattr(mesh, "DEBUG", True) mesh._queue_post_json( "/api/test", {"alpha": "beta", "count": 7}, send=lambda *_: None, ) out = capsys.readouterr().out assert "Forwarding payload to API" in out assert 'alpha="beta"' in out assert "count=7" in out def test_queue_post_json_skips_when_active(mesh_module, monkeypatch): mesh = mesh_module mesh._clear_post_queue() mesh.STATE.active = True mesh._queue_post_json("/api/test", {"id": 1}) assert mesh.STATE.active is True assert mesh.STATE.queue mesh._clear_post_queue() def test_process_ingestor_heartbeat_updates_flag(mesh_module, monkeypatch): mesh = mesh_module mesh.ingestors.STATE.last_heartbeat = None mesh.ingestors.STATE.node_id = None mesh.handlers.register_host_node_id(None) recorded = {"force": None, "count": 0} def fake_queue_ingestor_heartbeat(*, force): recorded["force"] = force recorded["count"] += 1 return True monkeypatch.setattr( mesh.ingestors, "queue_ingestor_heartbeat", fake_queue_ingestor_heartbeat ) class DummyIface: def __init__(self): self.myNodeNum = 0xCAFEBABE updated = mesh._process_ingestor_heartbeat( DummyIface(), ingestor_announcement_sent=False ) assert updated is True assert recorded["force"] is True assert recorded["count"] == 1 assert mesh.handlers.host_node_id() == "!cafebabe" def test_process_ingestor_heartbeat_skips_without_host(mesh_module, monkeypatch): mesh = mesh_module mesh.handlers.register_host_node_id(None) mesh.ingestors.STATE.node_id = None mesh.ingestors.STATE.last_heartbeat = None monkeypatch.setattr(mesh.ingestors, "queue_ingestor_heartbeat", lambda **_: False) updated = mesh._process_ingestor_heartbeat(None, ingestor_announcement_sent=False) assert updated is False assert mesh.ingestors.STATE.node_id is None assert mesh.ingestors.STATE.last_heartbeat is None def test_ingestor_heartbeat_respects_interval_override(mesh_module, monkeypatch): mesh = mesh_module mesh.ingestors.STATE.start_time = 100 mesh.ingestors.STATE.last_heartbeat = 1_000 mesh.ingestors.STATE.node_id = "!abcd0001" mesh._INGESTOR_HEARTBEAT_SECS = 10_000 monkeypatch.setattr(mesh.ingestors.time, "time", lambda: 2_000) sent = mesh.ingestors.queue_ingestor_heartbeat() assert sent is False assert mesh.ingestors.STATE.last_heartbeat == 1_000 def test_setting_ingestor_attr_propagates(mesh_module): mesh = mesh_module mesh._INGESTOR_HEARTBEAT_SECS = 123 assert mesh.config._INGESTOR_HEARTBEAT_SECS == 123 def test_queue_ingestor_heartbeat_requires_node_id(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh.queue, "_queue_post_json", lambda path, payload, *, priority, send=None: captured.append( (path, payload, priority) ), ) mesh.ingestors.STATE.node_id = None mesh.ingestors.STATE.last_heartbeat = None queued = mesh.ingestors.queue_ingestor_heartbeat(force=True) assert queued is False assert captured == [] def test_queue_ingestor_heartbeat_enqueues_and_throttles(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh.queue, "_queue_post_json", lambda path, payload, *, priority, send=None: captured.append( (path, payload, priority) ), ) mesh.ingestors.STATE.start_time = 1_700_000_000 mesh.ingestors.STATE.last_heartbeat = None mesh.ingestors.STATE.node_id = None mesh.config.LORA_FREQ = 915 mesh.config.MODEM_PRESET = "LongFast" mesh.ingestors.set_ingestor_node_id("!CAFEBABE") first = mesh.ingestors.queue_ingestor_heartbeat(force=True) second = mesh.ingestors.queue_ingestor_heartbeat() assert first is True assert second is False assert len(captured) == 1 path, payload, priority = captured[0] assert path == "/api/ingestors" assert payload["node_id"] == "!cafebabe" assert payload["start_time"] == 1_700_000_000 assert payload["last_seen_time"] >= payload["start_time"] assert payload["version"] == mesh.VERSION assert payload["lora_freq"] == 915 assert payload["modem_preset"] == "LongFast" assert priority == mesh.queue._INGESTOR_POST_PRIORITY def test_mesh_version_export_matches_package(mesh_module): import data mesh = mesh_module assert mesh.VERSION == data.VERSION def test_node_to_dict_handles_proto_fallback(mesh_module, monkeypatch): mesh = mesh_module class FailingProto(mesh.ProtoMessage): def to_dict(self): raise RuntimeError("boom") def __str__(self): return "proto" def fail_message_to_dict(*_, **__): raise RuntimeError("nope") monkeypatch.setattr(mesh, "MessageToDict", fail_message_to_dict) monkeypatch.setattr( mesh.json, "dumps", lambda *_, **__: (_ for _ in ()).throw(TypeError()) ) converted = mesh._node_to_dict({"value": FailingProto()}) assert converted["value"] == "proto" def test_upsert_node_logs_in_debug(mesh_module, monkeypatch, capsys): mesh = mesh_module monkeypatch.setattr(mesh, "DEBUG", True) captured = [] def fake_queue(path, payload, *, priority): captured.append((path, payload, priority)) monkeypatch.setattr(mesh, "_queue_post_json", fake_queue) mesh.upsert_node("!node", {"user": {"shortName": "SN", "longName": "LN"}}) assert captured out = capsys.readouterr().out assert "context=handlers.upsert_node" in out assert "Queued node upsert payload" in out def test_store_packet_dict_records_ignored_packets(mesh_module, monkeypatch, tmp_path): mesh = mesh_module monkeypatch.setattr(mesh, "DEBUG", True) ignored_path = tmp_path / "ignored.txt" monkeypatch.setattr(mesh.handlers, "_IGNORED_PACKET_LOG_PATH", ignored_path) monkeypatch.setattr(mesh.handlers, "_IGNORED_PACKET_LOCK", threading.Lock()) packet = {"decoded": {"portnum": "UNKNOWN"}} mesh.store_packet_dict(packet) assert ignored_path.exists() lines = ignored_path.read_text(encoding="utf-8").strip().splitlines() assert lines payload = json.loads(lines[-1]) assert payload["reason"] == "unsupported-port" assert payload["packet"]["decoded"]["portnum"] == "UNKNOWN" def test_coerce_int_and_float_cover_edge_cases(mesh_module): mesh = mesh_module assert mesh._coerce_int(None) is None assert mesh._coerce_int(True) == 1 assert mesh._coerce_int(7) == 7 assert mesh._coerce_int(3.2) == 3 assert mesh._coerce_int(float("inf")) is None assert mesh._coerce_int(" 0x10 ") == 16 assert mesh._coerce_int(" ") is None assert mesh._coerce_int("7.0") == 7 assert mesh._coerce_int("nan") is None class Intable: def __int__(self): return 9 class BadInt: def __int__(self): raise TypeError assert mesh._coerce_int(Intable()) == 9 assert mesh._coerce_int(BadInt()) is None assert mesh._coerce_float(None) is None assert mesh._coerce_float(True) == 1.0 assert mesh._coerce_float(3) == 3.0 assert mesh._coerce_float(float("inf")) is None assert mesh._coerce_float(" 1.5 ") == 1.5 assert mesh._coerce_float(" ") is None assert mesh._coerce_float("nan") is None class Floatable: def __float__(self): return 2.5 class BadFloat: def __float__(self): raise TypeError assert mesh._coerce_float(Floatable()) == 2.5 assert mesh._coerce_float(BadFloat()) is None def test_canonical_node_id_variants(mesh_module): mesh = mesh_module assert mesh._canonical_node_id(None) is None assert mesh._canonical_node_id(0x1234) == "!00001234" assert mesh._canonical_node_id(" ") is None assert mesh._canonical_node_id("!deadbeef") == "!deadbeef" assert mesh._canonical_node_id("0xCAFEBABE") == "!cafebabe" assert mesh._canonical_node_id("12345") == "!00003039" assert mesh._canonical_node_id("nothex") is None def test_node_num_from_id_variants(mesh_module): mesh = mesh_module assert mesh._node_num_from_id(None) is None assert mesh._node_num_from_id(42) == 42 assert mesh._node_num_from_id(-1) is None assert mesh._node_num_from_id(" ") is None assert mesh._node_num_from_id("!00ff") == 0xFF assert mesh._node_num_from_id("0x10") == 16 assert mesh._node_num_from_id("123") == 0x123 assert mesh._node_num_from_id("bad") == int("bad", 16) def test_merge_mappings_handles_non_mappings(mesh_module): mesh = mesh_module @dataclass class UserBase: id: str @dataclass class UserExtra: name: str @dataclass class Holder: user: object base = Holder(UserBase("!1")) extra = Holder(UserExtra("Node")) merged = mesh._merge_mappings(base, extra) assert merged == {"user": {"id": "!1", "name": "Node"}} def test_extract_payload_bytes_edge_cases(mesh_module): mesh = mesh_module assert mesh._extract_payload_bytes(None) is None assert ( mesh._extract_payload_bytes({"payload": {"__bytes_b64__": "invalid"}}) is None ) assert mesh._extract_payload_bytes({"payload": b"data"}) == b"data" assert mesh._extract_payload_bytes({"payload": "ZGF0YQ=="}) == b"data" def test_decode_nodeinfo_payload_handles_user(mesh_module, monkeypatch): mesh = mesh_module from meshtastic.protobuf import mesh_pb2 user = mesh_pb2.User() user.id = "!01020304" payload = user.SerializeToString() def raise_decode(self, *_): raise mesh.DecodeError("fail") monkeypatch.setattr( mesh_pb2.NodeInfo, "ParseFromString", raise_decode, raising=False ) node_info = mesh._decode_nodeinfo_payload(payload) assert node_info is not None assert node_info.user.id == "!01020304" def test_nodeinfo_helpers_cover_fallbacks(mesh_module, monkeypatch): mesh = mesh_module from meshtastic.protobuf import mesh_pb2 node_info = mesh_pb2.NodeInfo() node_info.device_metrics.battery_level = 50 node_info.position.latitude_i = int(1.23 * 1e7) node_info.position.longitude_i = int(4.56 * 1e7) node_info.position.location_source = 99 monkeypatch.setattr( mesh_pb2.Position.LocSource, "Name", lambda value: (_ for _ in ()).throw(RuntimeError()), raising=False, ) metrics = mesh._nodeinfo_metrics_dict(node_info) position = mesh._nodeinfo_position_dict(node_info) assert metrics["batteryLevel"] == 50.0 assert position["locationSource"] == 99 class DummyProto(mesh.ProtoMessage): def __init__(self): self.id = "!11223344" def __str__(self): return "dummy-proto" def to_dict(self): return {"id": self.id} def raise_message_to_dict(*_, **__): raise RuntimeError() monkeypatch.setattr(mesh, "MessageToDict", raise_message_to_dict) user = mesh._nodeinfo_user_dict(node_info, DummyProto()) assert user["id"] == "!11223344" def test_nodeinfo_user_role_falls_back_to_cli_enum(mesh_module, monkeypatch): mesh = mesh_module mesh._reset_cli_role_cache() cli_module = types.ModuleType("meshtastic.cli") cli_common = types.ModuleType("meshtastic.cli.common") class DummyRole(enum.IntEnum): CLIENT = 0 CLIENT_BASE = 12 cli_common.Role = DummyRole cli_module.common = cli_common monkeypatch.setitem(sys.modules, "meshtastic.cli", cli_module) monkeypatch.setitem(sys.modules, "meshtastic.cli.common", cli_common) user = mesh._nodeinfo_user_dict(None, {"id": "!11223344", "role": 12}) assert user["role"] == "CLIENT_BASE" mesh._reset_cli_role_cache() cli_dict_module = types.ModuleType("meshtastic.cli") cli_dict_common = types.ModuleType("meshtastic.cli.common") cli_dict_common.ClientRoles = {12: "client_hidden"} cli_dict_module.common = cli_dict_common monkeypatch.setitem(sys.modules, "meshtastic.cli", cli_dict_module) monkeypatch.setitem(sys.modules, "meshtastic.cli.common", cli_dict_common) user = mesh._nodeinfo_user_dict(None, {"id": "!11223344", "role": 12}) assert user["role"] == "CLIENT_HIDDEN" mesh._reset_cli_role_cache() def test_store_position_packet_defaults(mesh_module, monkeypatch): mesh = mesh_module captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append((path, payload, priority)), ) mesh.config.LORA_FREQ = 868 mesh.config.MODEM_PRESET = "MediumFast" packet = {"id": "7", "rxTime": "", "from": "!abcd", "to": "", "decoded": {}} mesh.store_position_packet(packet, {}) assert captured _, payload, _ = captured[0] assert payload["node_id"] == "!0000abcd" assert payload["node_num"] == int("abcd", 16) assert payload["to_id"] is None assert payload["latitude"] is None assert payload["longitude"] is None assert payload["lora_freq"] == 868 assert payload["modem_preset"] == "MediumFast" def test_store_nodeinfo_packet_debug(mesh_module, monkeypatch, capsys): mesh = mesh_module monkeypatch.setattr(mesh, "DEBUG", True) monkeypatch.setattr(mesh, "_queue_post_json", lambda *_, **__: None) from meshtastic.protobuf import mesh_pb2 node_info = mesh_pb2.NodeInfo() user = node_info.user user.id = "!01020304" user.short_name = "A" user.long_name = "B" node_info.channel = 1 node_info.via_mqtt = True node_info.is_ignored = True node_info.is_key_manually_verified = True payload = { "__bytes_b64__": base64.b64encode(node_info.SerializeToString()).decode() } packet = { "id": 1, "rxTime": 1, "decoded": {"portnum": "NODEINFO_APP", "payload": payload}, } mesh.store_packet_dict(packet) out = capsys.readouterr().out assert "context=handlers.store_nodeinfo" in out assert "Queued nodeinfo payload" in out def test_store_neighborinfo_packet_debug(mesh_module, monkeypatch, capsys): mesh = mesh_module monkeypatch.setattr(mesh, "DEBUG", True) captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append(payload), ) packet = { "id": 1, "rxTime": 2, "fromId": "!12345678", "decoded": { "portnum": "NEIGHBORINFO_APP", "neighborinfo": { "nodeId": 0x12345678, "neighbors": [], }, }, } mesh.store_packet_dict(packet) assert captured out = capsys.readouterr().out assert "context=handlers.store_neighborinfo" in out assert "Queued neighborinfo payload" in out def test_store_packet_dict_debug_message(mesh_module, monkeypatch, capsys): mesh = mesh_module monkeypatch.setattr(mesh, "DEBUG", True) captured = [] monkeypatch.setattr( mesh, "_queue_post_json", lambda path, payload, *, priority: captured.append(payload), ) packet = { "id": 2, "rxTime": 10, "fromId": "!abc", "decoded": {"payload": {"text": "hi"}, "portnum": "TEXT_MESSAGE_APP"}, } mesh.store_packet_dict(packet) assert captured out = capsys.readouterr().out assert "context=handlers.store_packet_dict" in out assert "Queued message payload" in out assert "channel_display=0" in out assert "channel_name=" not in out def test_on_receive_skips_seen_packets(mesh_module): mesh = mesh_module packet = {"_potatomesh_seen": True} mesh.on_receive(packet, interface=None) assert packet["_potatomesh_seen"] is True