data: handle store_forward and router_heartbeat portnum (#667)

* data: handle store_forward and router_heartbeat portnum

* ingestor: address review comments
This commit is contained in:
l5y
2026-03-31 23:42:26 +02:00
committed by GitHub
parent a62a068c08
commit 4fa0745d1b
4 changed files with 184 additions and 3 deletions
+59 -1
View File
@@ -247,6 +247,7 @@ def upsert_node(node_id, node) -> None:
"""
payload = _apply_radio_metadata_to_nodes(upsert_payload(node_id, node))
payload["ingestor"] = host_node_id()
_queue_post_json("/api/nodes", payload, priority=queue._NODE_POST_PRIORITY)
if config.DEBUG:
@@ -1055,6 +1056,43 @@ def store_telemetry_packet(packet: Mapping, decoded: Mapping) -> None:
)
def store_router_heartbeat_packet(packet: Mapping) -> None:
"""Persist a STORE_FORWARD_APP ``ROUTER_HEARTBEAT`` as a node presence update.
The heartbeat carries no message payload — the only actionable signal is
that the store-and-forward router is alive at the observed ``rx_time``.
All other fields are left untouched so the router's existing profile is
not overwritten.
Parameters:
packet: Raw packet metadata.
Returns:
``None``. A minimal node upsert is enqueued at low priority.
"""
node_id = _canonical_node_id(
_first(packet, "fromId", "from_id", "from", default=None)
)
if node_id is None:
return
rx_time = int(_first(packet, "rxTime", "rx_time", default=time.time()))
node_payload: dict = {"lastHeard": rx_time}
nodes_payload = _apply_radio_metadata_to_nodes({node_id: node_payload})
nodes_payload["ingestor"] = host_node_id()
_queue_post_json("/api/nodes", nodes_payload, priority=queue._DEFAULT_POST_PRIORITY)
if config.DEBUG:
config._debug_log(
"Queued router heartbeat node upsert",
context="handlers.store_router_heartbeat",
node_id=node_id,
rx_time=rx_time,
)
def store_nodeinfo_packet(packet: Mapping, decoded: Mapping) -> None:
"""Persist node information updates.
@@ -1203,9 +1241,11 @@ def store_nodeinfo_packet(packet: Mapping, decoded: Mapping) -> None:
except (TypeError, ValueError):
pass
nodes_payload = _apply_radio_metadata_to_nodes({node_id: node_payload})
nodes_payload["ingestor"] = host_node_id()
_queue_post_json(
"/api/nodes",
_apply_radio_metadata_to_nodes({node_id: node_payload}),
nodes_payload,
priority=queue._NODE_POST_PRIORITY,
)
@@ -1390,6 +1430,23 @@ def store_packet_dict(packet: Mapping) -> None:
store_neighborinfo_packet(packet, decoded)
return
store_forward_port_candidates = _portnum_candidates("STORE_FORWARD_APP")
store_forward_section = (
decoded.get("storeforward") if isinstance(decoded, Mapping) else None
)
if portnum == "STORE_FORWARD_APP" or (
portnum_int is not None and portnum_int in store_forward_port_candidates
):
if not isinstance(store_forward_section, Mapping):
_record_ignored_packet(packet, reason="unsupported-store-forward")
return
rr = str(store_forward_section.get("rr") or "").upper()
if rr == "ROUTER_HEARTBEAT":
store_router_heartbeat_packet(packet)
return
_record_ignored_packet(packet, reason="unsupported-store-forward-rr")
return
text = _first(decoded, "payload.text", "text", "data.text", default=None)
encrypted = _first(decoded, "payload.encrypted", "encrypted", default=None)
if encrypted is None:
@@ -1661,6 +1718,7 @@ __all__ = [
"store_nodeinfo_packet",
"store_packet_dict",
"store_position_packet",
"store_router_heartbeat_packet",
"store_telemetry_packet",
"upsert_node",
]
+2 -2
View File
@@ -91,12 +91,12 @@ def _derive_message_id(sender_ts: int, discriminator: str, text: str) -> int:
discriminator: Channel index (``"c<N>"`` for channel messages) or
pubkey prefix (for direct messages) to separate messages with
the same timestamp.
text: Message text; only the first 128 characters are hashed.
text: Message text.
Returns:
A non-negative 32-bit integer suitable for the ``id`` column.
"""
data = f"{sender_ts}:{discriminator}:{text[:128]}".encode("utf-8", errors="replace")
data = f"{sender_ts}:{discriminator}:{text}".encode("utf-8", errors="replace")
return int.from_bytes(hashlib.sha256(data).digest()[:4], "big")
+115
View File
@@ -3631,3 +3631,118 @@ def test_on_receive_skips_seen_packets(mesh_module):
mesh.on_receive(packet, interface=None)
assert packet["_potatomesh_seen"] is True
def test_upsert_node_includes_ingestor_key(mesh_module, monkeypatch):
"""upsert_node must attach the host node ID so /api/nodes can resolve protocol."""
mesh = mesh_module
captured = []
monkeypatch.setattr(
mesh,
"_queue_post_json",
lambda path, payload, *, priority: captured.append((path, payload, priority)),
)
mesh.register_host_node_id("!aabbccdd")
mesh.upsert_node("!deadbeef", {"user": {"shortName": "X"}})
assert captured
_, payload, _ = captured[0]
assert payload.get("ingestor") == "!aabbccdd"
def test_store_packet_dict_nodeinfo_includes_ingestor_key(mesh_module, monkeypatch):
"""store_nodeinfo_packet must include the ingestor key in the /api/nodes payload."""
mesh = mesh_module
captured = []
monkeypatch.setattr(
mesh,
"_queue_post_json",
lambda path, payload, *, priority: captured.append((path, payload, priority)),
)
mesh.register_host_node_id("!11223344")
packet = {
"id": 1,
"rxTime": 1_700_000_000,
"fromId": "!aabbccdd",
"decoded": {
"portnum": "NODEINFO_APP",
"user": {"id": "!aabbccdd", "shortName": "N"},
},
}
mesh.store_packet_dict(packet)
node_calls = [(p, pl) for p, pl, _ in captured if p == "/api/nodes"]
assert node_calls, "Expected a /api/nodes POST"
_, payload = node_calls[0]
assert payload.get("ingestor") == "!11223344"
def test_store_packet_dict_router_heartbeat(mesh_module, monkeypatch):
"""STORE_FORWARD_APP ROUTER_HEARTBEAT upserts the node at low priority."""
mesh = mesh_module
captured = []
monkeypatch.setattr(
mesh,
"_queue_post_json",
lambda path, payload, *, priority: captured.append((path, payload, priority)),
)
mesh.register_host_node_id("!f00dbabe")
packet = {
"id": 2377284085,
"rxTime": 1_774_868_197,
"fromId": "!435a7fbc",
"toId": "^all",
"hopLimit": "2",
"rxSnr": "-12.25",
"rxRssi": "-110",
"decoded": {
"portnum": "STORE_FORWARD_APP",
"storeforward": {
"heartbeat": {"period": "900"},
"rr": "ROUTER_HEARTBEAT",
},
},
}
mesh.store_packet_dict(packet)
assert captured, "Expected a POST for router heartbeat"
path, payload, priority = captured[0]
assert path == "/api/nodes"
assert priority == mesh._DEFAULT_POST_PRIORITY
assert "!435a7fbc" in payload
node_entry = payload["!435a7fbc"]
assert node_entry["lastHeard"] == 1_774_868_197
assert payload.get("ingestor") == "!f00dbabe"
assert set(node_entry.keys()) == {
"lastHeard"
}, "Heartbeat must only set lastHeard, nothing else"
def test_store_packet_dict_store_forward_non_heartbeat_ignored(
mesh_module, monkeypatch
):
"""STORE_FORWARD_APP packets that are not ROUTER_HEARTBEAT are dropped."""
mesh = mesh_module
captured = []
monkeypatch.setattr(
mesh,
"_queue_post_json",
lambda *a, **kw: captured.append(a),
)
packet = {
"id": 1,
"rxTime": 1_700_000_000,
"fromId": "!aabbccdd",
"decoded": {
"portnum": "STORE_FORWARD_APP",
"storeforward": {"rr": "ROUTER_CLIENT_RESPONSE"},
},
}
mesh.store_packet_dict(packet)
assert not captured, "Non-heartbeat STORE_FORWARD_APP must not be queued"
+8
View File
@@ -641,6 +641,14 @@ def test_derive_message_id_is_32bit():
assert 0 <= result <= 0xFFFFFFFF
def test_derive_message_id_distinguishes_long_messages_differing_after_128_chars():
"""Messages that share the first 128 characters must still get different IDs."""
prefix = "A" * 128
id_a = _derive_message_id(1_000_000, "c0", prefix + "AAAAAA")
id_b = _derive_message_id(1_000_000, "c0", prefix + "BBBBBB")
assert id_a != id_b
# ---------------------------------------------------------------------------
# _make_event_handlers — async callbacks
# ---------------------------------------------------------------------------