Add LetsMesh structured event parity mappings

This commit is contained in:
yellowcooln
2026-03-01 15:06:25 -05:00
parent 54449aa5fb
commit c22274c4e5
6 changed files with 594 additions and 99 deletions

10
PLAN.md
View File

@@ -489,6 +489,16 @@ ${DATA_HOME}/
|----------|---------|-------------|
| DATABASE_URL | sqlite:///{DATA_HOME}/collector/meshcore.db | SQLAlchemy URL |
| TAGS_FILE | {DATA_HOME}/collector/tags.json | Path to tags JSON file |
| COLLECTOR_INGEST_MODE | native | Ingest mode (`native` or `letsmesh_upload`) |
| COLLECTOR_LETSMESH_DECODER_ENABLED | true | Enable external packet decoding in LetsMesh mode |
LetsMesh compatibility parity note:
- `status` feed packets are stored as informational `letsmesh_status` events and do not create advertisement rows.
- Advertisement rows in LetsMesh mode are created from decoded payload type `4` only.
- Decoded payload type `11` is normalized to native `contact` updates.
- Decoded payload type `9` is normalized to native `trace_data`.
- Decoded payload type `8` is normalized to informational `path_updated`.
- Decoded payload type `1` can map to native response-style events when decrypted structured content is available.
### API
| Variable | Default | Description |

View File

@@ -313,15 +313,19 @@ When `COLLECTOR_INGEST_MODE=letsmesh_upload`, the collector subscribes to:
Normalization behavior:
- `status` packets are mapped to `advertisement` events only when node identity metadata is present (`name`, node type, explicit flags, or location); heartbeat/counter-only status frames are stored as `letsmesh_status` logs.
- Decoder payload types `4` and `11` are also mapped to `advertisement` events when node identity metadata is present.
- `status` packets are stored as informational `letsmesh_status` events and are not mapped to `advertisement` rows.
- Decoder payload type `4` is mapped to `advertisement` when node identity metadata is present.
- Decoder payload type `11` (control discover response) is mapped to `contact`.
- Decoder payload type `9` is mapped to `trace_data`.
- Decoder payload type `8` is mapped to informational `path_updated` events.
- Decoder payload type `1` can map to native response events (`telemetry_response`, `battery`, `path_updated`, `status_response`) when decrypted structured content is available.
- `packet_type=5` packets are mapped to `channel_msg_recv`.
- `packet_type=1`, `2`, and `7` packets are mapped to `contact_msg_recv` when decryptable text is available.
- For channel packets, if a channel key is available, a channel label is attached (for example `Public` or `#test`) for UI display.
- In the messages feed and dashboard channel sections, known channel indexes are preferred for labels (`17 -> Public`, `217 -> #test`) to avoid stale channel-name mismatches.
- Additional channel names are loaded from `COLLECTOR_LETSMESH_DECODER_KEYS` when entries are provided as `label=hex` (for example `bot=<key>`).
- Decoder-advertisement packets with location metadata update node GPS (`lat/lon`) for map display.
- Status `stats.debug_flags` values are not used as advertisement capability flags.
- This keeps advertisement listings closer to native mode behavior (node advert traffic only, not observer status telemetry).
- Packets without decryptable message text are kept as informational `letsmesh_packet` events and are not shown in the messages feed; when decode succeeds the decoded JSON is attached to those packet log events.
- When decoder output includes a human sender (`payload.decoded.decrypted.sender`), message text is normalized to `Name: Message` before storage; receiver/observer names are never used as sender fallback.
- The collector keeps built-in keys for `Public` and `#test`, and merges any additional keys from `COLLECTOR_LETSMESH_DECODER_KEYS`.

View File

@@ -184,10 +184,16 @@ Group/broadcast messages on specific channels.
- When decoder output includes a human sender (`payload.decoded.decrypted.sender`), message text is normalized to `Name: Message`; sender identity remains unknown when only hash/prefix metadata is available.
**Compatibility ingest note (advertisements)**:
- In LetsMesh upload compatibility mode, `status` feed payloads are normalized to `ADVERTISEMENT` only when identity metadata is present (`name`, node type, explicit `flags`, or location). Status heartbeat/counter frames are persisted as informational `letsmesh_status` events.
- In LetsMesh upload compatibility mode, decoded payload types `4` and `11` are normalized to `ADVERTISEMENT` when node identity metadata is present.
- In LetsMesh upload compatibility mode, `status` feed payloads are persisted as informational `letsmesh_status` events and are not normalized to `ADVERTISEMENT`.
- In LetsMesh upload compatibility mode, decoded payload type `4` is normalized to `ADVERTISEMENT` when node identity metadata is present.
- Payload type `4` location metadata (`appData.location.latitude/longitude`) is mapped to node `lat/lon` for map rendering.
- `stats.debug_flags` from LetsMesh status feeds are not persisted as advertisement capability flags.
- This keeps advertisement persistence aligned with native mode expectations (advertisement traffic only).
**Compatibility ingest note (non-message structured events)**:
- Decoded payload type `9` is normalized to `TRACE_DATA` (`traceTag`, flags, auth, path hashes, and SNR values).
- Decoded payload type `11` (`Control/NodeDiscoverResp`) is normalized to `contact` events for node upsert parity.
- Decoded payload type `8` is normalized to informational `PATH_UPDATED` events (`hop_count` + path hashes).
- Decoded payload type `1` can be normalized to `TELEMETRY_RESPONSE`, `BATTERY`, `PATH_UPDATED`, or `STATUS_RESPONSE` when decrypted response content is structured and parseable.
---

View File

@@ -753,6 +753,9 @@ This document tracks implementation progress for the MeshCore Hub project. Each
### Decisions Made
*(Record architectural decisions and answers to clarifying questions here)*
- [x] LetsMesh/native advertisement parity: in `letsmesh_upload` mode, observer `status` feed stays informational (`letsmesh_status`) and does not populate `advertisements`.
- [x] LetsMesh advertisement persistence source: decoded packet payload type `4` maps to `advertisement`; payload type `11` maps to `contact` parity updates.
- [x] LetsMesh native-event parity extensions: payload type `9` maps to `trace_data`, payload type `8` maps to informational `path_updated`, and payload type `1` can map to response-style native events when decryptable structured content exists.
- [ ] Q1 (MQTT Broker):
- [ ] Q2 (Database):
- [ ] Q3 (Web Dashboard Separation):

View File

@@ -10,7 +10,9 @@ The subscriber:
"""
import asyncio
import json
import logging
import re
import signal
import threading
import time
@@ -185,12 +187,9 @@ class Subscriber:
observer_public_key, feed_type = parsed
if feed_type == "status":
normalized_status = self._build_letsmesh_status_advertisement_payload(
payload,
observer_public_key=observer_public_key,
)
if normalized_status:
return observer_public_key, "advertisement", normalized_status
# Keep status feed telemetry as informational event logs only.
# This preserves parity with native mode where advertisements are
# sourced from advertisement event traffic, not observer status frames.
return observer_public_key, "letsmesh_status", dict(payload)
if feed_type == "packets":
@@ -204,6 +203,14 @@ class Subscriber:
event_type, message_payload = normalized_message
return observer_public_key, event_type, message_payload
normalized_structured_event = self._build_letsmesh_structured_event_payload(
payload,
decoded_packet=decoded_packet,
)
if normalized_structured_event:
event_type, structured_payload = normalized_structured_event
return observer_public_key, event_type, structured_payload
normalized_advertisement = self._build_letsmesh_advertisement_payload(
payload,
decoded_packet=decoded_packet,
@@ -340,6 +347,332 @@ class Subscriber:
return event_type, normalized_payload
def _build_letsmesh_structured_event_payload(
self,
payload: dict[str, Any],
decoded_packet: dict[str, Any] | None = None,
) -> tuple[str, dict[str, Any]] | None:
"""Map LetsMesh packet payloads to native collector event types."""
packet_type = self._resolve_letsmesh_packet_type(payload, decoded_packet)
if packet_type is None:
return None
if packet_type == 9:
trace_payload = self._build_letsmesh_trace_payload(payload, decoded_packet)
if trace_payload:
return "trace_data", trace_payload
return None
if packet_type == 11:
contact_payload = self._build_letsmesh_contact_payload(
payload,
decoded_packet,
)
if contact_payload:
return "contact", contact_payload
status_payload = self._build_letsmesh_status_payload(
payload, decoded_packet
)
if status_payload:
return "status_response", status_payload
return None
if packet_type == 8:
path_payload = self._build_letsmesh_path_updated_payload(
payload,
decoded_packet,
)
if path_payload:
return "path_updated", path_payload
return None
if packet_type == 1:
return self._build_letsmesh_response_payload(payload, decoded_packet)
return None
def _build_letsmesh_trace_payload(
self,
payload: dict[str, Any],
decoded_packet: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
"""Build native `trace_data` payload from LetsMesh trace packets."""
decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet)
if not decoded_payload:
return None
trace_tag = (
decoded_payload.get("traceTag")
or payload.get("traceTag")
or payload.get("trace_tag")
)
initiator_tag = self._parse_hex_or_int(trace_tag)
if initiator_tag is None:
return None
path_hashes = self._normalize_hash_list(decoded_payload.get("pathHashes"))
snr_values = self._normalize_float_list(decoded_payload.get("snrValues"))
path_len = self._parse_path_length(payload.get("path"))
if path_len is None:
path_len = self._parse_int(decoded_payload.get("pathLength"))
if path_len is None and path_hashes:
path_len = len(path_hashes)
hop_count: int | None = None
if path_hashes:
hop_count = len(path_hashes)
elif snr_values:
hop_count = len(snr_values)
elif path_len is not None:
hop_count = path_len
normalized_payload: dict[str, Any] = {
"initiator_tag": initiator_tag,
}
flags = self._parse_int(decoded_payload.get("flags"))
auth = self._parse_int(decoded_payload.get("authCode"))
if auth is None:
auth = self._parse_int(decoded_payload.get("auth"))
if path_len is not None:
normalized_payload["path_len"] = path_len
if flags is not None:
normalized_payload["flags"] = flags
if auth is not None:
normalized_payload["auth"] = auth
if path_hashes:
normalized_payload["path_hashes"] = path_hashes
if snr_values:
normalized_payload["snr_values"] = snr_values
if hop_count is not None:
normalized_payload["hop_count"] = hop_count
return normalized_payload
def _build_letsmesh_contact_payload(
self,
payload: dict[str, Any],
decoded_packet: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
"""Build native `contact` payload from LetsMesh control discovery responses."""
decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet)
if not decoded_payload:
return None
sub_type = self._parse_int(decoded_payload.get("subType"))
# 0x90 (144): Node discover response with identity metadata.
if sub_type is not None and sub_type != 144:
return None
public_key = self._normalize_full_public_key(
decoded_payload.get("publicKey")
or payload.get("public_key")
or payload.get("origin_id")
)
if not public_key:
return None
normalized_payload: dict[str, Any] = {
"public_key": public_key,
}
node_type_raw = self._parse_int(decoded_payload.get("nodeType"))
node_type = self._normalize_letsmesh_node_type(
decoded_payload.get("nodeType") or decoded_payload.get("nodeTypeName")
)
if node_type_raw in {0, 1, 2, 3}:
normalized_payload["type"] = node_type_raw
elif node_type:
normalized_payload["node_type"] = node_type
flags = self._parse_int(decoded_payload.get("rawFlags"))
if flags is not None:
normalized_payload["flags"] = flags
display_name = payload.get("origin") or payload.get("name")
if isinstance(display_name, str) and display_name.strip():
normalized_payload["adv_name"] = display_name.strip()
return normalized_payload
def _build_letsmesh_status_payload(
self,
payload: dict[str, Any],
decoded_packet: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
"""Build informational `status_response` payload from control packets."""
decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet)
if not decoded_payload:
return None
status_payload: dict[str, Any] = {}
node_public_key = self._normalize_full_public_key(
decoded_payload.get("publicKey")
or payload.get("public_key")
or payload.get("origin_id")
)
if node_public_key:
status_payload["node_public_key"] = node_public_key
sub_type = self._parse_int(decoded_payload.get("subType"))
if sub_type is not None:
status_payload["status"] = f"control_subtype_{sub_type}"
status_payload["control_subtype"] = sub_type
tag = self._parse_int(decoded_payload.get("tag"))
if tag is not None:
status_payload["tag"] = tag
snr = self._parse_float(decoded_payload.get("snr"))
if snr is not None:
status_payload["snr"] = snr
if "status" not in status_payload and "node_public_key" not in status_payload:
return None
return status_payload
def _build_letsmesh_path_updated_payload(
self,
payload: dict[str, Any],
decoded_packet: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
"""Build informational `path_updated` payload from LetsMesh path packets."""
decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet)
if not decoded_payload:
return None
path_hashes = self._normalize_hash_list(decoded_payload.get("pathHashes"))
hop_count = None
if path_hashes:
hop_count = len(path_hashes)
else:
hop_count = self._parse_int(decoded_payload.get("pathLength"))
if hop_count is None:
return None
normalized_payload: dict[str, Any] = {
"hop_count": hop_count,
}
if path_hashes:
normalized_payload["path_hashes"] = path_hashes
extra_type = self._parse_int(decoded_payload.get("extraType"))
if extra_type is not None:
normalized_payload["extra_type"] = extra_type
extra_data = decoded_payload.get("extraData")
if isinstance(extra_data, str) and extra_data.strip():
clean_extra_data = extra_data.strip().upper()
normalized_payload["extra_data"] = clean_extra_data
extracted_public_key = self._extract_public_key_from_hex(clean_extra_data)
if extracted_public_key:
normalized_payload["node_public_key"] = extracted_public_key
node_public_key = self._normalize_full_public_key(
payload.get("public_key") or payload.get("origin_id")
)
if node_public_key and "node_public_key" not in normalized_payload:
normalized_payload["node_public_key"] = node_public_key
return normalized_payload
def _build_letsmesh_response_payload(
self,
payload: dict[str, Any],
decoded_packet: dict[str, Any] | None = None,
) -> tuple[str, dict[str, Any]] | None:
"""Build native events from decrypted response payloads when possible."""
decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet)
if not decoded_payload:
return None
decrypted = decoded_payload.get("decrypted")
if not isinstance(decrypted, dict):
return None
content_data = self._extract_response_content_data(decrypted.get("content"))
if not content_data:
return None
node_public_key = self._normalize_full_public_key(
content_data.get("node_public_key")
or content_data.get("public_key")
or content_data.get("nodePublicKey")
or payload.get("public_key")
or payload.get("origin_id")
)
battery_voltage = self._parse_float(
content_data.get("battery_voltage") or content_data.get("batteryVoltage")
)
battery_percentage = self._parse_int(
content_data.get("battery_percentage")
or content_data.get("batteryPercentage")
)
if battery_voltage is not None and battery_percentage is not None:
return "battery", {
"battery_voltage": battery_voltage,
"battery_percentage": battery_percentage,
}
telemetry_data = content_data.get("parsed_data")
if not isinstance(telemetry_data, dict):
telemetry_data = content_data.get("parsedData")
if not isinstance(telemetry_data, dict):
telemetry_data = {
key: value
for key, value in content_data.items()
if key
not in {
"node_public_key",
"public_key",
"nodePublicKey",
"lpp_data",
"lppData",
}
}
if not telemetry_data:
telemetry_data = None
if node_public_key and telemetry_data:
telemetry_payload: dict[str, Any] = {
"node_public_key": node_public_key,
"parsed_data": telemetry_data,
}
lpp_data = content_data.get("lpp_data") or content_data.get("lppData")
if isinstance(lpp_data, str) and lpp_data.strip():
telemetry_payload["lpp_data"] = lpp_data.strip()
return "telemetry_response", telemetry_payload
if node_public_key:
hop_count = self._parse_int(content_data.get("hop_count"))
if hop_count is None:
hop_count = self._parse_int(content_data.get("hopCount"))
if hop_count is not None:
return "path_updated", {
"node_public_key": node_public_key,
"hop_count": hop_count,
}
status = content_data.get("status")
if isinstance(status, str) and status.strip():
status_payload: dict[str, Any] = {
"status": status.strip(),
}
if node_public_key:
status_payload["node_public_key"] = node_public_key
uptime = self._parse_int(content_data.get("uptime"))
message_count = self._parse_int(content_data.get("message_count"))
if message_count is None:
message_count = self._parse_int(content_data.get("messageCount"))
if uptime is not None:
status_payload["uptime"] = uptime
if message_count is not None:
status_payload["message_count"] = message_count
return "status_response", status_payload
return None
def _build_letsmesh_advertisement_payload(
self,
payload: dict[str, Any],
@@ -355,7 +688,7 @@ class Subscriber:
decoded_packet
)
# Primary packet forms that carry node identity/role/location metadata.
if decoded_payload_type not in {4, 11}:
if decoded_payload_type != 4:
return None
decoded_payload = self._extract_letsmesh_decoder_payload(decoded_packet)
@@ -425,59 +758,6 @@ class Subscriber:
return normalized_payload
def _build_letsmesh_status_advertisement_payload(
self,
payload: dict[str, Any],
observer_public_key: str,
) -> dict[str, Any] | None:
"""Normalize LetsMesh status feed payloads into advertisement events."""
status_public_key = self._normalize_full_public_key(
payload.get("origin_id") or payload.get("public_key") or observer_public_key
)
if not status_public_key:
return None
normalized_payload: dict[str, Any] = {"public_key": status_public_key}
status_name = payload.get("origin") or payload.get("name")
if isinstance(status_name, str) and status_name.strip():
normalized_payload["name"] = status_name.strip()
normalized_adv_type = self._normalize_letsmesh_adv_type(payload)
if normalized_adv_type:
normalized_payload["adv_type"] = normalized_adv_type
# Only trust explicit status payload flags. stats.debug_flags are observer/debug
# counters and cause false capability flags + inflated dedup churn.
explicit_flags = self._parse_int(payload.get("flags"))
if explicit_flags is not None:
normalized_payload["flags"] = explicit_flags
lat = self._parse_float(payload.get("lat"))
lon = self._parse_float(payload.get("lon"))
if lat is None:
lat = self._parse_float(payload.get("adv_lat"))
if lon is None:
lon = self._parse_float(payload.get("adv_lon"))
location = payload.get("location")
if isinstance(location, dict):
if lat is None:
lat = self._parse_float(location.get("latitude"))
if lon is None:
lon = self._parse_float(location.get("longitude"))
if lat is not None:
normalized_payload["lat"] = lat
if lon is not None:
normalized_payload["lon"] = lon
# Ignore status heartbeat/counter frames that have no node identity metadata.
if not any(
key in normalized_payload
for key in ("name", "adv_type", "flags", "lat", "lon")
):
return None
return normalized_payload
@classmethod
def _extract_letsmesh_text(
cls,
@@ -574,6 +854,83 @@ class Subscriber:
decoded = payload.get("decoded")
return decoded if isinstance(decoded, dict) else None
@staticmethod
def _extract_response_content_data(value: Any) -> dict[str, Any] | None:
"""Parse response `content` payload into a dictionary when possible."""
if isinstance(value, dict):
return value
if not isinstance(value, str):
return None
text = value.strip()
if not text:
return None
if text.startswith("{") and text.endswith("}"):
try:
parsed = json.loads(text)
except json.JSONDecodeError:
return None
return parsed if isinstance(parsed, dict) else None
return None
@staticmethod
def _normalize_hash_list(value: Any) -> list[str] | None:
"""Normalize a list of one-byte hash strings."""
if not isinstance(value, list):
return None
normalized: list[str] = []
for item in value:
if not isinstance(item, str):
continue
token = item.strip().upper()
if len(token) != 2:
continue
if any(ch not in "0123456789ABCDEF" for ch in token):
continue
normalized.append(token)
return normalized or None
@staticmethod
def _normalize_float_list(value: Any) -> list[float] | None:
"""Normalize a list of numeric values as floats."""
if not isinstance(value, list):
return None
normalized: list[float] = []
for item in value:
if isinstance(item, (int, float)):
normalized.append(float(item))
return normalized or None
@staticmethod
def _extract_public_key_from_hex(value: str) -> str | None:
"""Extract the first 64-char hex segment from a payload string."""
match = re.search(r"([0-9A-Fa-f]{64})", value)
if not match:
return None
return match.group(1).upper()
@classmethod
def _parse_hex_or_int(cls, value: Any) -> int | None:
"""Parse integers represented as decimal or hexadecimal strings."""
parsed = cls._parse_int(value)
if parsed is not None:
return parsed
if not isinstance(value, str):
return None
token = value.strip().removeprefix("0x").removeprefix("0X")
if not token:
return None
if any(ch not in "0123456789ABCDEFabcdef" for ch in token):
return None
try:
return int(token, 16)
except ValueError:
return None
@classmethod
def _extract_letsmesh_decoder_payload_type(
cls,

View File

@@ -89,17 +89,19 @@ class TestSubscriber:
mock_mqtt_client.subscribe.assert_has_calls(expected_calls, any_order=False)
assert mock_mqtt_client.subscribe.call_count == 3
def test_letsmesh_status_maps_to_advertisement(
def test_letsmesh_status_maps_to_letsmesh_status(
self, mock_mqtt_client, db_manager
) -> None:
"""LetsMesh status payloads are normalized to advertisement events."""
"""LetsMesh status payloads are stored as informational status events."""
subscriber = Subscriber(
mock_mqtt_client,
db_manager,
ingest_mode="letsmesh_upload",
)
handler = MagicMock()
subscriber.register_handler("advertisement", handler)
advert_handler = MagicMock()
status_handler = MagicMock()
subscriber.register_handler("advertisement", advert_handler)
subscriber.register_handler("letsmesh_status", status_handler)
subscriber.start()
subscriber._handle_mqtt_message(
@@ -114,26 +116,28 @@ class TestSubscriber:
},
)
handler.assert_called_once()
public_key, event_type, payload, _db = handler.call_args.args
advert_handler.assert_not_called()
status_handler.assert_called_once()
public_key, event_type, payload, _db = status_handler.call_args.args
assert public_key == "a" * 64
assert event_type == "advertisement"
assert payload["public_key"] == ("b" * 64).upper()
assert payload["name"] == "Observer Node"
assert payload["adv_type"] == "repeater"
assert payload["flags"] == 7
assert event_type == "letsmesh_status"
assert payload["origin_id"] == "b" * 64
assert payload["origin"] == "Observer Node"
assert payload["mode"] == "repeater"
def test_letsmesh_status_does_not_use_debug_flags_as_advert_flags(
def test_letsmesh_status_with_debug_flags_does_not_emit_advertisement(
self, mock_mqtt_client, db_manager
) -> None:
"""debug_flags should not be stored as node capability flags."""
"""Status debug metadata should remain informational only."""
subscriber = Subscriber(
mock_mqtt_client,
db_manager,
ingest_mode="letsmesh_upload",
)
handler = MagicMock()
subscriber.register_handler("advertisement", handler)
advert_handler = MagicMock()
status_handler = MagicMock()
subscriber.register_handler("advertisement", advert_handler)
subscriber.register_handler("letsmesh_status", status_handler)
subscriber.start()
subscriber._handle_mqtt_message(
@@ -147,14 +151,15 @@ class TestSubscriber:
},
)
handler.assert_called_once()
_public_key, _event_type, payload, _db = handler.call_args.args
assert "flags" not in payload
advert_handler.assert_not_called()
status_handler.assert_called_once()
_public_key, _event_type, payload, _db = status_handler.call_args.args
assert payload["stats"]["debug_flags"] == 7
def test_letsmesh_status_without_identity_maps_to_letsmesh_status(
self, mock_mqtt_client, db_manager
) -> None:
"""Status heartbeat payloads without identity metadata should not inflate adverts."""
"""Status heartbeat payloads without identity metadata stay informational."""
subscriber = Subscriber(
mock_mqtt_client,
db_manager,
@@ -497,10 +502,10 @@ class TestSubscriber:
assert payload["lat"] == 42.470001
assert payload["lon"] == -71.330001
def test_letsmesh_packet_type_11_maps_to_advertisement(
def test_letsmesh_packet_type_11_maps_to_contact(
self, mock_mqtt_client, db_manager
) -> None:
"""Decoder packet type 11 is mapped to advertisement metadata updates."""
"""Decoder packet type 11 is mapped to native contact events."""
mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = (
"a" * 64,
"packets",
@@ -510,8 +515,10 @@ class TestSubscriber:
db_manager,
ingest_mode="letsmesh_upload",
)
handler = MagicMock()
subscriber.register_handler("advertisement", handler)
contact_handler = MagicMock()
advert_handler = MagicMock()
subscriber.register_handler("contact", contact_handler)
subscriber.register_handler("advertisement", advert_handler)
subscriber.start()
with patch.object(
@@ -540,17 +547,126 @@ class TestSubscriber:
},
)
handler.assert_called_once()
_public_key, event_type, payload, _db = handler.call_args.args
assert event_type == "advertisement"
advert_handler.assert_not_called()
contact_handler.assert_called_once()
_public_key, event_type, payload, _db = contact_handler.call_args.args
assert event_type == "contact"
assert payload["public_key"] == "C" * 64
assert payload["adv_type"] == "repeater"
assert payload["type"] == 2
assert payload["flags"] == 146
def test_letsmesh_packet_type_9_maps_to_trace_data(
self, mock_mqtt_client, db_manager
) -> None:
"""Decoder packet type 9 is mapped to native trace_data events."""
mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = (
"a" * 64,
"packets",
)
subscriber = Subscriber(
mock_mqtt_client,
db_manager,
ingest_mode="letsmesh_upload",
)
trace_handler = MagicMock()
subscriber.register_handler("trace_data", trace_handler)
subscriber.start()
with patch.object(
subscriber._letsmesh_decoder,
"decode_payload",
return_value={
"payloadType": 9,
"pathLength": 4,
"payload": {
"decoded": {
"type": 9,
"traceTag": "DF9D7A20",
"authCode": 0,
"flags": 0,
"pathHashes": ["71", "0B", "24", "0B"],
"snrValues": [12.5, 11.5, 10, 6.25],
}
},
},
):
subscriber._handle_mqtt_message(
topic=f"meshcore/BOS/{'a' * 64}/packets",
pattern="meshcore/BOS/+/packets",
payload={
"packet_type": "9",
"hash": "99887766",
"raw": "ABCDEF",
},
)
trace_handler.assert_called_once()
_public_key, event_type, payload, _db = trace_handler.call_args.args
assert event_type == "trace_data"
assert payload["initiator_tag"] == int("DF9D7A20", 16)
assert payload["path_hashes"] == ["71", "0B", "24", "0B"]
assert payload["hop_count"] == 4
assert payload["snr_values"] == [12.5, 11.5, 10.0, 6.25]
def test_letsmesh_packet_type_8_maps_to_path_updated(
self, mock_mqtt_client, db_manager
) -> None:
"""Decoder packet type 8 is mapped to native path_updated events."""
mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = (
"a" * 64,
"packets",
)
subscriber = Subscriber(
mock_mqtt_client,
db_manager,
ingest_mode="letsmesh_upload",
)
path_handler = MagicMock()
packet_handler = MagicMock()
subscriber.register_handler("path_updated", path_handler)
subscriber.register_handler("letsmesh_packet", packet_handler)
subscriber.start()
with patch.object(
subscriber._letsmesh_decoder,
"decode_payload",
return_value={
"payloadType": 8,
"payload": {
"decoded": {
"type": 8,
"isValid": True,
"pathLength": 2,
"pathHashes": ["AA", "BB"],
"extraType": 244,
"extraData": "D" * 64,
}
},
},
):
subscriber._handle_mqtt_message(
topic=f"meshcore/BOS/{'a' * 64}/packets",
pattern="meshcore/BOS/+/packets",
payload={
"packet_type": "8",
"hash": "99887766",
"raw": "ABCDEF",
},
)
packet_handler.assert_not_called()
path_handler.assert_called_once()
_public_key, event_type, payload, _db = path_handler.call_args.args
assert event_type == "path_updated"
assert payload["hop_count"] == 2
assert payload["path_hashes"] == ["AA", "BB"]
assert payload["extra_type"] == 244
assert payload["node_public_key"] == "D" * 64
def test_letsmesh_packet_fallback_logs_decoded_payload(
self, mock_mqtt_client, db_manager
) -> None:
"""Non-mapped packets include decoder output in letsmesh_packet payload."""
"""Unmapped packets include decoder output in letsmesh_packet payload."""
mock_mqtt_client.topic_builder.parse_letsmesh_upload_topic.return_value = (
"a" * 64,
"packets",
@@ -565,12 +681,11 @@ class TestSubscriber:
subscriber.start()
decoded_packet = {
"payloadType": 8,
"payloadType": 10,
"payload": {
"decoded": {
"type": 8,
"type": 10,
"isValid": True,
"pathHashes": ["AA", "BB", "CC"],
}
},
}
@@ -583,7 +698,7 @@ class TestSubscriber:
topic=f"meshcore/BOS/{'a' * 64}/packets",
pattern="meshcore/BOS/+/packets",
payload={
"packet_type": "8",
"packet_type": "10",
"hash": "99887766",
"raw": "ABCDEF",
},
@@ -592,7 +707,7 @@ class TestSubscriber:
packet_handler.assert_called_once()
_public_key, event_type, payload, _db = packet_handler.call_args.args
assert event_type == "letsmesh_packet"
assert payload["decoded_payload_type"] == 8
assert payload["decoded_payload_type"] == 10
assert payload["decoded_packet"] == decoded_packet
def test_letsmesh_packet_sender_fallback_from_payload_fields(