diff --git a/data/mesh_ingestor/protocols/meshcore.py b/data/mesh_ingestor/protocols/meshcore.py index 293322f..ccf9d22 100644 --- a/data/mesh_ingestor/protocols/meshcore.py +++ b/data/mesh_ingestor/protocols/meshcore.py @@ -70,8 +70,9 @@ from meshcore import ( TCPConnection, ) -from .. import config +from .. import config, ingestors as _ingestors, queue as _queue from ..connection import default_serial_targets, parse_ble_target, parse_tcp_target +from ..serialization import _iso, _node_num_from_id # --------------------------------------------------------------------------- # Exceptions @@ -338,7 +339,11 @@ def _contact_to_node_dict(contact: dict) -> dict: lat = contact.get("adv_lat") lon = contact.get("adv_lon") if lat is not None and lon is not None and (lat or lon): - node["position"] = {"latitude": lat, "longitude": lon} + pos: dict = {"latitude": lat, "longitude": lon} + last_advert = contact.get("last_advert") + if last_advert is not None: + pos["time"] = last_advert + node["position"] = pos return node @@ -386,10 +391,56 @@ def _self_info_to_node_dict(self_info: dict) -> dict: lat = self_info.get("adv_lat") lon = self_info.get("adv_lon") if lat is not None and lon is not None and (lat or lon): - node["position"] = {"latitude": lat, "longitude": lon} + node["position"] = {"latitude": lat, "longitude": lon, "time": int(time.time())} return node +def _store_meshcore_position( + node_id: str, + lat: float, + lon: float, + position_time: int | None, + ingestor: str | None, +) -> None: + """Enqueue a ``POST /api/positions`` for a MeshCore contact's advertised position. + + MeshCore does not issue dedicated position packets; position data is embedded + in contact advertisements. A stable pseudo-ID is derived from the node + identity and the position timestamp so repeated advertisements of the same + position are idempotently de-duplicated by the web app's ``ON CONFLICT`` + clause. + + Parameters: + node_id: Canonical ``!xxxxxxxx`` node identifier. + lat: Latitude in decimal degrees. + lon: Longitude in decimal degrees. + position_time: Unix timestamp from the contact's ``last_advert`` field, + or ``None`` to fall back to the current wall-clock time. + ingestor: Canonical node ID of the host ingestor, or ``None``. + """ + rx_time = int(time.time()) + pt = position_time or rx_time + # Stable 63-bit pseudo-ID unique to (node, position_time) so that the web + # app ON CONFLICT clause de-duplicates repeated advertisements of the same + # position without collisions between different nodes. + digest = hashlib.sha256(f"{node_id}:{pt}".encode()).digest() + pos_id = int.from_bytes(digest[:8], "big") & 0x7FFFFFFFFFFFFFFF + node_num = _node_num_from_id(node_id) + payload = { + "id": pos_id, + "rx_time": rx_time, + "rx_iso": _iso(rx_time), + "node_id": node_id, + "node_num": node_num, + "from_id": node_id, + "latitude": lat, + "longitude": lon, + "position_time": pt, + "ingestor": ingestor, + } + _queue._queue_post_json("/api/positions", payload) + + def _to_json_safe(value: object) -> object: """Recursively convert *value* to a JSON-serialisable form. @@ -673,7 +724,18 @@ def _process_self_info( if node_id: iface.host_node_id = node_id handlers.register_host_node_id(node_id) + # Queue the ingestor registration BEFORE any node upserts so the web + # backend assigns the correct protocol to all subsequent records. + # Radio metadata (LORA_FREQ, MODEM_PRESET) is captured just above and + # will be included in the heartbeat payload by queue_ingestor_heartbeat. + _ingestors.queue_ingestor_heartbeat(force=True, node_id=node_id) handlers.upsert_node(node_id, _self_info_to_node_dict(payload)) + lat = payload.get("adv_lat") + lon = payload.get("adv_lon") + if lat is not None and lon is not None and (lat or lon): + _store_meshcore_position( + node_id, lat, lon, int(time.time()), handlers.host_node_id() + ) config._debug_log( "MeshCore radio metadata captured", @@ -708,6 +770,16 @@ def _process_contacts( continue iface._update_contact(contact) handlers.upsert_node(node_id, _contact_to_node_dict(contact)) + lat = contact.get("adv_lat") + lon = contact.get("adv_lon") + if lat is not None and lon is not None and (lat or lon): + _store_meshcore_position( + node_id, + lat, + lon, + contact.get("last_advert"), + handlers.host_node_id(), + ) handlers._mark_packet_seen() @@ -727,6 +799,16 @@ def _process_contact_update( return iface._update_contact(contact) handlers.upsert_node(node_id, _contact_to_node_dict(contact)) + lat = contact.get("adv_lat") + lon = contact.get("adv_lon") + if lat is not None and lon is not None and (lat or lon): + _store_meshcore_position( + node_id, + lat, + lon, + contact.get("last_advert"), + handlers.host_node_id(), + ) handlers._mark_packet_seen() config._debug_log( "MeshCore contact updated", diff --git a/data/mesh_ingestor/queue.py b/data/mesh_ingestor/queue.py index b782e0e..3d1af4e 100644 --- a/data/mesh_ingestor/queue.py +++ b/data/mesh_ingestor/queue.py @@ -73,13 +73,14 @@ def _payload_key_value_pairs(payload: Mapping[str, object]) -> str: return " ".join(pairs) -_MESSAGE_POST_PRIORITY = 10 -_INGESTOR_POST_PRIORITY = 80 -_NEIGHBOR_POST_PRIORITY = 20 -_TRACE_POST_PRIORITY = 25 -_POSITION_POST_PRIORITY = 30 -_TELEMETRY_POST_PRIORITY = 40 -_NODE_POST_PRIORITY = 50 +_INGESTOR_POST_PRIORITY = 0 +_CHANNEL_POST_PRIORITY = 10 +_NODE_POST_PRIORITY = 20 +_MESSAGE_POST_PRIORITY = 30 +_NEIGHBOR_POST_PRIORITY = 40 +_TRACE_POST_PRIORITY = 50 +_POSITION_POST_PRIORITY = 60 +_TELEMETRY_POST_PRIORITY = 70 _DEFAULT_POST_PRIORITY = 90 @@ -262,9 +263,10 @@ def _clear_post_queue(state: QueueState = STATE) -> None: __all__ = [ "STATE", "QueueState", + "_CHANNEL_POST_PRIORITY", "_DEFAULT_POST_PRIORITY", - "_MESSAGE_POST_PRIORITY", "_INGESTOR_POST_PRIORITY", + "_MESSAGE_POST_PRIORITY", "_NEIGHBOR_POST_PRIORITY", "_NODE_POST_PRIORITY", "_POSITION_POST_PRIORITY", diff --git a/tests/test_mesh.py b/tests/test_mesh.py index 83bc087..964496b 100644 --- a/tests/test_mesh.py +++ b/tests/test_mesh.py @@ -2696,7 +2696,8 @@ def test_traceroute_packet_without_identifiers_is_ignored(mesh_module, monkeypat assert captured == [] -def test_post_queue_prioritises_messages(mesh_module, monkeypatch): +def test_post_queue_prioritises_nodes_over_messages(mesh_module, monkeypatch): + """Nodes (priority 20) must be processed before messages (priority 30).""" mesh = mesh_module mesh._clear_post_queue() calls = [] @@ -2713,7 +2714,7 @@ def test_post_queue_prioritises_messages(mesh_module, monkeypatch): mesh._drain_post_queue() - assert [path for path, _ in calls] == ["/api/messages", "/api/nodes"] + assert [path for path, _ in calls] == ["/api/nodes", "/api/messages"] def test_drain_post_queue_handles_enqueued_items_during_send(mesh_module): diff --git a/tests/test_provider_unit.py b/tests/test_provider_unit.py index 1c9b3b7..e61b9b2 100644 --- a/tests/test_provider_unit.py +++ b/tests/test_provider_unit.py @@ -55,6 +55,7 @@ from data.mesh_ingestor.protocols.meshcore import ( # noqa: E402 - path setup _pubkey_prefix_to_node_id, _record_meshcore_message, _self_info_to_node_dict, + _store_meshcore_position, _synthetic_node_dict, _to_json_safe, ) @@ -969,6 +970,31 @@ def test_contact_to_node_dict_sets_protocol_meshcore(): assert _contact_to_node_dict(contact)["protocol"] == "meshcore" +def test_contact_to_node_dict_position_includes_time_from_last_advert(): + """position['time'] must equal last_advert when it is present.""" + contact = { + "public_key": "aa" * 32, + "adv_name": "Node", + "adv_lat": 51.5, + "adv_lon": -0.1, + "last_advert": 1700001234, + } + node = _contact_to_node_dict(contact) + assert node["position"]["time"] == 1700001234 + + +def test_contact_to_node_dict_position_omits_time_without_last_advert(): + """position dict must not include 'time' when last_advert is absent.""" + contact = { + "public_key": "aa" * 32, + "adv_name": "Node", + "adv_lat": 51.5, + "adv_lon": -0.1, + } + node = _contact_to_node_dict(contact) + assert "time" not in node["position"] + + # --------------------------------------------------------------------------- # _self_info_to_node_dict # --------------------------------------------------------------------------- @@ -1016,6 +1042,109 @@ def test_self_info_to_node_dict_sets_protocol_meshcore(): assert _self_info_to_node_dict(self_info)["protocol"] == "meshcore" +def test_self_info_to_node_dict_position_includes_time(): + """position['time'] must be set to a recent integer when lat/lon are present.""" + import time as _time + + before = int(_time.time()) + self_info = { + "name": "N", + "public_key": "cc" * 32, + "adv_lat": 48.8, + "adv_lon": 2.35, + } + node = _self_info_to_node_dict(self_info) + after = int(_time.time()) + assert "time" in node["position"] + assert before <= node["position"]["time"] <= after + + +# --------------------------------------------------------------------------- +# _store_meshcore_position +# --------------------------------------------------------------------------- + + +def test_store_meshcore_position_queues_to_api_positions(monkeypatch): + """_store_meshcore_position must enqueue a POST to /api/positions.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + posted: list = [] + monkeypatch.setattr( + _mod._queue, + "_queue_post_json", + lambda route, payload, **_k: posted.append((route, payload)), + ) + + _store_meshcore_position("!aabbccdd", 51.5, -0.1, 1700001234, "!ingestor1") + + assert len(posted) == 1 + route, payload = posted[0] + assert route == "/api/positions" + assert payload["node_id"] == "!aabbccdd" + assert payload["latitude"] == pytest.approx(51.5) + assert payload["longitude"] == pytest.approx(-0.1) + assert payload["position_time"] == 1700001234 + assert payload["from_id"] == "!aabbccdd" + assert isinstance(payload["id"], int) + assert payload["id"] >= 0 + + +def test_store_meshcore_position_id_is_stable_for_same_node_and_time(monkeypatch): + """The pseudo-ID must be identical for repeated calls with the same arguments.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + ids: list = [] + monkeypatch.setattr( + _mod._queue, + "_queue_post_json", + lambda route, payload, **_k: ids.append(payload["id"]), + ) + + _store_meshcore_position("!aabbccdd", 51.5, -0.1, 1700001234, None) + _store_meshcore_position("!aabbccdd", 51.5, -0.1, 1700001234, None) + + assert ids[0] == ids[1] + + +def test_store_meshcore_position_id_differs_for_different_times(monkeypatch): + """The pseudo-ID must differ when position_time changes.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + ids: list = [] + monkeypatch.setattr( + _mod._queue, + "_queue_post_json", + lambda route, payload, **_k: ids.append(payload["id"]), + ) + + _store_meshcore_position("!aabbccdd", 51.5, -0.1, 1700001234, None) + _store_meshcore_position("!aabbccdd", 51.5, -0.1, 1700009999, None) + + assert ids[0] != ids[1] + + +def test_store_meshcore_position_falls_back_to_rx_time_when_no_position_time( + monkeypatch, +): + """When position_time is None, rx_time must be used as position_time.""" + import time as _time + import data.mesh_ingestor.protocols.meshcore as _mod + + posted: list = [] + monkeypatch.setattr( + _mod._queue, + "_queue_post_json", + lambda route, payload, **_k: posted.append(payload), + ) + + before = int(_time.time()) + _store_meshcore_position("!aabbccdd", 51.5, -0.1, None, None) + after = int(_time.time()) + + payload = posted[0] + assert before <= payload["position_time"] <= after + + # --------------------------------------------------------------------------- # _MeshcoreInterface contact management # --------------------------------------------------------------------------- @@ -1122,6 +1251,7 @@ def _make_stub_handlers_module(): mod = types.SimpleNamespace( upsert_node=lambda *_a, **_k: None, register_host_node_id=lambda *_a, **_k: None, + host_node_id=lambda: None, _mark_packet_seen=lambda: None, store_packet_dict=lambda *_a, **_k: None, ) @@ -1510,8 +1640,13 @@ def test_to_json_safe_unknown_type_stringified(): # --------------------------------------------------------------------------- -def test_process_self_info_sets_host_node_id(): +def test_process_self_info_sets_host_node_id(monkeypatch): """_process_self_info must set iface.host_node_id and call register_host_node_id.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + monkeypatch.setattr( + _mod._ingestors, "queue_ingestor_heartbeat", lambda *_a, **_k: True + ) stub = _make_stub_handlers_module() registered: list = [] stub.register_host_node_id = lambda nid: registered.append(nid) @@ -1538,8 +1673,13 @@ def test_process_self_info_skips_empty_key(): assert registered == [] -def test_process_self_info_caches_payload(): +def test_process_self_info_caches_payload(monkeypatch): """_process_self_info must store the payload on iface._self_info_payload.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + monkeypatch.setattr( + _mod._ingestors, "queue_ingestor_heartbeat", lambda *_a, **_k: True + ) stub = _make_stub_handlers_module() iface = _MeshcoreInterface(target=None) payload = {"public_key": "aabbccdd" + "00" * 28, "name": "Host"} @@ -1578,6 +1718,9 @@ def test_process_self_info_radio_metadata_set_before_upsert(monkeypatch): monkeypatch.setattr(_mod.config, "LORA_FREQ", None) monkeypatch.setattr(_mod.config, "MODEM_PRESET", None) monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + monkeypatch.setattr( + _mod._ingestors, "queue_ingestor_heartbeat", lambda *_a, **_k: True + ) captured_lora_freq_at_upsert: list = [] captured_modem_preset_at_upsert: list = [] @@ -1616,6 +1759,9 @@ def test_process_self_info_captures_radio_freq(monkeypatch): monkeypatch.setattr(_mod.config, "LORA_FREQ", None) monkeypatch.setattr(_mod.config, "MODEM_PRESET", None) monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + monkeypatch.setattr( + _mod._ingestors, "queue_ingestor_heartbeat", lambda *_a, **_k: True + ) stub = _make_stub_handlers_module() payload = { @@ -1637,6 +1783,9 @@ def test_process_self_info_captures_modem_preset(monkeypatch): monkeypatch.setattr(_mod.config, "LORA_FREQ", None) monkeypatch.setattr(_mod.config, "MODEM_PRESET", None) monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + monkeypatch.setattr( + _mod._ingestors, "queue_ingestor_heartbeat", lambda *_a, **_k: True + ) stub = _make_stub_handlers_module() payload = { @@ -1658,6 +1807,9 @@ def test_process_self_info_no_overwrite_lora_freq(monkeypatch): monkeypatch.setattr(_mod.config, "LORA_FREQ", 915) monkeypatch.setattr(_mod.config, "MODEM_PRESET", None) monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + monkeypatch.setattr( + _mod._ingestors, "queue_ingestor_heartbeat", lambda *_a, **_k: True + ) stub = _make_stub_handlers_module() payload = { @@ -1679,6 +1831,9 @@ def test_process_self_info_no_overwrite_modem_preset(monkeypatch): monkeypatch.setattr(_mod.config, "LORA_FREQ", None) monkeypatch.setattr(_mod.config, "MODEM_PRESET", "LongFast") monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + monkeypatch.setattr( + _mod._ingestors, "queue_ingestor_heartbeat", lambda *_a, **_k: True + ) stub = _make_stub_handlers_module() payload = { @@ -1700,6 +1855,9 @@ def test_process_self_info_missing_radio_fields_leaves_config_none(monkeypatch): monkeypatch.setattr(_mod.config, "LORA_FREQ", None) monkeypatch.setattr(_mod.config, "MODEM_PRESET", None) monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + monkeypatch.setattr( + _mod._ingestors, "queue_ingestor_heartbeat", lambda *_a, **_k: True + ) stub = _make_stub_handlers_module() _process_self_info( @@ -1712,6 +1870,39 @@ def test_process_self_info_missing_radio_fields_leaves_config_none(monkeypatch): assert _mod.config.MODEM_PRESET is None +def test_process_self_info_queues_ingestor_heartbeat_before_upsert(monkeypatch): + """_process_self_info must queue the ingestor heartbeat before upsert_node. + + The ingestor report carries priority 0 (highest) so the web backend assigns + the correct protocol to all subsequent node and message records. The + heartbeat must therefore be queued before the node upsert so that the + web backend knows the ingestor protocol before it processes the node. + """ + import data.mesh_ingestor.protocols.meshcore as _mod + + monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + + call_order: list[str] = [] + + def _spy_heartbeat(*, force, node_id, **_kw): + call_order.append("heartbeat") + return True + + stub = _make_stub_handlers_module() + stub.upsert_node = lambda *_a, **_k: call_order.append("upsert") + stub.register_host_node_id = lambda *_a, **_k: None + + monkeypatch.setattr(_mod._ingestors, "queue_ingestor_heartbeat", _spy_heartbeat) + + payload = {"public_key": "aabbccdd" + "00" * 28, "name": "Host"} + _process_self_info(payload, _MeshcoreInterface(target=None), stub) + + assert call_order[:2] == [ + "heartbeat", + "upsert", + ], "Ingestor heartbeat must be queued before node upsert" + + # --------------------------------------------------------------------------- # _derive_modem_preset # --------------------------------------------------------------------------- @@ -1977,6 +2168,98 @@ def test_process_contacts_marks_packet_seen(): assert seen == [True] +def test_process_contacts_queues_position_for_contacts_with_latlon(monkeypatch): + """_process_contacts must post to /api/positions for each contact with a position.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + posted: list = [] + monkeypatch.setattr( + _mod._queue, + "_queue_post_json", + lambda route, payload, **_k: posted.append((route, payload)), + ) + + stub = _make_stub_handlers_module() + iface = _MeshcoreInterface(target=None) + pub_key = "aabbccdd" + "00" * 28 + _process_contacts( + { + pub_key: { + "public_key": pub_key, + "adv_name": "Alice", + "adv_lat": 51.5, + "adv_lon": -0.1, + "last_advert": 1700001234, + } + }, + iface, + stub, + ) + + position_posts = [p for r, p in posted if r == "/api/positions"] + assert len(position_posts) == 1 + assert position_posts[0]["node_id"] == "!aabbccdd" + assert position_posts[0]["latitude"] == pytest.approx(51.5) + assert position_posts[0]["position_time"] == 1700001234 + + +def test_process_contacts_skips_position_for_contacts_without_latlon(monkeypatch): + """_process_contacts must not post to /api/positions when lat/lon are absent.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + posted: list = [] + monkeypatch.setattr( + _mod._queue, + "_queue_post_json", + lambda route, payload, **_k: posted.append(route), + ) + + stub = _make_stub_handlers_module() + iface = _MeshcoreInterface(target=None) + pub_key = "aabbccdd" + "00" * 28 + _process_contacts( + {pub_key: {"public_key": pub_key, "adv_name": "Alice"}}, + iface, + stub, + ) + + assert "/api/positions" not in posted + + +def test_process_contacts_only_posts_positions_for_located_contacts(monkeypatch): + """Bulk CONTACTS: only contacts with lat/lon must produce a /api/positions POST.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + posted: list = [] + monkeypatch.setattr( + _mod._queue, + "_queue_post_json", + lambda route, payload, **_k: posted.append((route, payload)), + ) + + stub = _make_stub_handlers_module() + iface = _MeshcoreInterface(target=None) + key_with_pos = "aabbccdd" + "00" * 28 + key_without_pos = "11223344" + "00" * 28 + _process_contacts( + { + key_with_pos: { + "public_key": key_with_pos, + "adv_name": "A", + "adv_lat": 10.0, + "adv_lon": 20.0, + }, + key_without_pos: {"public_key": key_without_pos, "adv_name": "B"}, + }, + iface, + stub, + ) + + position_posts = [p for r, p in posted if r == "/api/positions"] + assert len(position_posts) == 1 + assert position_posts[0]["node_id"] == "!aabbccdd" + + # --------------------------------------------------------------------------- # _process_contact_update # --------------------------------------------------------------------------- @@ -2017,6 +2300,60 @@ def test_process_contact_update_skips_empty_key(monkeypatch): assert upserted == [] +def test_process_contact_update_queues_position_when_latlon_present(monkeypatch): + """_process_contact_update must POST to /api/positions when the contact has lat/lon.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + posted: list = [] + monkeypatch.setattr( + _mod._queue, + "_queue_post_json", + lambda route, payload, **_k: posted.append((route, payload)), + ) + + stub = _make_stub_handlers_module() + iface = _MeshcoreInterface(target=None) + pub_key = "aabbccdd" + "00" * 28 + _process_contact_update( + { + "public_key": pub_key, + "adv_name": "Bob", + "adv_lat": 52.0, + "adv_lon": 4.0, + "last_advert": 1700005678, + }, + iface, + stub, + ) + + position_posts = [p for r, p in posted if r == "/api/positions"] + assert len(position_posts) == 1 + assert position_posts[0]["node_id"] == "!aabbccdd" + assert position_posts[0]["latitude"] == pytest.approx(52.0) + assert position_posts[0]["position_time"] == 1700005678 + + +def test_process_contact_update_skips_position_when_no_latlon(monkeypatch): + """_process_contact_update must not POST to /api/positions when lat/lon are absent.""" + import data.mesh_ingestor.protocols.meshcore as _mod + + monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + posted: list = [] + monkeypatch.setattr( + _mod._queue, + "_queue_post_json", + lambda route, payload, **_k: posted.append(route), + ) + + stub = _make_stub_handlers_module() + iface = _MeshcoreInterface(target=None) + pub_key = "aabbccdd" + "00" * 28 + _process_contact_update({"public_key": pub_key, "adv_name": "Bob"}, iface, stub) + + assert "/api/positions" not in posted + + # --------------------------------------------------------------------------- # on_self_info via _make_event_handlers # --------------------------------------------------------------------------- @@ -2034,6 +2371,9 @@ def test_on_self_info_registers_and_upserts(monkeypatch): stub.register_host_node_id = lambda nid: registered.append(nid) stub.upsert_node = lambda nid, nd: upserted.append(nid) monkeypatch.setattr(_mod.config, "_debug_log", lambda *_a, **_k: None) + monkeypatch.setattr( + _mod._ingestors, "queue_ingestor_heartbeat", lambda *_a, **_k: True + ) monkeypatch.setattr(_mesh_pkg, "handlers", stub) class _Evt: diff --git a/tests/test_queue_unit.py b/tests/test_queue_unit.py index 75b5253..71352ae 100644 --- a/tests/test_queue_unit.py +++ b/tests/test_queue_unit.py @@ -36,9 +36,15 @@ from data.mesh_ingestor.queue import ( _enqueue_post_json, _post_json, _queue_post_json, + _CHANNEL_POST_PRIORITY, _DEFAULT_POST_PRIORITY, + _INGESTOR_POST_PRIORITY, _MESSAGE_POST_PRIORITY, + _NEIGHBOR_POST_PRIORITY, _NODE_POST_PRIORITY, + _POSITION_POST_PRIORITY, + _TELEMETRY_POST_PRIORITY, + _TRACE_POST_PRIORITY, ) @@ -47,6 +53,29 @@ def _fresh_state() -> QueueState: return QueueState() +# --------------------------------------------------------------------------- +# Priority constant ordering +# --------------------------------------------------------------------------- + + +def test_priority_constants_ordering(): + """Verify the intended priority hierarchy: ingestor first, telemetry last. + + Lower numeric values are dequeued first (min-heap semantics). The ordering + must be: ingestor < channel < node < message < neighbor < trace < position + < telemetry < default. Any regression in this order means the web backend + may assign the wrong protocol to nodes and messages on startup. + """ + assert _INGESTOR_POST_PRIORITY < _CHANNEL_POST_PRIORITY + assert _CHANNEL_POST_PRIORITY < _NODE_POST_PRIORITY + assert _NODE_POST_PRIORITY < _MESSAGE_POST_PRIORITY + assert _MESSAGE_POST_PRIORITY < _NEIGHBOR_POST_PRIORITY + assert _NEIGHBOR_POST_PRIORITY < _TRACE_POST_PRIORITY + assert _TRACE_POST_PRIORITY < _POSITION_POST_PRIORITY + assert _POSITION_POST_PRIORITY < _TELEMETRY_POST_PRIORITY + assert _TELEMETRY_POST_PRIORITY < _DEFAULT_POST_PRIORITY + + # --------------------------------------------------------------------------- # _post_json # ---------------------------------------------------------------------------