From c22274c4e5150e78f347f0468fd4f54e5349acb2 Mon Sep 17 00:00:00 2001 From: yellowcooln Date: Sun, 1 Mar 2026 15:06:25 -0500 Subject: [PATCH] Add LetsMesh structured event parity mappings --- PLAN.md | 10 + README.md | 10 +- SCHEMAS.md | 12 +- TASKS.md | 3 + src/meshcore_hub/collector/subscriber.py | 477 ++++++++++++++++++++--- tests/test_collector/test_subscriber.py | 181 +++++++-- 6 files changed, 594 insertions(+), 99 deletions(-) diff --git a/PLAN.md b/PLAN.md index 7c823a8..bb8d5ac 100644 --- a/PLAN.md +++ b/PLAN.md @@ -489,6 +489,16 @@ ${DATA_HOME}/ |----------|---------|-------------| | DATABASE_URL | sqlite:///{DATA_HOME}/collector/meshcore.db | SQLAlchemy URL | | TAGS_FILE | {DATA_HOME}/collector/tags.json | Path to tags JSON file | +| COLLECTOR_INGEST_MODE | native | Ingest mode (`native` or `letsmesh_upload`) | +| COLLECTOR_LETSMESH_DECODER_ENABLED | true | Enable external packet decoding in LetsMesh mode | + +LetsMesh compatibility parity note: +- `status` feed packets are stored as informational `letsmesh_status` events and do not create advertisement rows. +- Advertisement rows in LetsMesh mode are created from decoded payload type `4` only. +- Decoded payload type `11` is normalized to native `contact` updates. +- Decoded payload type `9` is normalized to native `trace_data`. +- Decoded payload type `8` is normalized to informational `path_updated`. +- Decoded payload type `1` can map to native response-style events when decrypted structured content is available. ### API | Variable | Default | Description | diff --git a/README.md b/README.md index 04fb1f9..c604331 100644 --- a/README.md +++ b/README.md @@ -313,15 +313,19 @@ When `COLLECTOR_INGEST_MODE=letsmesh_upload`, the collector subscribes to: Normalization behavior: -- `status` packets are mapped to `advertisement` events only when node identity metadata is present (`name`, node type, explicit flags, or location); heartbeat/counter-only status frames are stored as `letsmesh_status` logs. -- Decoder payload types `4` and `11` are also mapped to `advertisement` events when node identity metadata is present. +- `status` packets are stored as informational `letsmesh_status` events and are not mapped to `advertisement` rows. +- Decoder payload type `4` is mapped to `advertisement` when node identity metadata is present. +- Decoder payload type `11` (control discover response) is mapped to `contact`. +- Decoder payload type `9` is mapped to `trace_data`. +- Decoder payload type `8` is mapped to informational `path_updated` events. +- Decoder payload type `1` can map to native response events (`telemetry_response`, `battery`, `path_updated`, `status_response`) when decrypted structured content is available. - `packet_type=5` packets are mapped to `channel_msg_recv`. - `packet_type=1`, `2`, and `7` packets are mapped to `contact_msg_recv` when decryptable text is available. - For channel packets, if a channel key is available, a channel label is attached (for example `Public` or `#test`) for UI display. - In the messages feed and dashboard channel sections, known channel indexes are preferred for labels (`17 -> Public`, `217 -> #test`) to avoid stale channel-name mismatches. - Additional channel names are loaded from `COLLECTOR_LETSMESH_DECODER_KEYS` when entries are provided as `label=hex` (for example `bot=`). - Decoder-advertisement packets with location metadata update node GPS (`lat/lon`) for map display. -- Status `stats.debug_flags` values are not used as advertisement capability flags. +- This keeps advertisement listings closer to native mode behavior (node advert traffic only, not observer status telemetry). - Packets without decryptable message text are kept as informational `letsmesh_packet` events and are not shown in the messages feed; when decode succeeds the decoded JSON is attached to those packet log events. - When decoder output includes a human sender (`payload.decoded.decrypted.sender`), message text is normalized to `Name: Message` before storage; receiver/observer names are never used as sender fallback. - The collector keeps built-in keys for `Public` and `#test`, and merges any additional keys from `COLLECTOR_LETSMESH_DECODER_KEYS`. diff --git a/SCHEMAS.md b/SCHEMAS.md index 7448593..7a16663 100644 --- a/SCHEMAS.md +++ b/SCHEMAS.md @@ -184,10 +184,16 @@ Group/broadcast messages on specific channels. - When decoder output includes a human sender (`payload.decoded.decrypted.sender`), message text is normalized to `Name: Message`; sender identity remains unknown when only hash/prefix metadata is available. **Compatibility ingest note (advertisements)**: -- In LetsMesh upload compatibility mode, `status` feed payloads are normalized to `ADVERTISEMENT` only when identity metadata is present (`name`, node type, explicit `flags`, or location). Status heartbeat/counter frames are persisted as informational `letsmesh_status` events. -- In LetsMesh upload compatibility mode, decoded payload types `4` and `11` are normalized to `ADVERTISEMENT` when node identity metadata is present. +- In LetsMesh upload compatibility mode, `status` feed payloads are persisted as informational `letsmesh_status` events and are not normalized to `ADVERTISEMENT`. +- In LetsMesh upload compatibility mode, decoded payload type `4` is normalized to `ADVERTISEMENT` when node identity metadata is present. - Payload type `4` location metadata (`appData.location.latitude/longitude`) is mapped to node `lat/lon` for map rendering. -- `stats.debug_flags` from LetsMesh status feeds are not persisted as advertisement capability flags. +- This keeps advertisement persistence aligned with native mode expectations (advertisement traffic only). + +**Compatibility ingest note (non-message structured events)**: +- Decoded payload type `9` is normalized to `TRACE_DATA` (`traceTag`, flags, auth, path hashes, and SNR values). +- Decoded payload type `11` (`Control/NodeDiscoverResp`) is normalized to `contact` events for node upsert parity. +- Decoded payload type `8` is normalized to informational `PATH_UPDATED` events (`hop_count` + path hashes). +- Decoded payload type `1` can be normalized to `TELEMETRY_RESPONSE`, `BATTERY`, `PATH_UPDATED`, or `STATUS_RESPONSE` when decrypted response content is structured and parseable. --- diff --git a/TASKS.md b/TASKS.md index 96f719f..2745a0e 100644 --- a/TASKS.md +++ b/TASKS.md @@ -753,6 +753,9 @@ This document tracks implementation progress for the MeshCore Hub project. Each ### Decisions Made *(Record architectural decisions and answers to clarifying questions here)* +- [x] LetsMesh/native advertisement parity: in `letsmesh_upload` mode, observer `status` feed stays informational (`letsmesh_status`) and does not populate `advertisements`. +- [x] LetsMesh advertisement persistence source: decoded packet payload type `4` maps to `advertisement`; payload type `11` maps to `contact` parity updates. +- [x] LetsMesh native-event parity extensions: payload type `9` maps to `trace_data`, payload type `8` maps to informational `path_updated`, and payload type `1` can map to response-style native events when decryptable structured content exists. - [ ] Q1 (MQTT Broker): - [ ] Q2 (Database): - [ ] Q3 (Web Dashboard Separation): diff --git a/src/meshcore_hub/collector/subscriber.py b/src/meshcore_hub/collector/subscriber.py index 63b3410..bee7c6b 100644 --- a/src/meshcore_hub/collector/subscriber.py +++ b/src/meshcore_hub/collector/subscriber.py @@ -10,7 +10,9 @@ The subscriber: """ import asyncio +import json import logging +import re import signal import threading import time @@ -185,12 +187,9 @@ class Subscriber: observer_public_key, feed_type = parsed if feed_type == "status": - normalized_status = self._build_letsmesh_status_advertisement_payload( - payload, - observer_public_key=observer_public_key, - ) - if normalized_status: - return observer_public_key, "advertisement", normalized_status + # Keep status feed telemetry as informational event logs only. + # This preserves parity with native mode where advertisements are + # sourced from advertisement event traffic, not observer status frames. return observer_public_key, "letsmesh_status", dict(payload) if feed_type == "packets": @@ -204,6 +203,14 @@ class Subscriber: event_type, message_payload = normalized_message return observer_public_key, event_type, message_payload + normalized_structured_event = self._build_letsmesh_structured_event_payload( + payload, + decoded_packet=decoded_packet, + ) + if normalized_structured_event: + event_type, structured_payload = normalized_structured_event + return observer_public_key, event_type, structured_payload + normalized_advertisement = self._build_letsmesh_advertisement_payload( payload, decoded_packet=decoded_packet, @@ -340,6 +347,332 @@ class Subscriber: return event_type, normalized_payload + def _build_letsmesh_structured_event_payload( + self, + payload: dict[str, Any], + decoded_packet: dict[str, Any] | None = None, + ) -> tuple[str, dict[str, Any]] | None: + """Map LetsMesh packet payloads to native collector event types.""" + packet_type = self._resolve_letsmesh_packet_type(payload, decoded_packet) + if packet_type is None: + return None + + if packet_type == 9: + trace_payload = self._build_letsmesh_trace_payload(payload, decoded_packet) + if trace_payload: + return "trace_data", trace_payload + return None + + if packet_type == 11: + contact_payload = self._build_letsmesh_contact_payload( + payload, + decoded_packet, + ) + if contact_payload: + return "contact", contact_payload + status_payload = self._build_letsmesh_status_payload( + payload, decoded_packet + ) + if status_payload: + return "status_response", status_payload + return None + + if packet_type == 8: + path_payload = self._build_letsmesh_path_updated_payload( + payload, + decoded_packet, + ) + if path_payload: + return "path_updated", path_payload + return None + + if packet_type == 1: + return self._build_letsmesh_response_payload(payload, decoded_packet) + + return None + + def _build_letsmesh_trace_payload( + self, + payload: dict[str, Any], + decoded_packet: dict[str, Any] | None = None, + ) -> dict[str, Any] | None: + """Build native `trace_data` payload from LetsMesh trace packets.""" + decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet) + if not decoded_payload: + return None + + trace_tag = ( + decoded_payload.get("traceTag") + or payload.get("traceTag") + or payload.get("trace_tag") + ) + initiator_tag = self._parse_hex_or_int(trace_tag) + if initiator_tag is None: + return None + + path_hashes = self._normalize_hash_list(decoded_payload.get("pathHashes")) + snr_values = self._normalize_float_list(decoded_payload.get("snrValues")) + path_len = self._parse_path_length(payload.get("path")) + if path_len is None: + path_len = self._parse_int(decoded_payload.get("pathLength")) + if path_len is None and path_hashes: + path_len = len(path_hashes) + + hop_count: int | None = None + if path_hashes: + hop_count = len(path_hashes) + elif snr_values: + hop_count = len(snr_values) + elif path_len is not None: + hop_count = path_len + + normalized_payload: dict[str, Any] = { + "initiator_tag": initiator_tag, + } + flags = self._parse_int(decoded_payload.get("flags")) + auth = self._parse_int(decoded_payload.get("authCode")) + if auth is None: + auth = self._parse_int(decoded_payload.get("auth")) + if path_len is not None: + normalized_payload["path_len"] = path_len + if flags is not None: + normalized_payload["flags"] = flags + if auth is not None: + normalized_payload["auth"] = auth + if path_hashes: + normalized_payload["path_hashes"] = path_hashes + if snr_values: + normalized_payload["snr_values"] = snr_values + if hop_count is not None: + normalized_payload["hop_count"] = hop_count + + return normalized_payload + + def _build_letsmesh_contact_payload( + self, + payload: dict[str, Any], + decoded_packet: dict[str, Any] | None = None, + ) -> dict[str, Any] | None: + """Build native `contact` payload from LetsMesh control discovery responses.""" + decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet) + if not decoded_payload: + return None + + sub_type = self._parse_int(decoded_payload.get("subType")) + # 0x90 (144): Node discover response with identity metadata. + if sub_type is not None and sub_type != 144: + return None + + public_key = self._normalize_full_public_key( + decoded_payload.get("publicKey") + or payload.get("public_key") + or payload.get("origin_id") + ) + if not public_key: + return None + + normalized_payload: dict[str, Any] = { + "public_key": public_key, + } + + node_type_raw = self._parse_int(decoded_payload.get("nodeType")) + node_type = self._normalize_letsmesh_node_type( + decoded_payload.get("nodeType") or decoded_payload.get("nodeTypeName") + ) + if node_type_raw in {0, 1, 2, 3}: + normalized_payload["type"] = node_type_raw + elif node_type: + normalized_payload["node_type"] = node_type + + flags = self._parse_int(decoded_payload.get("rawFlags")) + if flags is not None: + normalized_payload["flags"] = flags + + display_name = payload.get("origin") or payload.get("name") + if isinstance(display_name, str) and display_name.strip(): + normalized_payload["adv_name"] = display_name.strip() + + return normalized_payload + + def _build_letsmesh_status_payload( + self, + payload: dict[str, Any], + decoded_packet: dict[str, Any] | None = None, + ) -> dict[str, Any] | None: + """Build informational `status_response` payload from control packets.""" + decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet) + if not decoded_payload: + return None + + status_payload: dict[str, Any] = {} + node_public_key = self._normalize_full_public_key( + decoded_payload.get("publicKey") + or payload.get("public_key") + or payload.get("origin_id") + ) + if node_public_key: + status_payload["node_public_key"] = node_public_key + + sub_type = self._parse_int(decoded_payload.get("subType")) + if sub_type is not None: + status_payload["status"] = f"control_subtype_{sub_type}" + status_payload["control_subtype"] = sub_type + + tag = self._parse_int(decoded_payload.get("tag")) + if tag is not None: + status_payload["tag"] = tag + + snr = self._parse_float(decoded_payload.get("snr")) + if snr is not None: + status_payload["snr"] = snr + + if "status" not in status_payload and "node_public_key" not in status_payload: + return None + return status_payload + + def _build_letsmesh_path_updated_payload( + self, + payload: dict[str, Any], + decoded_packet: dict[str, Any] | None = None, + ) -> dict[str, Any] | None: + """Build informational `path_updated` payload from LetsMesh path packets.""" + decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet) + if not decoded_payload: + return None + + path_hashes = self._normalize_hash_list(decoded_payload.get("pathHashes")) + hop_count = None + if path_hashes: + hop_count = len(path_hashes) + else: + hop_count = self._parse_int(decoded_payload.get("pathLength")) + + if hop_count is None: + return None + + normalized_payload: dict[str, Any] = { + "hop_count": hop_count, + } + if path_hashes: + normalized_payload["path_hashes"] = path_hashes + + extra_type = self._parse_int(decoded_payload.get("extraType")) + if extra_type is not None: + normalized_payload["extra_type"] = extra_type + + extra_data = decoded_payload.get("extraData") + if isinstance(extra_data, str) and extra_data.strip(): + clean_extra_data = extra_data.strip().upper() + normalized_payload["extra_data"] = clean_extra_data + extracted_public_key = self._extract_public_key_from_hex(clean_extra_data) + if extracted_public_key: + normalized_payload["node_public_key"] = extracted_public_key + + node_public_key = self._normalize_full_public_key( + payload.get("public_key") or payload.get("origin_id") + ) + if node_public_key and "node_public_key" not in normalized_payload: + normalized_payload["node_public_key"] = node_public_key + + return normalized_payload + + def _build_letsmesh_response_payload( + self, + payload: dict[str, Any], + decoded_packet: dict[str, Any] | None = None, + ) -> tuple[str, dict[str, Any]] | None: + """Build native events from decrypted response payloads when possible.""" + decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet) + if not decoded_payload: + return None + + decrypted = decoded_payload.get("decrypted") + if not isinstance(decrypted, dict): + return None + + content_data = self._extract_response_content_data(decrypted.get("content")) + if not content_data: + return None + + node_public_key = self._normalize_full_public_key( + content_data.get("node_public_key") + or content_data.get("public_key") + or content_data.get("nodePublicKey") + or payload.get("public_key") + or payload.get("origin_id") + ) + + battery_voltage = self._parse_float( + content_data.get("battery_voltage") or content_data.get("batteryVoltage") + ) + battery_percentage = self._parse_int( + content_data.get("battery_percentage") + or content_data.get("batteryPercentage") + ) + if battery_voltage is not None and battery_percentage is not None: + return "battery", { + "battery_voltage": battery_voltage, + "battery_percentage": battery_percentage, + } + + telemetry_data = content_data.get("parsed_data") + if not isinstance(telemetry_data, dict): + telemetry_data = content_data.get("parsedData") + if not isinstance(telemetry_data, dict): + telemetry_data = { + key: value + for key, value in content_data.items() + if key + not in { + "node_public_key", + "public_key", + "nodePublicKey", + "lpp_data", + "lppData", + } + } + if not telemetry_data: + telemetry_data = None + + if node_public_key and telemetry_data: + telemetry_payload: dict[str, Any] = { + "node_public_key": node_public_key, + "parsed_data": telemetry_data, + } + lpp_data = content_data.get("lpp_data") or content_data.get("lppData") + if isinstance(lpp_data, str) and lpp_data.strip(): + telemetry_payload["lpp_data"] = lpp_data.strip() + return "telemetry_response", telemetry_payload + + if node_public_key: + hop_count = self._parse_int(content_data.get("hop_count")) + if hop_count is None: + hop_count = self._parse_int(content_data.get("hopCount")) + if hop_count is not None: + return "path_updated", { + "node_public_key": node_public_key, + "hop_count": hop_count, + } + + status = content_data.get("status") + if isinstance(status, str) and status.strip(): + status_payload: dict[str, Any] = { + "status": status.strip(), + } + if node_public_key: + status_payload["node_public_key"] = node_public_key + uptime = self._parse_int(content_data.get("uptime")) + message_count = self._parse_int(content_data.get("message_count")) + if message_count is None: + message_count = self._parse_int(content_data.get("messageCount")) + if uptime is not None: + status_payload["uptime"] = uptime + if message_count is not None: + status_payload["message_count"] = message_count + return "status_response", status_payload + + return None + def _build_letsmesh_advertisement_payload( self, payload: dict[str, Any], @@ -355,7 +688,7 @@ class Subscriber: decoded_packet ) # Primary packet forms that carry node identity/role/location metadata. - if decoded_payload_type not in {4, 11}: + if decoded_payload_type != 4: return None decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet) @@ -425,59 +758,6 @@ class Subscriber: return normalized_payload - def _build_letsmesh_status_advertisement_payload( - self, - payload: dict[str, Any], - observer_public_key: str, - ) -> dict[str, Any] | None: - """Normalize LetsMesh status feed payloads into advertisement events.""" - status_public_key = self._normalize_full_public_key( - payload.get("origin_id") or payload.get("public_key") or observer_public_key - ) - if not status_public_key: - return None - - normalized_payload: dict[str, Any] = {"public_key": status_public_key} - - status_name = payload.get("origin") or payload.get("name") - if isinstance(status_name, str) and status_name.strip(): - normalized_payload["name"] = status_name.strip() - - normalized_adv_type = self._normalize_letsmesh_adv_type(payload) - if normalized_adv_type: - normalized_payload["adv_type"] = normalized_adv_type - - # Only trust explicit status payload flags. stats.debug_flags are observer/debug - # counters and cause false capability flags + inflated dedup churn. - explicit_flags = self._parse_int(payload.get("flags")) - if explicit_flags is not None: - normalized_payload["flags"] = explicit_flags - - lat = self._parse_float(payload.get("lat")) - lon = self._parse_float(payload.get("lon")) - if lat is None: - lat = self._parse_float(payload.get("adv_lat")) - if lon is None: - lon = self._parse_float(payload.get("adv_lon")) - location = payload.get("location") - if isinstance(location, dict): - if lat is None: - lat = self._parse_float(location.get("latitude")) - if lon is None: - lon = self._parse_float(location.get("longitude")) - if lat is not None: - normalized_payload["lat"] = lat - if lon is not None: - normalized_payload["lon"] = lon - - # Ignore status heartbeat/counter frames that have no node identity metadata. - if not any( - key in normalized_payload - for key in ("name", "adv_type", "flags", "lat", "lon") - ): - return None - return normalized_payload - @classmethod def _extract_letsmesh_text( cls, @@ -574,6 +854,83 @@ class Subscriber: decoded = payload.get("decoded") return decoded if isinstance(decoded, dict) else None + @staticmethod + def _extract_response_content_data(value: Any) -> dict[str, Any] | None: + """Parse response `content` payload into a dictionary when possible.""" + if isinstance(value, dict): + return value + if not isinstance(value, str): + return None + + text = value.strip() + if not text: + return None + + if text.startswith("{") and text.endswith("}"): + try: + parsed = json.loads(text) + except json.JSONDecodeError: + return None + return parsed if isinstance(parsed, dict) else None + + return None + + @staticmethod + def _normalize_hash_list(value: Any) -> list[str] | None: + """Normalize a list of one-byte hash strings.""" + if not isinstance(value, list): + return None + + normalized: list[str] = [] + for item in value: + if not isinstance(item, str): + continue + token = item.strip().upper() + if len(token) != 2: + continue + if any(ch not in "0123456789ABCDEF" for ch in token): + continue + normalized.append(token) + return normalized or None + + @staticmethod + def _normalize_float_list(value: Any) -> list[float] | None: + """Normalize a list of numeric values as floats.""" + if not isinstance(value, list): + return None + + normalized: list[float] = [] + for item in value: + if isinstance(item, (int, float)): + normalized.append(float(item)) + return normalized or None + + @staticmethod + def _extract_public_key_from_hex(value: str) -> str | None: + """Extract the first 64-char hex segment from a payload string.""" + match = re.search(r"([0-9A-Fa-f]{64})", value) + if not match: + return None + return match.group(1).upper() + + @classmethod + def _parse_hex_or_int(cls, value: Any) -> int | None: + """Parse integers represented as decimal or hexadecimal strings.""" + parsed = cls._parse_int(value) + if parsed is not None: + return parsed + if not isinstance(value, str): + return None + token = value.strip().removeprefix("0x").removeprefix("0X") + if not token: + return None + if any(ch not in "0123456789ABCDEFabcdef" for ch in token): + return None + try: + return int(token, 16) + except ValueError: + return None + @classmethod def _extract_letsmesh_decoder_payload_type( cls, diff --git a/tests/test_collector/test_subscriber.py b/tests/test_collector/test_subscriber.py index adc29ae..1ca3037 100644 --- a/tests/test_collector/test_subscriber.py +++ b/tests/test_collector/test_subscriber.py @@ -89,17 +89,19 @@ class TestSubscriber: mock_mqtt_client.subscribe.assert_has_calls(expected_calls, any_order=False) assert mock_mqtt_client.subscribe.call_count == 3 - def test_letsmesh_status_maps_to_advertisement( + def test_letsmesh_status_maps_to_letsmesh_status( self, mock_mqtt_client, db_manager ) -> None: - """LetsMesh status payloads are normalized to advertisement events.""" + """LetsMesh status payloads are stored as informational status events.""" subscriber = Subscriber( mock_mqtt_client, db_manager, ingest_mode="letsmesh_upload", ) - handler = MagicMock() - subscriber.register_handler("advertisement", handler) + advert_handler = MagicMock() + status_handler = MagicMock() + subscriber.register_handler("advertisement", advert_handler) + subscriber.register_handler("letsmesh_status", status_handler) subscriber.start() subscriber._handle_mqtt_message( @@ -114,26 +116,28 @@ class TestSubscriber: }, ) - handler.assert_called_once() - public_key, event_type, payload, _db = handler.call_args.args + advert_handler.assert_not_called() + status_handler.assert_called_once() + public_key, event_type, payload, _db = status_handler.call_args.args assert public_key == "a" * 64 - assert event_type == "advertisement" - assert payload["public_key"] == ("b" * 64).upper() - assert payload["name"] == "Observer Node" - assert payload["adv_type"] == "repeater" - assert payload["flags"] == 7 + assert event_type == "letsmesh_status" + assert payload["origin_id"] == "b" * 64 + assert payload["origin"] == "Observer Node" + assert payload["mode"] == "repeater" - def test_letsmesh_status_does_not_use_debug_flags_as_advert_flags( + def test_letsmesh_status_with_debug_flags_does_not_emit_advertisement( self, mock_mqtt_client, db_manager ) -> None: - """debug_flags should not be stored as node capability flags.""" + """Status debug metadata should remain informational only.""" subscriber = Subscriber( mock_mqtt_client, db_manager, ingest_mode="letsmesh_upload", ) - handler = MagicMock() - subscriber.register_handler("advertisement", handler) + advert_handler = MagicMock() + status_handler = MagicMock() + subscriber.register_handler("advertisement", advert_handler) + subscriber.register_handler("letsmesh_status", status_handler) subscriber.start() subscriber._handle_mqtt_message( @@ -147,14 +151,15 @@ class TestSubscriber: }, ) - handler.assert_called_once() - _public_key, _event_type, payload, _db = handler.call_args.args - assert "flags" not in payload + advert_handler.assert_not_called() + status_handler.assert_called_once() + _public_key, _event_type, payload, _db = status_handler.call_args.args + assert payload["stats"]["debug_flags"] == 7 def test_letsmesh_status_without_identity_maps_to_letsmesh_status( self, mock_mqtt_client, db_manager ) -> None: - """Status heartbeat payloads without identity metadata should not inflate adverts.""" + """Status heartbeat payloads without identity metadata stay informational.""" subscriber = Subscriber( mock_mqtt_client, db_manager, @@ -497,10 +502,10 @@ class TestSubscriber: assert payload["lat"] == 42.470001 assert payload["lon"] == -71.330001 - def test_letsmesh_packet_type_11_maps_to_advertisement( + def test_letsmesh_packet_type_11_maps_to_contact( self, mock_mqtt_client, db_manager ) -> None: - """Decoder packet type 11 is mapped to advertisement metadata updates.""" + """Decoder packet type 11 is mapped to native contact events.""" mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( "a" * 64, "packets", @@ -510,8 +515,10 @@ class TestSubscriber: db_manager, ingest_mode="letsmesh_upload", ) - handler = MagicMock() - subscriber.register_handler("advertisement", handler) + contact_handler = MagicMock() + advert_handler = MagicMock() + subscriber.register_handler("contact", contact_handler) + subscriber.register_handler("advertisement", advert_handler) subscriber.start() with patch.object( @@ -540,17 +547,126 @@ class TestSubscriber: }, ) - handler.assert_called_once() - _public_key, event_type, payload, _db = handler.call_args.args - assert event_type == "advertisement" + advert_handler.assert_not_called() + contact_handler.assert_called_once() + _public_key, event_type, payload, _db = contact_handler.call_args.args + assert event_type == "contact" assert payload["public_key"] == "C" * 64 - assert payload["adv_type"] == "repeater" + assert payload["type"] == 2 assert payload["flags"] == 146 + def test_letsmesh_packet_type_9_maps_to_trace_data( + self, mock_mqtt_client, db_manager + ) -> None: + """Decoder packet type 9 is mapped to native trace_data events.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + trace_handler = MagicMock() + subscriber.register_handler("trace_data", trace_handler) + subscriber.start() + + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value={ + "payloadType": 9, + "pathLength": 4, + "payload": { + "decoded": { + "type": 9, + "traceTag": "DF9D7A20", + "authCode": 0, + "flags": 0, + "pathHashes": ["71", "0B", "24", "0B"], + "snrValues": [12.5, 11.5, 10, 6.25], + } + }, + }, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "9", + "hash": "99887766", + "raw": "ABCDEF", + }, + ) + + trace_handler.assert_called_once() + _public_key, event_type, payload, _db = trace_handler.call_args.args + assert event_type == "trace_data" + assert payload["initiator_tag"] == int("DF9D7A20", 16) + assert payload["path_hashes"] == ["71", "0B", "24", "0B"] + assert payload["hop_count"] == 4 + assert payload["snr_values"] == [12.5, 11.5, 10.0, 6.25] + + def test_letsmesh_packet_type_8_maps_to_path_updated( + self, mock_mqtt_client, db_manager + ) -> None: + """Decoder packet type 8 is mapped to native path_updated events.""" + mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( + "a" * 64, + "packets", + ) + subscriber = Subscriber( + mock_mqtt_client, + db_manager, + ingest_mode="letsmesh_upload", + ) + path_handler = MagicMock() + packet_handler = MagicMock() + subscriber.register_handler("path_updated", path_handler) + subscriber.register_handler("letsmesh_packet", packet_handler) + subscriber.start() + + with patch.object( + subscriber._letsmesh_decoder, + "decode_payload", + return_value={ + "payloadType": 8, + "payload": { + "decoded": { + "type": 8, + "isValid": True, + "pathLength": 2, + "pathHashes": ["AA", "BB"], + "extraType": 244, + "extraData": "D" * 64, + } + }, + }, + ): + subscriber._handle_mqtt_message( + topic=f"meshcore/BOS/{'a' * 64}/packets", + pattern="meshcore/BOS/+/packets", + payload={ + "packet_type": "8", + "hash": "99887766", + "raw": "ABCDEF", + }, + ) + + packet_handler.assert_not_called() + path_handler.assert_called_once() + _public_key, event_type, payload, _db = path_handler.call_args.args + assert event_type == "path_updated" + assert payload["hop_count"] == 2 + assert payload["path_hashes"] == ["AA", "BB"] + assert payload["extra_type"] == 244 + assert payload["node_public_key"] == "D" * 64 + def test_letsmesh_packet_fallback_logs_decoded_payload( self, mock_mqtt_client, db_manager ) -> None: - """Non-mapped packets include decoder output in letsmesh_packet payload.""" + """Unmapped packets include decoder output in letsmesh_packet payload.""" mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = ( "a" * 64, "packets", @@ -565,12 +681,11 @@ class TestSubscriber: subscriber.start() decoded_packet = { - "payloadType": 8, + "payloadType": 10, "payload": { "decoded": { - "type": 8, + "type": 10, "isValid": True, - "pathHashes": ["AA", "BB", "CC"], } }, } @@ -583,7 +698,7 @@ class TestSubscriber: topic=f"meshcore/BOS/{'a' * 64}/packets", pattern="meshcore/BOS/+/packets", payload={ - "packet_type": "8", + "packet_type": "10", "hash": "99887766", "raw": "ABCDEF", }, @@ -592,7 +707,7 @@ class TestSubscriber: packet_handler.assert_called_once() _public_key, event_type, payload, _db = packet_handler.call_args.args assert event_type == "letsmesh_packet" - assert payload["decoded_payload_type"] == 8 + assert payload["decoded_payload_type"] == 10 assert payload["decoded_packet"] == decoded_packet def test_letsmesh_packet_sender_fallback_from_payload_fields(