diff --git a/data/mesh_ingestor/serialization.py b/data/mesh_ingestor/serialization.py index 4725b56..183932f 100644 --- a/data/mesh_ingestor/serialization.py +++ b/data/mesh_ingestor/serialization.py @@ -22,6 +22,8 @@ from __future__ import annotations import base64 import dataclasses +import enum +import importlib import json import math import time @@ -31,6 +33,18 @@ from google.protobuf.json_format import MessageToDict from google.protobuf.message import DecodeError from google.protobuf.message import Message as ProtoMessage +_CLI_ROLE_MODULE_NAMES: tuple[str, ...] = ( + "meshtastic.cli.common", + "meshtastic.cli.roles", + "meshtastic.cli.enums", + "meshtastic_cli.common", + "meshtastic_cli.roles", +) +"""Possible module paths that may expose the Meshtastic CLI role enum.""" + +_CLI_ROLE_LOOKUP: dict[int, str] | None = None +"""Cached mapping of CLI role identifiers to their textual names.""" + def _get(obj, key, default=None): """Return ``obj[key]`` or ``getattr(obj, key)`` when available. @@ -49,6 +63,96 @@ def _get(obj, key, default=None): return getattr(obj, key, default) +def _reset_cli_role_cache() -> None: + """Clear the cached CLI role lookup mapping. + + The helper is primarily used by tests to ensure deterministic behaviour + when substituting stub CLI modules. + + Returns: + ``None``. The next lookup will trigger a fresh import attempt. + """ + + global _CLI_ROLE_LOOKUP + _CLI_ROLE_LOOKUP = None + + +def _load_cli_role_lookup() -> dict[int, str]: + """Return a mapping of role identifiers from the Meshtastic CLI. + + The Meshtastic CLI exposes extended role enums that may include entries + absent from the protobuf definition shipped with the firmware. This + helper lazily imports the CLI module when present and extracts the + available role names so that numeric values received from the firmware can + be normalised into human-friendly strings. + + Returns: + Mapping of integer role identifiers to their canonical string names. + """ + + global _CLI_ROLE_LOOKUP + if _CLI_ROLE_LOOKUP is not None: + return _CLI_ROLE_LOOKUP + + lookup: dict[int, str] = {} + + def _from_candidate(candidate) -> dict[int, str]: + mapping: dict[int, str] = {} + if isinstance(candidate, enum.EnumMeta): + for member in candidate: # pragma: no branch - Enum iteration deterministic + try: + mapping[int(member.value)] = str(member.name) + except Exception: # pragma: no cover - defensive guard + continue + return mapping + members = getattr(candidate, "__members__", None) + if isinstance(members, Mapping): + for name, member in members.items(): + value = getattr(member, "value", None) + if isinstance(value, (int, enum.IntEnum)): + try: + mapping[int(value)] = str(name) + except Exception: # pragma: no cover - defensive + continue + if mapping: + return mapping + if isinstance(candidate, Mapping): + for key, value in candidate.items(): + try: + key_int = int(key) + except Exception: # pragma: no cover - defensive + continue + mapping[key_int] = str(value) + return mapping + + for module_name in _CLI_ROLE_MODULE_NAMES: + try: + module = importlib.import_module(module_name) + except Exception: # pragma: no cover - optional dependency + continue + + candidates = [] + for attr_name in ("Role", "Roles", "ClientRole", "ClientRoles"): + candidate = getattr(module, attr_name, None) + if candidate is not None: + candidates.append(candidate) + + for candidate in candidates: + mapping = _from_candidate(candidate) + if not mapping: + continue + lookup.update(mapping) + if lookup: + break + + _CLI_ROLE_LOOKUP = { + key: value.strip().upper() + for key, value in lookup.items() + if isinstance(value, str) and value.strip() + } + return _CLI_ROLE_LOOKUP + + def _node_to_dict(n) -> dict: """Convert ``n`` into a JSON-serialisable mapping. @@ -99,6 +203,57 @@ def _node_to_dict(n) -> dict: return _convert(n) +def _normalize_user_role(value) -> str | None: + """Return a canonical role string for ``value`` when possible. + + Parameters: + value: Raw role descriptor emitted by the Meshtastic firmware or + decoded JSON payloads. + + Returns: + Uppercase role string or ``None`` if the value cannot be resolved. + """ + + if value is None: + return None + + if isinstance(value, str): + cleaned = value.strip() + if not cleaned: + return None + return cleaned.upper() + + numeric = _coerce_int(value) + if numeric is None: + return None + + role_name = None + + cli_lookup = _load_cli_role_lookup() + role_name = cli_lookup.get(numeric) + + if not role_name: + try: # pragma: no branch - minimal control flow + from meshtastic.protobuf import mesh_pb2 + + role_name = mesh_pb2.User.Role.Name(numeric) + except Exception: # pragma: no cover - depends on protobuf version + role_name = None + + if not role_name: + try: + from meshtastic.protobuf import config_pb2 + + role_name = config_pb2.Config.DeviceConfig.Role.Name(numeric) + except Exception: # pragma: no cover - depends on protobuf version + role_name = None + + if role_name: + return role_name.strip().upper() + + return str(numeric) + + def upsert_payload(node_id, node) -> dict: """Return the payload expected by ``/api/nodes`` upsert requests. @@ -587,6 +742,11 @@ def _nodeinfo_user_dict(node_info, decoded_user): if canonical: user_dict = dict(user_dict) user_dict["id"] = canonical + role_value = user_dict.get("role") + normalized_role = _normalize_user_role(role_value) + if normalized_role and normalized_role != role_value: + user_dict = dict(user_dict) + user_dict["role"] = normalized_role return user_dict @@ -594,6 +754,8 @@ __all__ = [ "_canonical_node_id", "_coerce_float", "_coerce_int", + "_load_cli_role_lookup", + "_normalize_user_role", "_decode_nodeinfo_payload", "_extract_payload_bytes", "_first", @@ -606,6 +768,7 @@ __all__ = [ "_nodeinfo_position_dict", "_nodeinfo_user_dict", "_pkt_to_dict", + "_reset_cli_role_cache", "DecodeError", "MessageToDict", "ProtoMessage", diff --git a/tests/test_mesh.py b/tests/test_mesh.py index c0b8e99..a51229e 100644 --- a/tests/test_mesh.py +++ b/tests/test_mesh.py @@ -13,6 +13,7 @@ # limitations under the License. import base64 +import enum import importlib import re import sys @@ -2191,6 +2192,44 @@ def test_nodeinfo_helpers_cover_fallbacks(mesh_module, monkeypatch): assert user["id"] == "!11223344" +def test_nodeinfo_user_role_falls_back_to_cli_enum(mesh_module, monkeypatch): + mesh = mesh_module + mesh._reset_cli_role_cache() + + cli_module = types.ModuleType("meshtastic.cli") + cli_common = types.ModuleType("meshtastic.cli.common") + + class DummyRole(enum.IntEnum): + CLIENT = 0 + CLIENT_BASE = 12 + + cli_common.Role = DummyRole + cli_module.common = cli_common + + monkeypatch.setitem(sys.modules, "meshtastic.cli", cli_module) + monkeypatch.setitem(sys.modules, "meshtastic.cli.common", cli_common) + + user = mesh._nodeinfo_user_dict(None, {"id": "!11223344", "role": 12}) + + assert user["role"] == "CLIENT_BASE" + + mesh._reset_cli_role_cache() + + cli_dict_module = types.ModuleType("meshtastic.cli") + cli_dict_common = types.ModuleType("meshtastic.cli.common") + cli_dict_common.ClientRoles = {12: "client_hidden"} + cli_dict_module.common = cli_dict_common + + monkeypatch.setitem(sys.modules, "meshtastic.cli", cli_dict_module) + monkeypatch.setitem(sys.modules, "meshtastic.cli.common", cli_dict_common) + + user = mesh._nodeinfo_user_dict(None, {"id": "!11223344", "role": 12}) + + assert user["role"] == "CLIENT_HIDDEN" + + mesh._reset_cli_role_cache() + + def test_store_position_packet_defaults(mesh_module, monkeypatch): mesh = mesh_module captured = []