Files
potato-mesh/tests/test_handlers_unit.py
l5y 512b4f157b Fix regression where Meshcore chat senders show as Meshtastic (#794)
* Fix regression where Meshcore chat senders show as Meshtastic

* Address review feedback for protocol misclassification fix

- ingest.rb: exclude wrapper ``protocol`` key from /api/nodes batch-limit
  count so the documented 1000-node maximum still applies after the
  Python ingestor started stamping protocol at the wrapper level.
- Drop plan-file references from production and test comments per the
  repo guidelines; the why is already explained inline.

* Address protocol-fallback review feedback

- Neighbor placeholder now inherits the source node's protocol from the
  surrounding /api/neighbors entry, so the badge tracks the radio the
  peer lives on instead of collapsing to the neutral "Unknown" label
  (review item #1).
- resolve_record_protocol logs one warn_log line when an explicit
  protocol stamp is rejected as malformed, making misbehaving custom
  protocol adapters visible in the operator log instead of silently
  falling back (review item #3).

* Extract buildNodePlaceholder helper for testability

The neighbor placeholder logic in main.js lives inside an untested
closure, so codecov reported the protocol-propagation lines as
uncovered.  Extract the small placeholder builder into long-link-router
so it can be unit tested directly; the closure-internal call site stays
trivial (one factory call + one fallback call).
2026-05-24 09:49:45 +02:00

1334 lines
49 KiB
Python

# Copyright © 2025-26 l5yth & contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit tests for the :mod:`data.mesh_ingestor.handlers` subpackage."""
from __future__ import annotations
import base64
import sys
import time
from pathlib import Path
from types import SimpleNamespace
import pytest
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
sys.path.insert(0, str(REPO_ROOT))
import data.mesh_ingestor.config as config
import data.mesh_ingestor.handlers as handlers
import data.mesh_ingestor.handlers._state as _state_mod
import data.mesh_ingestor.handlers.generic as generic_mod
import data.mesh_ingestor.handlers.ignored as ignored_mod
import data.mesh_ingestor.handlers.telemetry as telemetry_mod
@pytest.fixture(autouse=True)
def reset_handler_state():
"""Reset global handler state between tests."""
_state_mod._host_node_id = None
_state_mod._host_telemetry_last_rx = None
_state_mod._host_nodeinfo_last_seen = None
_state_mod._last_packet_monotonic = None
yield
_state_mod._host_node_id = None
_state_mod._host_telemetry_last_rx = None
_state_mod._host_nodeinfo_last_seen = None
_state_mod._last_packet_monotonic = None
# ---------------------------------------------------------------------------
# _state: host_node_id / register_host_node_id
# ---------------------------------------------------------------------------
class TestHostNodeId:
"""Tests for host node ID state accessors."""
def test_returns_none_initially(self):
"""host_node_id() returns None before registration."""
assert handlers.host_node_id() is None
def test_register_stores_canonical_id(self):
"""Registering a valid node ID stores it canonically."""
handlers.register_host_node_id("!aabbccdd")
assert handlers.host_node_id() == "!aabbccdd"
def test_register_none_clears_id(self):
"""Registering None clears the stored host ID."""
handlers.register_host_node_id("!aabbccdd")
handlers.register_host_node_id(None)
assert handlers.host_node_id() is None
def test_register_resets_telemetry_window(self):
"""Registering a new host ID resets the telemetry suppression window."""
_state_mod._host_telemetry_last_rx = 999_999
handlers.register_host_node_id("!aabbccdd")
assert _state_mod._host_telemetry_last_rx is None
def test_register_resets_nodeinfo_window(self):
"""Registering a new host ID resets the NODEINFO suppression window."""
_state_mod._host_nodeinfo_last_seen = 12345.0
handlers.register_host_node_id("!aabbccdd")
assert _state_mod._host_nodeinfo_last_seen is None
def test_register_canonicalises_numeric(self):
"""Numeric node ID is converted to !xxxxxxxx form."""
handlers.register_host_node_id(0xAABBCCDD)
assert handlers.host_node_id() == "!aabbccdd"
# ---------------------------------------------------------------------------
# _state: last_packet_monotonic / _mark_packet_seen
# ---------------------------------------------------------------------------
class TestLastPacketMonotonic:
"""Tests for packet timestamp tracking."""
def test_returns_none_initially(self):
"""Returns None before any packet is processed."""
assert handlers.last_packet_monotonic() is None
def test_updates_after_mark(self):
"""_mark_packet_seen() updates the monotonic timestamp."""
_state_mod._mark_packet_seen()
ts = handlers.last_packet_monotonic()
assert ts is not None
assert isinstance(ts, float)
def test_mark_packet_seen_exported_from_handlers(self):
"""handlers._mark_packet_seen must be accessible via the package."""
assert callable(handlers._mark_packet_seen)
handlers._mark_packet_seen()
ts = handlers.last_packet_monotonic()
assert ts is not None
# ---------------------------------------------------------------------------
# _state: _host_telemetry_suppressed
# ---------------------------------------------------------------------------
class TestHostTelemetrySuppressed:
"""Tests for host telemetry suppression logic."""
def test_not_suppressed_when_no_previous(self):
"""Not suppressed when no previous telemetry timestamp is set."""
suppressed, mins = _state_mod._host_telemetry_suppressed(int(time.time()))
assert suppressed is False
assert mins == 0
def test_suppressed_within_interval(self):
"""Suppressed when within the suppression window."""
now = int(time.time())
_state_mod._host_telemetry_last_rx = now - 10 # 10 seconds ago
suppressed, mins = _state_mod._host_telemetry_suppressed(now)
assert suppressed is True
assert mins > 0
def test_not_suppressed_after_interval(self):
"""Not suppressed after the full interval has elapsed."""
now = int(time.time())
_state_mod._host_telemetry_last_rx = (
now - _state_mod._HOST_TELEMETRY_INTERVAL_SECS - 1
)
suppressed, mins = _state_mod._host_telemetry_suppressed(now)
assert suppressed is False
assert mins == 0
def test_minutes_remaining_rounds_up(self):
"""Minutes remaining is rounded up (ceiling division)."""
now = int(time.time())
# 30 seconds remaining → 1 minute remaining
_state_mod._host_telemetry_last_rx = (
now - _state_mod._HOST_TELEMETRY_INTERVAL_SECS + 30
)
suppressed, mins = _state_mod._host_telemetry_suppressed(now)
assert suppressed is True
assert mins == 1
# ---------------------------------------------------------------------------
# _state: _host_nodeinfo_suppressed / _mark_host_nodeinfo_seen
# ---------------------------------------------------------------------------
class TestHostNodeinfoSuppressed:
"""Tests for host NODEINFO suppression logic."""
def test_not_suppressed_when_no_previous(self):
"""Not suppressed when no previous NODEINFO timestamp is set."""
assert _state_mod._host_nodeinfo_suppressed(time.monotonic()) is False
def test_suppressed_within_interval(self):
"""Suppressed when within the suppression window."""
now = time.monotonic()
_state_mod._host_nodeinfo_last_seen = now - 10.0 # 10 seconds ago
assert _state_mod._host_nodeinfo_suppressed(now) is True
def test_not_suppressed_after_interval(self):
"""Not suppressed after the full interval has elapsed."""
now = time.monotonic()
_state_mod._host_nodeinfo_last_seen = (
now - _state_mod._HOST_NODEINFO_INTERVAL_SECS - 1.0
)
assert _state_mod._host_nodeinfo_suppressed(now) is False
def test_mark_updates_timestamp(self):
"""_mark_host_nodeinfo_seen stores the provided timestamp."""
now = time.monotonic()
_state_mod._mark_host_nodeinfo_seen(now)
assert _state_mod._host_nodeinfo_last_seen == now
def test_suppressed_after_mark(self):
"""Immediately after marking, a second call is suppressed."""
now = time.monotonic()
_state_mod._mark_host_nodeinfo_seen(now)
assert _state_mod._host_nodeinfo_suppressed(now + 1.0) is True
def test_not_suppressed_after_mark_and_full_interval(self):
"""After a full interval has elapsed, suppression lifts."""
long_ago = time.monotonic() - _state_mod._HOST_NODEINFO_INTERVAL_SECS - 5.0
_state_mod._mark_host_nodeinfo_seen(long_ago)
assert _state_mod._host_nodeinfo_suppressed(time.monotonic()) is False
# ---------------------------------------------------------------------------
# radio: _radio_metadata_fields / _apply_radio_metadata
# ---------------------------------------------------------------------------
class TestRadioMetadata:
"""Tests for radio metadata helper functions."""
def test_empty_when_neither_configured(self, monkeypatch):
"""Returns empty dict when LORA_FREQ and MODEM_PRESET are both None."""
monkeypatch.setattr(config, "LORA_FREQ", None)
monkeypatch.setattr(config, "MODEM_PRESET", None)
assert handlers._radio_metadata_fields() == {}
def test_includes_lora_freq(self, monkeypatch):
"""Includes lora_freq when configured."""
monkeypatch.setattr(config, "LORA_FREQ", 915)
monkeypatch.setattr(config, "MODEM_PRESET", None)
assert handlers._radio_metadata_fields() == {"lora_freq": 915}
def test_includes_modem_preset(self, monkeypatch):
"""Includes modem_preset when configured."""
monkeypatch.setattr(config, "LORA_FREQ", None)
monkeypatch.setattr(config, "MODEM_PRESET", "LongFast")
assert handlers._radio_metadata_fields() == {"modem_preset": "LongFast"}
def test_apply_radio_metadata_enriches_payload(self, monkeypatch):
"""_apply_radio_metadata adds radio fields to the payload."""
monkeypatch.setattr(config, "LORA_FREQ", 915)
monkeypatch.setattr(config, "MODEM_PRESET", "LongFast")
payload = {"id": 1}
result = handlers._apply_radio_metadata(payload)
assert result["lora_freq"] == 915
assert result["modem_preset"] == "LongFast"
assert result is payload # mutated in-place
def test_apply_radio_metadata_to_nodes_enriches_node_dicts(self, monkeypatch):
"""_apply_radio_metadata_to_nodes enriches each node-value dict."""
monkeypatch.setattr(config, "LORA_FREQ", 915)
monkeypatch.setattr(config, "MODEM_PRESET", None)
payload = {"!aabb": {"lastHeard": 100}, "ingestor": "!host"}
handlers._apply_radio_metadata_to_nodes(payload)
assert payload["!aabb"]["lora_freq"] == 915
# Non-dict values like "ingestor" string are not enriched
assert isinstance(payload["ingestor"], str)
# ---------------------------------------------------------------------------
# ignored: _record_ignored_packet
# ---------------------------------------------------------------------------
class TestRecordIgnoredPacket:
"""Tests for :func:`handlers.ignored._record_ignored_packet`."""
def test_noop_when_debug_false(self, monkeypatch, tmp_path):
"""Does nothing when DEBUG is disabled."""
monkeypatch.setattr(config, "DEBUG", False)
log_path = tmp_path / "ignored.txt"
monkeypatch.setattr(ignored_mod, "_IGNORED_PACKET_LOG_PATH", log_path)
ignored_mod._record_ignored_packet({"test": 1}, reason="test-reason")
assert not log_path.exists()
def test_writes_json_line_when_debug(self, monkeypatch, tmp_path):
"""Appends a JSON record when DEBUG is enabled."""
import json
import threading
monkeypatch.setattr(config, "DEBUG", True)
log_path = tmp_path / "ignored.txt"
monkeypatch.setattr(ignored_mod, "_IGNORED_PACKET_LOG_PATH", log_path)
monkeypatch.setattr(ignored_mod, "_IGNORED_PACKET_LOCK", threading.Lock())
ignored_mod._record_ignored_packet(
{"portnum": "BAD"}, reason="unsupported-port"
)
assert log_path.exists()
line = log_path.read_text().strip()
record = json.loads(line)
assert record["reason"] == "unsupported-port"
assert "timestamp" in record
def test_bytes_in_packet_are_base64(self, monkeypatch, tmp_path):
"""Byte values in the packet are Base64-encoded in the log."""
import json
import threading
monkeypatch.setattr(config, "DEBUG", True)
log_path = tmp_path / "ignored.txt"
monkeypatch.setattr(ignored_mod, "_IGNORED_PACKET_LOG_PATH", log_path)
monkeypatch.setattr(ignored_mod, "_IGNORED_PACKET_LOCK", threading.Lock())
ignored_mod._record_ignored_packet({"data": b"\x00\x01"}, reason="test")
record = json.loads(log_path.read_text().strip())
assert record["packet"]["data"] == base64.b64encode(b"\x00\x01").decode()
# ---------------------------------------------------------------------------
# position: base64_payload
# ---------------------------------------------------------------------------
class TestBase64Payload:
"""Tests for :func:`handlers.base64_payload`."""
def test_none_returns_none(self):
"""None input returns None."""
assert handlers.base64_payload(None) is None
def test_empty_bytes_returns_none(self):
"""Empty bytes return None."""
assert handlers.base64_payload(b"") is None
def test_encodes_bytes(self):
"""Non-empty bytes are Base64 encoded."""
result = handlers.base64_payload(b"\x00\x01\x02")
assert result == base64.b64encode(b"\x00\x01\x02").decode("ascii")
# ---------------------------------------------------------------------------
# generic: _is_encrypted_flag
# ---------------------------------------------------------------------------
class TestIsEncryptedFlag:
"""Tests for :func:`handlers._is_encrypted_flag`."""
def test_true_bool(self):
assert handlers._is_encrypted_flag(True) is True
def test_false_bool(self):
assert handlers._is_encrypted_flag(False) is False
def test_nonzero_int(self):
assert handlers._is_encrypted_flag(1) is True
def test_zero_int(self):
assert handlers._is_encrypted_flag(0) is False
def test_empty_string(self):
assert handlers._is_encrypted_flag("") is False
def test_false_string(self):
assert handlers._is_encrypted_flag("false") is False
def test_no_string(self):
assert handlers._is_encrypted_flag("no") is False
def test_zero_string(self):
assert handlers._is_encrypted_flag("0") is False
def test_truthy_string(self):
assert handlers._is_encrypted_flag("yes") is True
def test_none_is_falsy(self):
assert handlers._is_encrypted_flag(None) is False
def test_nonempty_bytes(self):
assert handlers._is_encrypted_flag(b"\x01") is True
def test_empty_bytes(self):
assert handlers._is_encrypted_flag(b"") is False
# ---------------------------------------------------------------------------
# generic: upsert_node
# ---------------------------------------------------------------------------
class TestUpsertNode:
"""Tests for :func:`handlers.upsert_node`."""
def test_queues_node_payload(self):
"""upsert_node enqueues a POST to /api/nodes."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.upsert_node("!aabbccdd", {"user": {"shortName": "AB"}})
finally:
q._queue_post_json = original
assert any(p == "/api/nodes" for p, _ in sent)
def test_includes_ingestor_field(self):
"""Payload includes ingestor field with host node ID."""
import data.mesh_ingestor.queue as q
handlers.register_host_node_id("!deadbeef")
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.upsert_node("!aabbccdd", {"user": {}})
finally:
q._queue_post_json = original
_, payload = sent[0]
assert payload.get("ingestor") == "!deadbeef"
def test_includes_protocol_field_from_config(self, monkeypatch):
"""Payload carries config.PROTOCOL so the web app classifies the upsert
correctly even before the ingestor heartbeat has registered a
protocol mapping (see CONTRACTS.md).
"""
import data.mesh_ingestor.queue as q
from data.mesh_ingestor import config as ingestor_config
monkeypatch.setattr(ingestor_config, "PROTOCOL", "meshcore")
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.upsert_node("!aabbccdd", {"user": {}})
finally:
q._queue_post_json = original
_, payload = sent[0]
assert payload.get("protocol") == "meshcore"
# ---------------------------------------------------------------------------
# generic: on_receive deduplication
# ---------------------------------------------------------------------------
class TestOnReceive:
"""Tests for :func:`handlers.on_receive`."""
def test_deduplicates_via_seen_flag(self, monkeypatch):
"""Packets with _potatomesh_seen=True are skipped."""
calls = []
monkeypatch.setattr(
"data.mesh_ingestor.handlers.generic.store_packet_dict",
lambda pkt: calls.append(pkt),
)
packet = {"_potatomesh_seen": True, "decoded": {}}
handlers.on_receive(packet, None)
assert calls == []
def test_marks_packet_seen(self, monkeypatch):
"""First call marks the packet as seen."""
monkeypatch.setattr(
"data.mesh_ingestor.handlers.generic.store_packet_dict",
lambda pkt: None,
)
packet = {"decoded": {}}
handlers.on_receive(packet, None)
assert packet.get("_potatomesh_seen") is True
def test_updates_monotonic_timestamp(self, monkeypatch):
"""on_receive updates the last-packet monotonic timestamp."""
monkeypatch.setattr(
"data.mesh_ingestor.handlers.generic.store_packet_dict",
lambda pkt: None,
)
handlers.on_receive({"decoded": {}}, None)
assert handlers.last_packet_monotonic() is not None
# ---------------------------------------------------------------------------
# store_position_packet
# ---------------------------------------------------------------------------
class TestStorePositionPacket:
"""Tests for :func:`handlers.store_position_packet`."""
def _make_packet(self, from_id="!aabbccdd", pkt_id=1001, **extra):
pkt = {
"id": pkt_id,
"rxTime": 1_700_000_000,
"fromId": from_id,
"decoded": {
"position": {"latitude": 37.5, "longitude": -122.1},
},
}
pkt.update(extra)
return pkt
def test_queues_position_payload(self):
"""Valid position packet is queued to /api/positions."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_position_packet(
self._make_packet(),
{"position": {"latitude": 37.5, "longitude": -122.1}},
)
finally:
q._queue_post_json = original
assert any(p == "/api/positions" for p, _ in sent)
def test_includes_protocol_field(self, monkeypatch):
"""Position payload carries the configured protocol so the web app
classifies the record correctly even before the ingestor heartbeat
has registered. See CONTRACTS.md.
"""
import data.mesh_ingestor.queue as q
from data.mesh_ingestor import config as ingestor_config
monkeypatch.setattr(ingestor_config, "PROTOCOL", "meshcore")
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_position_packet(
self._make_packet(),
{"position": {"latitude": 37.5, "longitude": -122.1}},
)
finally:
q._queue_post_json = original
_, payload = next((p, pl) for p, pl in sent if p == "/api/positions")
assert payload.get("protocol") == "meshcore"
def test_skips_when_no_node_id(self):
"""Packet missing a node ID is silently dropped."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_position_packet({}, {})
finally:
q._queue_post_json = original
assert sent == []
def test_skips_when_no_packet_id(self):
"""Packet missing a packet ID is silently dropped."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_position_packet({"fromId": "!aabbccdd"}, {})
finally:
q._queue_post_json = original
assert sent == []
def test_latitude_i_conversion(self):
"""latitudeI integer is divided by 1e7 to get degrees."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_position_packet(
{"id": 99, "rxTime": 100, "fromId": "!aabbccdd"},
{"position": {"latitudeI": 375000000, "longitudeI": -1221000000}},
)
finally:
q._queue_post_json = original
assert len(sent) == 1
payload = sent[0][1]
assert abs(payload["latitude"] - 37.5) < 1e-4
assert abs(payload["longitude"] - -122.1) < 1e-4
# Issue #782: paired ``(lat, lon) = (0, 0)`` is the Meshtastic "no GPS
# lock" sentinel and must collapse to ``None``; the altitude and location
# source siblings ride along because they are meaningless without a fix.
def test_paired_zero_lat_lon_collapses(self):
"""Sentinel coords emit ``latitude`` / ``longitude`` / ``altitude`` as None."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_position_packet(
{"id": 100, "rxTime": 100, "fromId": "!aabbccdd"},
{
"position": {
"latitude": 0.0,
"longitude": 0.0,
"altitude": 0,
"locationSource": "LOC_MANUAL",
},
},
)
finally:
q._queue_post_json = original
assert len(sent) == 1
payload = sent[0][1]
assert payload["latitude"] is None
assert payload["longitude"] is None
assert payload["altitude"] is None
assert payload["location_source"] is None
def test_paired_zero_latitude_i_collapses(self):
"""Integer-scaled ``(latitudeI=0, longitudeI=0)`` also collapses."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_position_packet(
{"id": 101, "rxTime": 100, "fromId": "!aabbccdd"},
{"position": {"latitudeI": 0, "longitudeI": 0}},
)
finally:
q._queue_post_json = original
assert len(sent) == 1
payload = sent[0][1]
assert payload["latitude"] is None
assert payload["longitude"] is None
def test_equator_fix_preserved(self):
"""``(lat=0, lon=13.5)`` is a real equator fix and must survive."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_position_packet(
{"id": 102, "rxTime": 100, "fromId": "!aabbccdd"},
{"position": {"latitude": 0.0, "longitude": 13.5}},
)
finally:
q._queue_post_json = original
assert len(sent) == 1
payload = sent[0][1]
assert payload["latitude"] == 0.0
assert abs(payload["longitude"] - 13.5) < 1e-9
def test_zero_position_time_stripped(self):
"""``position.time = 0`` is the firmware "no GPS lock" sentinel."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_position_packet(
{"id": 103, "rxTime": 100, "fromId": "!aabbccdd"},
{"position": {"latitude": 37.5, "longitude": -122.1, "time": 0}},
)
finally:
q._queue_post_json = original
assert len(sent) == 1
payload = sent[0][1]
assert payload["position_time"] is None
# ---------------------------------------------------------------------------
# store_telemetry_packet
# ---------------------------------------------------------------------------
class TestStoreTelemetryPacket:
"""Tests for :func:`handlers.store_telemetry_packet`."""
def _make_telemetry_packet(self, from_id="!aabbccdd", pkt_id=2001):
return {
"id": pkt_id,
"rxTime": 1_700_000_000,
"fromId": from_id,
"decoded": {
"portnum": "TELEMETRY_APP",
"telemetry": {
"deviceMetrics": {"batteryLevel": 80, "voltage": 3.8},
},
},
}
def test_queues_telemetry_payload(self):
"""Valid telemetry packet is queued to /api/telemetry."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
pkt = self._make_telemetry_packet()
handlers.store_telemetry_packet(pkt, pkt["decoded"])
finally:
q._queue_post_json = original
assert any(p == "/api/telemetry" for p, _ in sent)
def test_skips_without_telemetry_section(self):
"""Packet without a telemetry section is silently dropped."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_telemetry_packet({"id": 1}, {})
finally:
q._queue_post_json = original
assert sent == []
def test_skips_without_packet_id(self):
"""Telemetry packet without an id is dropped."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_telemetry_packet(
{"fromId": "!aabbccdd"},
{"telemetry": {"deviceMetrics": {}}},
)
finally:
q._queue_post_json = original
assert sent == []
def test_host_telemetry_suppressed_within_interval(self, monkeypatch):
"""Host node telemetry is suppressed within the interval window."""
import data.mesh_ingestor.queue as q
handlers.register_host_node_id("!aabbccdd")
now = int(time.time())
_state_mod._host_telemetry_last_rx = now - 10 # recent
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
pkt = {
"id": 1,
"rxTime": now,
"fromId": "!aabbccdd",
"decoded": {
"portnum": "TELEMETRY_APP",
"telemetry": {"deviceMetrics": {"batteryLevel": 80}},
},
}
handlers.store_telemetry_packet(pkt, pkt["decoded"])
finally:
q._queue_post_json = original
assert sent == []
def test_telemetry_type_device(self):
"""deviceMetrics triggers telemetry_type='device'."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
pkt = self._make_telemetry_packet()
handlers.store_telemetry_packet(pkt, pkt["decoded"])
finally:
q._queue_post_json = original
_, payload = sent[0]
assert payload.get("telemetry_type") == "device"
def test_invalid_telemetry_type_dropped_from_payload(self, monkeypatch):
"""Unrecognised telemetry_type is omitted from the payload."""
import data.mesh_ingestor.queue as q
monkeypatch.setattr(telemetry_mod, "_VALID_TELEMETRY_TYPES", frozenset())
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
pkt = self._make_telemetry_packet()
handlers.store_telemetry_packet(pkt, pkt["decoded"])
finally:
q._queue_post_json = original
_, payload = sent[0]
assert "telemetry_type" not in payload
# ---------------------------------------------------------------------------
# store_nodeinfo_packet
# ---------------------------------------------------------------------------
class TestStoreNodeinfoPacket:
"""Tests for :func:`handlers.store_nodeinfo_packet`."""
def test_queues_node_payload(self):
"""Valid nodeinfo packet is queued to /api/nodes."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_nodeinfo_packet(
{"id": 1, "rxTime": 100, "fromId": "!aabbccdd"},
{
"user": {
"id": "!aabbccdd",
"shortName": "AB",
"longName": "Alpha Bravo",
}
},
)
finally:
q._queue_post_json = original
assert any(p == "/api/nodes" for p, _ in sent)
def test_skips_when_no_node_id(self):
"""Packet with no resolvable node ID is silently dropped."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_nodeinfo_packet({}, {})
finally:
q._queue_post_json = original
assert sent == []
def test_host_nodeinfo_not_suppressed_on_first_call(self):
"""First NODEINFO from the host node is always forwarded."""
import data.mesh_ingestor.queue as q
handlers.register_host_node_id("!aabbccdd")
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(path)
try:
handlers.store_nodeinfo_packet(
{"id": 1, "rxTime": 100, "fromId": "!aabbccdd"},
{"user": {"id": "!aabbccdd", "shortName": "AB", "longName": "Alpha"}},
)
finally:
q._queue_post_json = original
assert "/api/nodes" in sent
def test_host_nodeinfo_suppressed_within_window(self):
"""Second NODEINFO from the host within the throttle window is dropped."""
import data.mesh_ingestor.queue as q
handlers.register_host_node_id("!aabbccdd")
# Simulate a recent upsert so the window is active.
_state_mod._mark_host_nodeinfo_seen(time.monotonic())
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(path)
try:
handlers.store_nodeinfo_packet(
{"id": 2, "rxTime": 200, "fromId": "!aabbccdd"},
{"user": {"id": "!aabbccdd", "shortName": "AB", "longName": "Alpha"}},
)
finally:
q._queue_post_json = original
assert sent == []
def test_host_nodeinfo_allowed_after_window_expires(self):
"""NODEINFO from the host is forwarded after the throttle window expires."""
import data.mesh_ingestor.queue as q
handlers.register_host_node_id("!aabbccdd")
# Place last-seen far in the past so the window has expired.
_state_mod._host_nodeinfo_last_seen = (
time.monotonic() - _state_mod._HOST_NODEINFO_INTERVAL_SECS - 10.0
)
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(path)
try:
handlers.store_nodeinfo_packet(
{"id": 3, "rxTime": 300, "fromId": "!aabbccdd"},
{"user": {"id": "!aabbccdd", "shortName": "AB", "longName": "Alpha"}},
)
finally:
q._queue_post_json = original
assert "/api/nodes" in sent
def test_non_host_nodeinfo_never_suppressed(self):
"""NODEINFO from a non-host node is never throttled."""
import data.mesh_ingestor.queue as q
handlers.register_host_node_id("!aabbccdd")
# Mark the host as recently seen to activate the throttle.
_state_mod._mark_host_nodeinfo_seen(time.monotonic())
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(path)
try:
handlers.store_nodeinfo_packet(
{"id": 4, "rxTime": 400, "fromId": "!11223344"},
{
"user": {
"id": "!11223344",
"shortName": "CD",
"longName": "Charlie Delta",
}
},
)
finally:
q._queue_post_json = original
assert "/api/nodes" in sent
# Issue #782: a nodeinfo packet whose merged position dict carries the
# Meshtastic ``(0, 0)`` Null Island sentinel must have the coords stripped
# from the queued ``POST /api/nodes`` body — alongside ``altitude`` /
# ``locationSource``, which are meaningless without a fix.
def test_nodeinfo_paired_zero_position_stripped(self):
"""Sentinel ``(0, 0)`` position drops latitude/longitude/altitude."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_nodeinfo_packet(
{"id": 5, "rxTime": 500, "fromId": "!11223399"},
{
"user": {
"id": "!11223399",
"shortName": "ZZ",
"longName": "Zero Zero",
},
"position": {
"latitude": 0.0,
"longitude": 0.0,
"altitude": 0,
"locationSource": "LOC_MANUAL",
},
},
)
finally:
q._queue_post_json = original
assert sent, "expected at least one queued POST"
body = sent[-1][1]
position = body["!11223399"].get("position")
# All four sentinel keys are stripped; the position dict may either be
# absent entirely or present-but-empty depending on other merged keys.
if position is not None:
for key in ("latitude", "longitude", "altitude", "locationSource"):
assert key not in position
def test_nodeinfo_zero_position_time_stripped(self):
"""``position.time = 0`` collapses out of the nodeinfo POST body."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_nodeinfo_packet(
{"id": 6, "rxTime": 600, "fromId": "!11223388"},
{
"user": {
"id": "!11223388",
"shortName": "ZT",
"longName": "Zero Time",
},
"position": {
"latitude": 52.5,
"longitude": 13.4,
"time": 0,
},
},
)
finally:
q._queue_post_json = original
assert sent
body = sent[-1][1]
position = body["!11223388"].get("position")
assert position is not None
assert "time" not in position
assert position["latitude"] == 52.5
assert position["longitude"] == 13.4
def test_nodeinfo_equator_fix_preserved(self):
"""``(lat=0, lon=13.4)`` is a real equator fix and survives."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_nodeinfo_packet(
{"id": 7, "rxTime": 700, "fromId": "!11223377"},
{
"user": {
"id": "!11223377",
"shortName": "EQ",
"longName": "Equator",
},
"position": {"latitude": 0.0, "longitude": 13.4},
},
)
finally:
q._queue_post_json = original
assert sent
body = sent[-1][1]
position = body["!11223377"].get("position")
assert position["latitude"] == 0.0
assert position["longitude"] == 13.4
# ---------------------------------------------------------------------------
# store_neighborinfo_packet
# ---------------------------------------------------------------------------
class TestStoreNeighborinfoPacket:
"""Tests for :func:`handlers.store_neighborinfo_packet`."""
def test_queues_neighbor_payload(self):
"""Valid neighborinfo packet is queued to /api/neighbors."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_neighborinfo_packet(
{"id": 1, "rxTime": 100, "fromId": "!aabbccdd"},
{
"neighborinfo": {
"nodeId": 0xAABBCCDD,
"neighbors": [
{"nodeId": 0x11223344, "snr": 5.0},
],
}
},
)
finally:
q._queue_post_json = original
assert any(p == "/api/neighbors" for p, _ in sent)
def test_skips_when_no_neighborinfo_section(self):
"""Missing neighborinfo section is silently dropped."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_neighborinfo_packet({"fromId": "!aabbccdd"}, {})
finally:
q._queue_post_json = original
assert sent == []
# ---------------------------------------------------------------------------
# store_router_heartbeat_packet
# ---------------------------------------------------------------------------
class TestStoreRouterHeartbeatPacket:
"""Tests for :func:`handlers.store_router_heartbeat_packet`."""
def test_queues_node_upsert(self):
"""Router heartbeat queues a minimal node upsert."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_router_heartbeat_packet(
{"fromId": "!aabbccdd", "rxTime": 1_700_000_000}
)
finally:
q._queue_post_json = original
assert any(p == "/api/nodes" for p, _ in sent)
def test_skips_when_no_from_id(self):
"""Heartbeat without from_id is silently dropped."""
import data.mesh_ingestor.queue as q
sent = []
original = q._queue_post_json
q._queue_post_json = lambda path, payload, *, priority, **kw: sent.append(
(path, payload)
)
try:
handlers.store_router_heartbeat_packet({})
finally:
q._queue_post_json = original
assert sent == []
# ---------------------------------------------------------------------------
# _coerce_emoji_codepoint
# ---------------------------------------------------------------------------
class TestCoerceEmojiCodepoint:
"""Tests for :func:`_coerce_emoji_codepoint`."""
def test_none_returns_none(self):
"""``None`` input yields ``None``."""
assert generic_mod._coerce_emoji_codepoint(None) is None
def test_int_codepoint_above_127(self):
"""Integer codepoint above 127 is converted to the character."""
assert generic_mod._coerce_emoji_codepoint(128077) == "\U0001f44d"
def test_string_codepoint_above_127(self):
"""Digit string representing a codepoint above 127 is converted."""
assert generic_mod._coerce_emoji_codepoint("128077") == "\U0001f44d"
def test_small_int_preserved_as_string(self):
"""Small integer (≤ 127) is kept as its string representation."""
assert generic_mod._coerce_emoji_codepoint(1) == "1"
def test_small_string_digit_preserved(self):
"""Digit string ≤ 127 is kept as-is (slot marker)."""
assert generic_mod._coerce_emoji_codepoint("1") == "1"
def test_emoji_string_passthrough(self):
"""An already-resolved emoji character passes through."""
assert generic_mod._coerce_emoji_codepoint("\U0001f44d") == "\U0001f44d"
def test_whitespace_only_returns_none(self):
"""Whitespace-only string yields ``None``."""
assert generic_mod._coerce_emoji_codepoint(" ") is None
def test_empty_string_returns_none(self):
"""Empty string yields ``None``."""
assert generic_mod._coerce_emoji_codepoint("") is None
def test_float_codepoint_above_127(self):
"""Float codepoint above 127 is truncated and converted."""
assert generic_mod._coerce_emoji_codepoint(128077.0) == "\U0001f44d"
def test_invalid_codepoint_returns_none(self):
"""Out-of-range numeric codepoint returns ``None`` rather than the
decimal form (which would render as garbage in the chat log)."""
assert generic_mod._coerce_emoji_codepoint(0x7FFFFFFF) is None
def test_invalid_string_codepoint_returns_none(self):
"""Out-of-range numeric string also returns ``None``."""
assert generic_mod._coerce_emoji_codepoint(str(0x7FFFFFFF)) is None
# ---------------------------------------------------------------------------
# _is_reaction_placeholder_text
# ---------------------------------------------------------------------------
class TestIsReactionPlaceholderText:
"""Tests for :func:`_is_reaction_placeholder_text`."""
def test_none_is_placeholder(self):
"""``None`` is a placeholder."""
assert generic_mod._is_reaction_placeholder_text(None) is True
def test_empty_is_placeholder(self):
"""Empty string is a placeholder."""
assert generic_mod._is_reaction_placeholder_text("") is True
def test_whitespace_is_placeholder(self):
"""Whitespace-only string is a placeholder."""
assert generic_mod._is_reaction_placeholder_text(" ") is True
def test_digit_slot_marker(self):
"""Digit strings like '1' and '3' are placeholders."""
assert generic_mod._is_reaction_placeholder_text("1") is True
assert generic_mod._is_reaction_placeholder_text("3") is True
def test_bare_emoji_is_placeholder(self):
"""A single emoji character is a placeholder."""
assert generic_mod._is_reaction_placeholder_text("\U0001f44d") is True
def test_substantial_text_is_not_placeholder(self):
"""Prose text is not a placeholder."""
assert generic_mod._is_reaction_placeholder_text("Hello world") is False
def test_text_with_emoji_is_not_placeholder(self):
"""Text containing both words and emoji is not a placeholder."""
assert (
generic_mod._is_reaction_placeholder_text("Great job! \U0001f44d") is False
)
def test_short_ascii_word_is_not_placeholder(self):
"""A short ASCII word is not a placeholder."""
assert generic_mod._is_reaction_placeholder_text("hi") is False
# ---------------------------------------------------------------------------
# _is_likely_reaction
# ---------------------------------------------------------------------------
class TestIsLikelyReaction:
"""Tests for :func:`_is_likely_reaction`."""
def test_reaction_app_portnum_string(self):
"""Explicit REACTION_APP portnum is always a reaction."""
assert (
generic_mod._is_likely_reaction(
"REACTION_APP", None, 123, "\U0001f44d", None
)
is True
)
def test_reply_id_emoji_no_text(self):
"""reply_id + emoji + no text is a reaction."""
assert (
generic_mod._is_likely_reaction(
"TEXT_MESSAGE_APP", 1, 123, "\U0001f44d", None
)
is True
)
def test_reply_id_emoji_digit_text(self):
"""reply_id + emoji + digit count text is a reaction."""
assert (
generic_mod._is_likely_reaction(
"TEXT_MESSAGE_APP", 1, 123, "\U0001f44d", "3"
)
is True
)
def test_reply_id_emoji_substantial_text_not_reaction(self):
"""reply_id + emoji + substantial text is NOT a reaction."""
assert (
generic_mod._is_likely_reaction(
"TEXT_MESSAGE_APP", 1, 123, "\U0001f44d", "Great job!"
)
is False
)
def test_no_emoji_not_reaction(self):
"""Missing emoji means not a reaction (even with reply_id)."""
assert (
generic_mod._is_likely_reaction("TEXT_MESSAGE_APP", 1, 123, None, None)
is False
)
def test_no_reply_id_not_reaction(self):
"""Missing reply_id means not a reaction (non-REACTION_APP portnum)."""
assert (
generic_mod._is_likely_reaction(
"TEXT_MESSAGE_APP", 1, None, "\U0001f44d", None
)
is False
)
def test_portnum_int_matches_reaction_candidate(self, monkeypatch):
"""An unknown portnum string is still classified as a reaction when
``portnum_int`` matches one of the firmware-resolved REACTION_APP
candidates. Different Meshtastic firmware versions assign different
integer values to the REACTION_APP enum, so the integer fallback is
the authoritative path."""
monkeypatch.setattr(generic_mod, "_portnum_candidates", lambda name: {77})
assert (
generic_mod._is_likely_reaction("UNKNOWN_PORT", 77, None, None, None)
is True
)
# ---------------------------------------------------------------------------
# _coerce_emoji_codepoint — string conversion failure path
# ---------------------------------------------------------------------------
class TestCoerceEmojiStringFailure:
"""Cover the ``except Exception`` branch in :func:`_coerce_emoji_codepoint`.
The string-conversion ``try`` is defensive against pathological values
(objects whose ``__str__`` raises). We exercise it directly so the
fallback ``return None`` line is covered.
"""
def test_object_str_raises(self):
"""A value whose ``__str__`` raises yields ``None``."""
class Boom:
def __str__(self): # noqa: D401 - pytest-style helper
raise RuntimeError("boom")
assert generic_mod._coerce_emoji_codepoint(Boom()) is None