ingestor: support ROUTING_APP messages (#584)

* ingestor: support ROUTING_APP messages

* data: cover missing unit test vectors

* data: address review comments

* tests: fix
This commit is contained in:
l5y
2025-12-31 13:13:34 +01:00
committed by GitHub
parent 4591d5acd6
commit 09fbc32e48
2 changed files with 171 additions and 46 deletions

View File

@@ -100,6 +100,41 @@ from .serialization import (
)
def _portnum_candidates(name: str) -> set[int]:
"""Return Meshtastic port number candidates for ``name``.
Parameters:
name: Port name to look up in Meshtastic ``PortNum`` enums.
Returns:
Set of integer port numbers resolved from Meshtastic modules.
"""
candidates: set[int] = set()
for module_name in (
"meshtastic.portnums_pb2",
"meshtastic.protobuf.portnums_pb2",
):
module = sys.modules.get(module_name)
if module is None:
with contextlib.suppress(ModuleNotFoundError):
module = importlib.import_module(module_name)
if module is None:
continue
portnum_enum = getattr(module, "PortNum", None)
value_lookup = getattr(portnum_enum, "Value", None) if portnum_enum else None
if callable(value_lookup):
with contextlib.suppress(Exception):
candidate = _coerce_int(value_lookup(name))
if candidate is not None:
candidates.add(candidate)
constant_value = getattr(module, name, None)
candidate = _coerce_int(constant_value)
if candidate is not None:
candidates.add(candidate)
return candidates
def register_host_node_id(node_id: str | None) -> None:
"""Record the canonical identifier for the connected host device.
@@ -1280,28 +1315,7 @@ def store_packet_dict(packet: Mapping) -> None:
traceroute_section = (
decoded.get("traceroute") if isinstance(decoded, Mapping) else None
)
traceroute_port_ints: set[int] = set()
for module_name in (
"meshtastic.portnums_pb2",
"meshtastic.protobuf.portnums_pb2",
):
module = sys.modules.get(module_name)
if module is None:
with contextlib.suppress(ModuleNotFoundError):
module = importlib.import_module(module_name)
if module is None:
continue
portnum_enum = getattr(module, "PortNum", None)
value_lookup = getattr(portnum_enum, "Value", None) if portnum_enum else None
if callable(value_lookup):
with contextlib.suppress(Exception):
candidate = _coerce_int(value_lookup("TRACEROUTE_APP"))
if candidate is not None:
traceroute_port_ints.add(candidate)
constant_value = getattr(module, "TRACEROUTE_APP", None)
candidate = _coerce_int(constant_value)
if candidate is not None:
traceroute_port_ints.add(candidate)
traceroute_port_ints = _portnum_candidates("TRACEROUTE_APP")
if (
portnum == "TRACEROUTE_APP"
@@ -1359,36 +1373,43 @@ def store_packet_dict(packet: Mapping) -> None:
if emoji_text:
emoji = emoji_text
allowed_port_values = {"1", "TEXT_MESSAGE_APP", "REACTION_APP"}
routing_section = decoded.get("routing") if isinstance(decoded, Mapping) else None
routing_port_candidates = _portnum_candidates("ROUTING_APP")
if text is None and (
portnum == "ROUTING_APP"
or (portnum_int is not None and portnum_int in routing_port_candidates)
or isinstance(routing_section, Mapping)
):
routing_payload = _first(decoded, "payload", "data", default=None)
if routing_payload is not None:
if isinstance(routing_payload, bytes):
text = base64.b64encode(routing_payload).decode("ascii")
elif isinstance(routing_payload, str):
text = routing_payload
else:
try:
text = json.dumps(routing_payload, ensure_ascii=True)
except TypeError:
text = str(routing_payload)
if isinstance(text, str):
text = text.strip() or None
allowed_port_values = {"1", "TEXT_MESSAGE_APP", "REACTION_APP", "ROUTING_APP"}
allowed_port_ints = {1}
reaction_port_candidates: set[int] = set()
for module_name in (
"meshtastic.portnums_pb2",
"meshtastic.protobuf.portnums_pb2",
):
module = sys.modules.get(module_name)
if module is None:
with contextlib.suppress(ModuleNotFoundError):
module = importlib.import_module(module_name)
if module is None:
continue
portnum_enum = getattr(module, "PortNum", None)
value_lookup = getattr(portnum_enum, "Value", None) if portnum_enum else None
if callable(value_lookup):
with contextlib.suppress(Exception):
candidate = _coerce_int(value_lookup("REACTION_APP"))
if candidate is not None:
reaction_port_candidates.add(candidate)
constant_value = getattr(module, "REACTION_APP", None)
candidate = _coerce_int(constant_value)
if candidate is not None:
reaction_port_candidates.add(candidate)
reaction_port_candidates = _portnum_candidates("REACTION_APP")
for candidate in reaction_port_candidates:
allowed_port_ints.add(candidate)
allowed_port_values.add(str(candidate))
for candidate in routing_port_candidates:
allowed_port_ints.add(candidate)
allowed_port_values.add(str(candidate))
if isinstance(routing_section, Mapping) and portnum_int is not None:
allowed_port_ints.add(portnum_int)
allowed_port_values.add(str(portnum_int))
is_reaction_packet = portnum == "REACTION_APP" or (
reply_id is not None and emoji is not None
)

View File

@@ -1929,6 +1929,110 @@ def test_store_packet_dict_allows_primary_channel_broadcast(mesh_module, monkeyp
assert priority == mesh._MESSAGE_POST_PRIORITY
def test_store_packet_dict_accepts_routing_app_messages(mesh_module, monkeypatch):
"""Ensure routing app payloads are treated as message posts."""
mesh = mesh_module
captured = []
monkeypatch.setattr(
mesh,
"_queue_post_json",
lambda path, payload, *, priority: captured.append((path, payload, priority)),
)
packet = {
"id": 333,
"rxTime": 999,
"fromId": "!node",
"toId": "^all",
"channel": 0,
"decoded": {"payload": "GAA=", "portnum": "ROUTING_APP"},
}
mesh.store_packet_dict(packet)
assert captured, "Expected routing packet to be stored"
path, payload, priority = captured[0]
assert path == "/api/messages"
assert payload["portnum"] == "ROUTING_APP"
assert payload["text"] == "GAA="
assert payload["channel"] == 0
assert payload["encrypted"] is None
assert priority == mesh._MESSAGE_POST_PRIORITY
def test_store_packet_dict_serializes_routing_payloads(mesh_module, monkeypatch):
"""Ensure routing payloads are serialized when text is absent."""
mesh = mesh_module
captured = []
monkeypatch.setattr(
mesh,
"_queue_post_json",
lambda path, payload, *, priority: captured.append((path, payload, priority)),
)
packet = {
"id": 334,
"rxTime": 1000,
"fromId": "!node",
"toId": "^all",
"channel": 0,
"decoded": {
"payload": b"\x01\x02",
"portnum": "ROUTING_APP",
},
}
mesh.store_packet_dict(packet)
assert captured, "Expected routing packet to be stored"
_, payload, _ = captured[0]
assert payload["text"] == "AQI="
captured.clear()
packet["decoded"]["payload"] = {"kind": "ack"}
mesh.store_packet_dict(packet)
assert captured, "Expected routing packet to be stored"
_, payload, _ = captured[0]
assert payload["text"] == '{"kind": "ack"}'
captured.clear()
packet["decoded"]["portnum"] = 7
packet["decoded"]["payload"] = b"\x00"
packet["decoded"]["routing"] = {"errorReason": "NONE"}
mesh.store_packet_dict(packet)
assert captured, "Expected numeric routing packet to be stored"
_, payload, _ = captured[0]
assert payload["text"] == "AA=="
def test_portnum_candidates_reads_enum_values(mesh_module, monkeypatch):
"""Ensure portnum candidates include enum and constants when available."""
mesh = mesh_module
module_name = "meshtastic.portnums_pb2"
class DummyPortNum:
@staticmethod
def Value(name):
if name == "ROUTING_APP":
return 7
raise KeyError(name)
dummy_module = types.SimpleNamespace(PortNum=DummyPortNum, ROUTING_APP=8)
monkeypatch.setitem(sys.modules, module_name, dummy_module)
candidates = mesh.handlers._portnum_candidates("ROUTING_APP")
assert 7 in candidates
assert 8 in candidates
def test_store_packet_dict_appends_channel_name(mesh_module, monkeypatch, capsys):
mesh = mesh_module
mesh.channels._reset_channel_cache()