Normalize numeric client roles using Meshtastic CLI enums (#402)

* Normalize firmware client roles using CLI enums

* Prioritize CLI role lookup before protobuf fallbacks
This commit is contained in:
l5y
2025-10-31 11:43:48 +01:00
committed by GitHub
parent d94d75e605
commit 87b4cd79e7
2 changed files with 202 additions and 0 deletions

View File

@@ -22,6 +22,8 @@ from __future__ import annotations
import base64
import dataclasses
import enum
import importlib
import json
import math
import time
@@ -31,6 +33,18 @@ from google.protobuf.json_format import MessageToDict
from google.protobuf.message import DecodeError
from google.protobuf.message import Message as ProtoMessage
_CLI_ROLE_MODULE_NAMES: tuple[str, ...] = (
"meshtastic.cli.common",
"meshtastic.cli.roles",
"meshtastic.cli.enums",
"meshtastic_cli.common",
"meshtastic_cli.roles",
)
"""Possible module paths that may expose the Meshtastic CLI role enum."""
_CLI_ROLE_LOOKUP: dict[int, str] | None = None
"""Cached mapping of CLI role identifiers to their textual names."""
def _get(obj, key, default=None):
"""Return ``obj[key]`` or ``getattr(obj, key)`` when available.
@@ -49,6 +63,96 @@ def _get(obj, key, default=None):
return getattr(obj, key, default)
def _reset_cli_role_cache() -> None:
"""Clear the cached CLI role lookup mapping.
The helper is primarily used by tests to ensure deterministic behaviour
when substituting stub CLI modules.
Returns:
``None``. The next lookup will trigger a fresh import attempt.
"""
global _CLI_ROLE_LOOKUP
_CLI_ROLE_LOOKUP = None
def _load_cli_role_lookup() -> dict[int, str]:
"""Return a mapping of role identifiers from the Meshtastic CLI.
The Meshtastic CLI exposes extended role enums that may include entries
absent from the protobuf definition shipped with the firmware. This
helper lazily imports the CLI module when present and extracts the
available role names so that numeric values received from the firmware can
be normalised into human-friendly strings.
Returns:
Mapping of integer role identifiers to their canonical string names.
"""
global _CLI_ROLE_LOOKUP
if _CLI_ROLE_LOOKUP is not None:
return _CLI_ROLE_LOOKUP
lookup: dict[int, str] = {}
def _from_candidate(candidate) -> dict[int, str]:
mapping: dict[int, str] = {}
if isinstance(candidate, enum.EnumMeta):
for member in candidate: # pragma: no branch - Enum iteration deterministic
try:
mapping[int(member.value)] = str(member.name)
except Exception: # pragma: no cover - defensive guard
continue
return mapping
members = getattr(candidate, "__members__", None)
if isinstance(members, Mapping):
for name, member in members.items():
value = getattr(member, "value", None)
if isinstance(value, (int, enum.IntEnum)):
try:
mapping[int(value)] = str(name)
except Exception: # pragma: no cover - defensive
continue
if mapping:
return mapping
if isinstance(candidate, Mapping):
for key, value in candidate.items():
try:
key_int = int(key)
except Exception: # pragma: no cover - defensive
continue
mapping[key_int] = str(value)
return mapping
for module_name in _CLI_ROLE_MODULE_NAMES:
try:
module = importlib.import_module(module_name)
except Exception: # pragma: no cover - optional dependency
continue
candidates = []
for attr_name in ("Role", "Roles", "ClientRole", "ClientRoles"):
candidate = getattr(module, attr_name, None)
if candidate is not None:
candidates.append(candidate)
for candidate in candidates:
mapping = _from_candidate(candidate)
if not mapping:
continue
lookup.update(mapping)
if lookup:
break
_CLI_ROLE_LOOKUP = {
key: value.strip().upper()
for key, value in lookup.items()
if isinstance(value, str) and value.strip()
}
return _CLI_ROLE_LOOKUP
def _node_to_dict(n) -> dict:
"""Convert ``n`` into a JSON-serialisable mapping.
@@ -99,6 +203,57 @@ def _node_to_dict(n) -> dict:
return _convert(n)
def _normalize_user_role(value) -> str | None:
"""Return a canonical role string for ``value`` when possible.
Parameters:
value: Raw role descriptor emitted by the Meshtastic firmware or
decoded JSON payloads.
Returns:
Uppercase role string or ``None`` if the value cannot be resolved.
"""
if value is None:
return None
if isinstance(value, str):
cleaned = value.strip()
if not cleaned:
return None
return cleaned.upper()
numeric = _coerce_int(value)
if numeric is None:
return None
role_name = None
cli_lookup = _load_cli_role_lookup()
role_name = cli_lookup.get(numeric)
if not role_name:
try: # pragma: no branch - minimal control flow
from meshtastic.protobuf import mesh_pb2
role_name = mesh_pb2.User.Role.Name(numeric)
except Exception: # pragma: no cover - depends on protobuf version
role_name = None
if not role_name:
try:
from meshtastic.protobuf import config_pb2
role_name = config_pb2.Config.DeviceConfig.Role.Name(numeric)
except Exception: # pragma: no cover - depends on protobuf version
role_name = None
if role_name:
return role_name.strip().upper()
return str(numeric)
def upsert_payload(node_id, node) -> dict:
"""Return the payload expected by ``/api/nodes`` upsert requests.
@@ -587,6 +742,11 @@ def _nodeinfo_user_dict(node_info, decoded_user):
if canonical:
user_dict = dict(user_dict)
user_dict["id"] = canonical
role_value = user_dict.get("role")
normalized_role = _normalize_user_role(role_value)
if normalized_role and normalized_role != role_value:
user_dict = dict(user_dict)
user_dict["role"] = normalized_role
return user_dict
@@ -594,6 +754,8 @@ __all__ = [
"_canonical_node_id",
"_coerce_float",
"_coerce_int",
"_load_cli_role_lookup",
"_normalize_user_role",
"_decode_nodeinfo_payload",
"_extract_payload_bytes",
"_first",
@@ -606,6 +768,7 @@ __all__ = [
"_nodeinfo_position_dict",
"_nodeinfo_user_dict",
"_pkt_to_dict",
"_reset_cli_role_cache",
"DecodeError",
"MessageToDict",
"ProtoMessage",

View File

@@ -13,6 +13,7 @@
# limitations under the License.
import base64
import enum
import importlib
import re
import sys
@@ -2191,6 +2192,44 @@ def test_nodeinfo_helpers_cover_fallbacks(mesh_module, monkeypatch):
assert user["id"] == "!11223344"
def test_nodeinfo_user_role_falls_back_to_cli_enum(mesh_module, monkeypatch):
mesh = mesh_module
mesh._reset_cli_role_cache()
cli_module = types.ModuleType("meshtastic.cli")
cli_common = types.ModuleType("meshtastic.cli.common")
class DummyRole(enum.IntEnum):
CLIENT = 0
CLIENT_BASE = 12
cli_common.Role = DummyRole
cli_module.common = cli_common
monkeypatch.setitem(sys.modules, "meshtastic.cli", cli_module)
monkeypatch.setitem(sys.modules, "meshtastic.cli.common", cli_common)
user = mesh._nodeinfo_user_dict(None, {"id": "!11223344", "role": 12})
assert user["role"] == "CLIENT_BASE"
mesh._reset_cli_role_cache()
cli_dict_module = types.ModuleType("meshtastic.cli")
cli_dict_common = types.ModuleType("meshtastic.cli.common")
cli_dict_common.ClientRoles = {12: "client_hidden"}
cli_dict_module.common = cli_dict_common
monkeypatch.setitem(sys.modules, "meshtastic.cli", cli_dict_module)
monkeypatch.setitem(sys.modules, "meshtastic.cli.common", cli_dict_common)
user = mesh._nodeinfo_user_dict(None, {"id": "!11223344", "role": 12})
assert user["role"] == "CLIENT_HIDDEN"
mesh._reset_cli_role_cache()
def test_store_position_packet_defaults(mesh_module, monkeypatch):
mesh = mesh_module
captured = []