mirror of
https://github.com/l5yth/potato-mesh.git
synced 2026-05-07 22:04:53 +02:00
ac36db19a7
* Floor override frequencies to MHz integers * Handle stub protobuf nodeinfo dictionaries * Handle zero override frequency defaults * Add defensive serialization coverage * Stabilize nodeinfo user role normalization test
881 lines
29 KiB
Python
881 lines
29 KiB
Python
# Copyright © 2025-26 l5yth & contributors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Mesh interface discovery helpers for interacting with Meshtastic hardware."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import glob
|
|
import importlib
|
|
import ipaddress
|
|
import math
|
|
import re
|
|
import sys
|
|
import urllib.parse
|
|
from collections.abc import Mapping
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
try: # pragma: no cover - dependency optional in tests
|
|
import meshtastic # type: ignore
|
|
except Exception: # pragma: no cover - dependency optional in tests
|
|
meshtastic = None # type: ignore[assignment]
|
|
|
|
from . import channels, config, serialization
|
|
|
|
|
|
def _ensure_mapping(value) -> Mapping | None:
|
|
"""Return ``value`` as a mapping when conversion is possible."""
|
|
|
|
if isinstance(value, Mapping):
|
|
return value
|
|
if hasattr(value, "__dict__") and isinstance(value.__dict__, Mapping):
|
|
return value.__dict__
|
|
with contextlib.suppress(Exception):
|
|
converted = serialization._node_to_dict(value)
|
|
if isinstance(converted, Mapping):
|
|
return converted
|
|
return None
|
|
|
|
|
|
def _is_nodeish_identifier(value: Any) -> bool:
|
|
"""Return ``True`` when ``value`` resembles a Meshtastic node identifier."""
|
|
|
|
if isinstance(value, (int, float)):
|
|
return False
|
|
if not isinstance(value, str):
|
|
return False
|
|
|
|
trimmed = value.strip()
|
|
if not trimmed:
|
|
return False
|
|
if trimmed.startswith("^"):
|
|
return True
|
|
if trimmed.startswith("!"):
|
|
trimmed = trimmed[1:]
|
|
elif trimmed.lower().startswith("0x"):
|
|
trimmed = trimmed[2:]
|
|
elif not re.search(r"[a-fA-F]", trimmed):
|
|
# Bare decimal strings should not be treated as node ids when labelled "id".
|
|
return False
|
|
|
|
return bool(re.fullmatch(r"[0-9a-fA-F]{1,8}", trimmed))
|
|
|
|
|
|
def _candidate_node_id(mapping: Mapping | None) -> str | None:
|
|
"""Extract a canonical node identifier from ``mapping`` when present."""
|
|
|
|
if mapping is None:
|
|
return None
|
|
|
|
node_keys = (
|
|
"fromId",
|
|
"from_id",
|
|
"from",
|
|
"nodeId",
|
|
"node_id",
|
|
"nodeNum",
|
|
"node_num",
|
|
"num",
|
|
"userId",
|
|
"user_id",
|
|
)
|
|
|
|
for key in node_keys:
|
|
with contextlib.suppress(Exception):
|
|
node_id = serialization._canonical_node_id(mapping.get(key))
|
|
if node_id:
|
|
return node_id
|
|
|
|
with contextlib.suppress(Exception):
|
|
value = mapping.get("id")
|
|
if _is_nodeish_identifier(value):
|
|
node_id = serialization._canonical_node_id(value)
|
|
if node_id:
|
|
return node_id
|
|
|
|
user_section = _ensure_mapping(mapping.get("user"))
|
|
if user_section is not None:
|
|
for key in ("userId", "user_id", "num", "nodeNum", "node_num"):
|
|
with contextlib.suppress(Exception):
|
|
node_id = serialization._canonical_node_id(user_section.get(key))
|
|
if node_id:
|
|
return node_id
|
|
with contextlib.suppress(Exception):
|
|
user_id_value = user_section.get("id")
|
|
if _is_nodeish_identifier(user_id_value):
|
|
node_id = serialization._canonical_node_id(user_id_value)
|
|
if node_id:
|
|
return node_id
|
|
|
|
decoded_section = _ensure_mapping(mapping.get("decoded"))
|
|
if decoded_section is not None:
|
|
node_id = _candidate_node_id(decoded_section)
|
|
if node_id:
|
|
return node_id
|
|
|
|
payload_section = _ensure_mapping(mapping.get("payload"))
|
|
if payload_section is not None:
|
|
node_id = _candidate_node_id(payload_section)
|
|
if node_id:
|
|
return node_id
|
|
|
|
for key in ("packet", "meta", "info"):
|
|
node_id = _candidate_node_id(_ensure_mapping(mapping.get(key)))
|
|
if node_id:
|
|
return node_id
|
|
|
|
for value in mapping.values():
|
|
if isinstance(value, (list, tuple)):
|
|
for item in value:
|
|
node_id = _candidate_node_id(_ensure_mapping(item))
|
|
if node_id:
|
|
return node_id
|
|
else:
|
|
node_id = _candidate_node_id(_ensure_mapping(value))
|
|
if node_id:
|
|
return node_id
|
|
|
|
return None
|
|
|
|
|
|
def _extract_host_node_id(iface) -> str | None:
|
|
"""Return the canonical node identifier for the connected host device."""
|
|
|
|
if iface is None:
|
|
return None
|
|
|
|
def _as_mapping(candidate) -> Mapping | None:
|
|
mapping = _ensure_mapping(candidate)
|
|
if mapping is not None:
|
|
return mapping
|
|
if callable(candidate):
|
|
with contextlib.suppress(Exception):
|
|
return _ensure_mapping(candidate())
|
|
return None
|
|
|
|
candidates: list[Mapping] = []
|
|
for attr in ("myInfo", "my_node_info", "myNodeInfo", "my_node", "localNode"):
|
|
mapping = _as_mapping(getattr(iface, attr, None))
|
|
if mapping is None:
|
|
continue
|
|
candidates.append(mapping)
|
|
nested_info = _ensure_mapping(mapping.get("info"))
|
|
if nested_info:
|
|
candidates.append(nested_info)
|
|
|
|
for mapping in candidates:
|
|
node_id = _candidate_node_id(mapping)
|
|
if node_id:
|
|
return node_id
|
|
for key in ("myNodeNum", "my_node_num", "myNodeId", "my_node_id"):
|
|
node_id = serialization._canonical_node_id(mapping.get(key))
|
|
if node_id:
|
|
return node_id
|
|
|
|
node_id = serialization._canonical_node_id(getattr(iface, "myNodeNum", None))
|
|
if node_id:
|
|
return node_id
|
|
|
|
return None
|
|
|
|
|
|
def _normalise_nodeinfo_packet(packet) -> dict | None:
|
|
"""Return a dictionary view of ``packet`` with a guaranteed ``id`` when known."""
|
|
|
|
mapping = _ensure_mapping(packet)
|
|
if mapping is None:
|
|
return None
|
|
|
|
try:
|
|
normalised: dict = dict(mapping)
|
|
except Exception:
|
|
try:
|
|
normalised = {key: mapping[key] for key in mapping}
|
|
except Exception:
|
|
return None
|
|
|
|
node_id = _candidate_node_id(normalised)
|
|
if node_id and normalised.get("id") != node_id:
|
|
normalised["id"] = node_id
|
|
|
|
return normalised
|
|
|
|
|
|
if TYPE_CHECKING: # pragma: no cover - import only used for type checking
|
|
from meshtastic.ble_interface import BLEInterface as _BLEInterface
|
|
|
|
BLEInterface = None
|
|
|
|
|
|
def _patch_meshtastic_nodeinfo_handler() -> None:
|
|
"""Ensure Meshtastic nodeinfo packets always include an ``id`` field."""
|
|
|
|
module = sys.modules.get("meshtastic", meshtastic)
|
|
if module is None:
|
|
with contextlib.suppress(Exception):
|
|
module = importlib.import_module("meshtastic")
|
|
if module is None:
|
|
return
|
|
globals()["meshtastic"] = module
|
|
|
|
original = getattr(module, "_onNodeInfoReceive", None)
|
|
if not callable(original):
|
|
return
|
|
|
|
mesh_interface_module = getattr(module, "mesh_interface", None)
|
|
if mesh_interface_module is None:
|
|
with contextlib.suppress(Exception):
|
|
mesh_interface_module = importlib.import_module("meshtastic.mesh_interface")
|
|
|
|
if not getattr(original, "_potato_mesh_safe_wrapper", False):
|
|
module._onNodeInfoReceive = _build_safe_nodeinfo_callback(original)
|
|
|
|
_patch_nodeinfo_handler_class(mesh_interface_module, module)
|
|
|
|
|
|
def _build_safe_nodeinfo_callback(original):
|
|
"""Return a wrapper that injects a missing ``id`` before dispatching."""
|
|
|
|
def _safe_on_node_info_receive(iface, packet): # type: ignore[override]
|
|
normalised = _normalise_nodeinfo_packet(packet)
|
|
if normalised is not None:
|
|
packet = normalised
|
|
|
|
try:
|
|
return original(iface, packet)
|
|
except KeyError as exc: # pragma: no cover - defensive only
|
|
if exc.args and exc.args[0] == "id":
|
|
return None
|
|
raise
|
|
|
|
_safe_on_node_info_receive._potato_mesh_safe_wrapper = True # type: ignore[attr-defined]
|
|
return _safe_on_node_info_receive
|
|
|
|
|
|
def _update_nodeinfo_handler_aliases(original, replacement) -> None:
|
|
"""Ensure Meshtastic modules reference the patched ``NodeInfoHandler``."""
|
|
|
|
for module_name, module in list(sys.modules.items()):
|
|
if not module_name.startswith("meshtastic"):
|
|
continue
|
|
existing = getattr(module, "NodeInfoHandler", None)
|
|
if existing is original:
|
|
setattr(module, "NodeInfoHandler", replacement)
|
|
|
|
|
|
def _patch_nodeinfo_handler_class(
|
|
mesh_interface_module, meshtastic_module=None
|
|
) -> None:
|
|
"""Wrap ``NodeInfoHandler.onReceive`` to normalise packets before callbacks."""
|
|
|
|
if mesh_interface_module is None:
|
|
return
|
|
|
|
handler_class = getattr(mesh_interface_module, "NodeInfoHandler", None)
|
|
if handler_class is None:
|
|
return
|
|
if getattr(handler_class, "_potato_mesh_safe_wrapper", False):
|
|
return
|
|
|
|
original_on_receive = getattr(handler_class, "onReceive", None)
|
|
if not callable(original_on_receive):
|
|
return
|
|
|
|
class _SafeNodeInfoHandler(handler_class): # type: ignore[misc]
|
|
"""Subclass that guards against missing node identifiers."""
|
|
|
|
def onReceive(self, iface, packet): # type: ignore[override]
|
|
normalised = _normalise_nodeinfo_packet(packet)
|
|
if normalised is not None:
|
|
packet = normalised
|
|
|
|
try:
|
|
return super().onReceive(iface, packet)
|
|
except KeyError as exc: # pragma: no cover - defensive only
|
|
if exc.args and exc.args[0] == "id":
|
|
return None
|
|
raise
|
|
|
|
_SafeNodeInfoHandler.__name__ = handler_class.__name__
|
|
_SafeNodeInfoHandler.__qualname__ = getattr(
|
|
handler_class, "__qualname__", handler_class.__name__
|
|
)
|
|
_SafeNodeInfoHandler.__module__ = getattr(
|
|
handler_class, "__module__", mesh_interface_module.__name__
|
|
)
|
|
_SafeNodeInfoHandler.__doc__ = getattr(
|
|
handler_class, "__doc__", _SafeNodeInfoHandler.__doc__
|
|
)
|
|
_SafeNodeInfoHandler._potato_mesh_safe_wrapper = True # type: ignore[attr-defined]
|
|
|
|
setattr(mesh_interface_module, "NodeInfoHandler", _SafeNodeInfoHandler)
|
|
if meshtastic_module is None:
|
|
meshtastic_module = globals().get("meshtastic")
|
|
if meshtastic_module is not None:
|
|
existing_top = getattr(meshtastic_module, "NodeInfoHandler", None)
|
|
if existing_top is handler_class:
|
|
setattr(meshtastic_module, "NodeInfoHandler", _SafeNodeInfoHandler)
|
|
_update_nodeinfo_handler_aliases(handler_class, _SafeNodeInfoHandler)
|
|
|
|
|
|
_patch_meshtastic_nodeinfo_handler()
|
|
|
|
|
|
try: # pragma: no cover - optional dependency may be unavailable
|
|
from meshtastic.serial_interface import SerialInterface # type: ignore
|
|
except Exception: # pragma: no cover - optional dependency may be unavailable
|
|
SerialInterface = None # type: ignore[assignment]
|
|
|
|
try: # pragma: no cover - optional dependency may be unavailable
|
|
from meshtastic.tcp_interface import TCPInterface # type: ignore
|
|
except Exception: # pragma: no cover - optional dependency may be unavailable
|
|
TCPInterface = None # type: ignore[assignment]
|
|
|
|
|
|
def _patch_meshtastic_ble_receive_loop() -> None:
|
|
"""Prevent ``UnboundLocalError`` crashes in Meshtastic's BLE reader."""
|
|
|
|
try:
|
|
from meshtastic import ble_interface as _ble_interface_module # type: ignore
|
|
except Exception: # pragma: no cover - dependency optional in tests
|
|
return
|
|
|
|
ble_class = getattr(_ble_interface_module, "BLEInterface", None)
|
|
if ble_class is None:
|
|
return
|
|
|
|
original = getattr(ble_class, "_receiveFromRadioImpl", None)
|
|
if not callable(original):
|
|
return
|
|
if getattr(original, "_potato_mesh_safe_wrapper", False):
|
|
return
|
|
|
|
FROMRADIO_UUID = getattr(_ble_interface_module, "FROMRADIO_UUID", None)
|
|
BleakDBusError = getattr(_ble_interface_module, "BleakDBusError", ())
|
|
BleakError = getattr(_ble_interface_module, "BleakError", ())
|
|
logger = getattr(_ble_interface_module, "logger", None)
|
|
time = getattr(_ble_interface_module, "time", None)
|
|
|
|
if not FROMRADIO_UUID or logger is None or time is None:
|
|
return
|
|
|
|
def _safe_receive_from_radio(self): # type: ignore[override]
|
|
while self._want_receive:
|
|
if self.should_read:
|
|
self.should_read = False
|
|
retries: int = 0
|
|
while self._want_receive:
|
|
if self.client is None:
|
|
logger.debug("BLE client is None, shutting down")
|
|
self._want_receive = False
|
|
continue
|
|
|
|
payload: bytes = b""
|
|
try:
|
|
payload = bytes(self.client.read_gatt_char(FROMRADIO_UUID))
|
|
except BleakDBusError as exc:
|
|
logger.debug("Device disconnected, shutting down %s", exc)
|
|
self._want_receive = False
|
|
payload = b""
|
|
except BleakError as exc:
|
|
if "Not connected" in str(exc):
|
|
logger.debug("Device disconnected, shutting down %s", exc)
|
|
self._want_receive = False
|
|
payload = b""
|
|
else:
|
|
raise ble_class.BLEError("Error reading BLE") from exc
|
|
|
|
if not payload:
|
|
if not self._want_receive:
|
|
break
|
|
if retries < 5:
|
|
time.sleep(0.1)
|
|
retries += 1
|
|
continue
|
|
break
|
|
|
|
logger.debug("FROMRADIO read: %s", payload.hex())
|
|
self._handleFromRadio(payload)
|
|
else:
|
|
time.sleep(0.01)
|
|
|
|
_safe_receive_from_radio._potato_mesh_safe_wrapper = True # type: ignore[attr-defined]
|
|
ble_class._receiveFromRadioImpl = _safe_receive_from_radio
|
|
|
|
|
|
_patch_meshtastic_ble_receive_loop()
|
|
|
|
|
|
def _has_field(message: Any, field_name: str) -> bool:
|
|
"""Return ``True`` when ``message`` advertises ``field_name`` via ``HasField``."""
|
|
|
|
if message is None:
|
|
return False
|
|
has_field = getattr(message, "HasField", None)
|
|
if callable(has_field):
|
|
try:
|
|
return bool(has_field(field_name))
|
|
except Exception: # pragma: no cover - defensive guard
|
|
return False
|
|
return hasattr(message, field_name)
|
|
|
|
|
|
def _enum_name_from_field(message: Any, field_name: str, value: Any) -> str | None:
|
|
"""Return the enum name for ``value`` using ``message`` descriptors."""
|
|
|
|
descriptor = getattr(message, "DESCRIPTOR", None)
|
|
if descriptor is None:
|
|
return None
|
|
fields_by_name = getattr(descriptor, "fields_by_name", {})
|
|
field_desc = fields_by_name.get(field_name)
|
|
if field_desc is None:
|
|
return None
|
|
enum_type = getattr(field_desc, "enum_type", None)
|
|
if enum_type is None:
|
|
return None
|
|
enum_values = getattr(enum_type, "values_by_number", {})
|
|
enum_value = enum_values.get(value)
|
|
if enum_value is None:
|
|
return None
|
|
return getattr(enum_value, "name", None)
|
|
|
|
|
|
def _resolve_lora_message(local_config: Any) -> Any | None:
|
|
"""Return the LoRa configuration sub-message from ``local_config``."""
|
|
|
|
if local_config is None:
|
|
return None
|
|
if _has_field(local_config, "lora"):
|
|
candidate = getattr(local_config, "lora", None)
|
|
if candidate is not None:
|
|
return candidate
|
|
radio_section = getattr(local_config, "radio", None)
|
|
if radio_section is not None:
|
|
if _has_field(radio_section, "lora"):
|
|
return getattr(radio_section, "lora", None)
|
|
if hasattr(radio_section, "lora"):
|
|
return getattr(radio_section, "lora")
|
|
if hasattr(local_config, "lora"):
|
|
return getattr(local_config, "lora")
|
|
return None
|
|
|
|
|
|
def _region_frequency(lora_message: Any) -> int | float | str | None:
|
|
"""Derive the LoRa region frequency in MHz or the region label from ``lora_message``.
|
|
|
|
Numeric override values are floored to the nearest MHz to align with the
|
|
integer frequencies expected elsewhere in the ingestion pipeline.
|
|
"""
|
|
|
|
if lora_message is None:
|
|
return None
|
|
|
|
override_frequency = getattr(lora_message, "override_frequency", None)
|
|
if override_frequency is not None:
|
|
if isinstance(override_frequency, (int, float)):
|
|
if override_frequency > 0:
|
|
return math.floor(override_frequency)
|
|
elif override_frequency:
|
|
return override_frequency
|
|
|
|
region_value = getattr(lora_message, "region", None)
|
|
if region_value is None:
|
|
return None
|
|
enum_name = _enum_name_from_field(lora_message, "region", region_value)
|
|
if enum_name:
|
|
digits = re.findall(r"\d+", enum_name)
|
|
for token in digits:
|
|
try:
|
|
freq = int(token)
|
|
except ValueError: # pragma: no cover - regex guarantees digits
|
|
continue
|
|
if freq >= 100:
|
|
return freq
|
|
for token in reversed(digits):
|
|
try:
|
|
return int(token)
|
|
except ValueError: # pragma: no cover - defensive only
|
|
continue
|
|
return enum_name
|
|
if isinstance(region_value, int) and region_value >= 100:
|
|
return region_value
|
|
if isinstance(region_value, str) and region_value:
|
|
return region_value
|
|
return None
|
|
|
|
|
|
def _camelcase_enum_name(name: str | None) -> str | None:
|
|
"""Convert ``name`` from ``SCREAMING_SNAKE`` to ``CamelCase``."""
|
|
|
|
if not name:
|
|
return None
|
|
parts = re.split(r"[^0-9A-Za-z]+", name.strip())
|
|
camel_parts = [part.capitalize() for part in parts if part]
|
|
if not camel_parts:
|
|
return None
|
|
return "".join(camel_parts)
|
|
|
|
|
|
def _modem_preset(lora_message: Any) -> str | None:
|
|
"""Return the CamelCase modem preset configured on ``lora_message``."""
|
|
|
|
if lora_message is None:
|
|
return None
|
|
descriptor = getattr(lora_message, "DESCRIPTOR", None)
|
|
fields_by_name = getattr(descriptor, "fields_by_name", {}) if descriptor else {}
|
|
if "modem_preset" in fields_by_name:
|
|
preset_field = "modem_preset"
|
|
elif "preset" in fields_by_name:
|
|
preset_field = "preset"
|
|
elif hasattr(lora_message, "modem_preset"):
|
|
preset_field = "modem_preset"
|
|
elif hasattr(lora_message, "preset"):
|
|
preset_field = "preset"
|
|
else:
|
|
return None
|
|
|
|
preset_value = getattr(lora_message, preset_field, None)
|
|
if preset_value is None:
|
|
return None
|
|
enum_name = _enum_name_from_field(lora_message, preset_field, preset_value)
|
|
if isinstance(enum_name, str) and enum_name:
|
|
return _camelcase_enum_name(enum_name)
|
|
if isinstance(preset_value, str) and preset_value:
|
|
return _camelcase_enum_name(preset_value)
|
|
return None
|
|
|
|
|
|
def _ensure_radio_metadata(iface: Any) -> None:
|
|
"""Populate cached LoRa metadata by inspecting ``iface`` when available."""
|
|
|
|
if iface is None:
|
|
return
|
|
|
|
try:
|
|
wait_for_config = getattr(iface, "waitForConfig", None)
|
|
if callable(wait_for_config):
|
|
wait_for_config()
|
|
except Exception: # pragma: no cover - hardware dependent guard
|
|
pass
|
|
|
|
local_node = getattr(iface, "localNode", None)
|
|
local_config = getattr(local_node, "localConfig", None) if local_node else None
|
|
lora_message = _resolve_lora_message(local_config)
|
|
if lora_message is None:
|
|
return
|
|
|
|
frequency = _region_frequency(lora_message)
|
|
preset = _modem_preset(lora_message)
|
|
|
|
updated = False
|
|
if frequency is not None and getattr(config, "LORA_FREQ", None) is None:
|
|
config.LORA_FREQ = frequency
|
|
updated = True
|
|
if preset is not None and getattr(config, "MODEM_PRESET", None) is None:
|
|
config.MODEM_PRESET = preset
|
|
updated = True
|
|
|
|
if updated:
|
|
config._debug_log(
|
|
"Captured LoRa radio metadata",
|
|
context="interfaces.ensure_radio_metadata",
|
|
severity="info",
|
|
always=True,
|
|
lora_freq=frequency,
|
|
modem_preset=preset,
|
|
)
|
|
|
|
|
|
def _ensure_channel_metadata(iface: Any) -> None:
|
|
"""Capture channel metadata by inspecting ``iface`` once per runtime."""
|
|
|
|
if iface is None:
|
|
return
|
|
|
|
try:
|
|
channels.capture_from_interface(iface)
|
|
except Exception as exc: # pragma: no cover - defensive instrumentation
|
|
config._debug_log(
|
|
"Failed to capture channel metadata",
|
|
context="interfaces.ensure_channel_metadata",
|
|
severity="warn",
|
|
error_class=exc.__class__.__name__,
|
|
error_message=str(exc),
|
|
)
|
|
|
|
|
|
_DEFAULT_TCP_PORT = 4403
|
|
_DEFAULT_TCP_TARGET = "http://127.0.0.1"
|
|
|
|
_DEFAULT_SERIAL_PATTERNS = (
|
|
"/dev/ttyACM*",
|
|
"/dev/ttyUSB*",
|
|
"/dev/tty.usbmodem*",
|
|
"/dev/tty.usbserial*",
|
|
"/dev/cu.usbmodem*",
|
|
"/dev/cu.usbserial*",
|
|
)
|
|
|
|
_BLE_ADDRESS_RE = re.compile(r"^(?:[0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}$")
|
|
|
|
|
|
class _DummySerialInterface:
|
|
"""In-memory replacement for ``meshtastic.serial_interface.SerialInterface``."""
|
|
|
|
def __init__(self) -> None:
|
|
self.nodes: dict = {}
|
|
|
|
def close(self) -> None: # pragma: no cover - nothing to close
|
|
pass
|
|
|
|
|
|
def _parse_ble_target(value: str) -> str | None:
|
|
"""Return an uppercase BLE MAC address when ``value`` matches the format.
|
|
|
|
Parameters:
|
|
value: User-provided target string.
|
|
|
|
Returns:
|
|
The normalised MAC address or ``None`` when validation fails.
|
|
"""
|
|
|
|
if not value:
|
|
return None
|
|
value = value.strip()
|
|
if not value:
|
|
return None
|
|
if _BLE_ADDRESS_RE.fullmatch(value):
|
|
return value.upper()
|
|
return None
|
|
|
|
|
|
def _parse_network_target(value: str) -> tuple[str, int] | None:
|
|
"""Return ``(host, port)`` when ``value`` is a numeric IP address string.
|
|
|
|
Only literal IPv4 or IPv6 addresses are accepted, optionally paired with a
|
|
port or scheme. Callers that start from hostnames should resolve them to an
|
|
address before invoking this helper.
|
|
|
|
Parameters:
|
|
value: Numeric IP literal or URL describing the TCP interface.
|
|
|
|
Returns:
|
|
A ``(host, port)`` tuple or ``None`` when parsing fails.
|
|
"""
|
|
|
|
if not value:
|
|
return None
|
|
|
|
value = value.strip()
|
|
if not value:
|
|
return None
|
|
|
|
def _validated_result(host: str | None, port: int | None) -> tuple[str, int] | None:
|
|
if not host:
|
|
return None
|
|
try:
|
|
ipaddress.ip_address(host)
|
|
except ValueError:
|
|
return None
|
|
return host, port or _DEFAULT_TCP_PORT
|
|
|
|
parsed_values = []
|
|
if "://" in value:
|
|
parsed_values.append(urllib.parse.urlparse(value, scheme="tcp"))
|
|
parsed_values.append(urllib.parse.urlparse(f"//{value}", scheme="tcp"))
|
|
|
|
for parsed in parsed_values:
|
|
try:
|
|
port = parsed.port
|
|
except ValueError:
|
|
port = None
|
|
result = _validated_result(parsed.hostname, port)
|
|
if result:
|
|
return result
|
|
|
|
if value.count(":") == 1 and not value.startswith("["):
|
|
host, _, port_text = value.partition(":")
|
|
try:
|
|
port = int(port_text) if port_text else None
|
|
except ValueError:
|
|
port = None
|
|
result = _validated_result(host, port)
|
|
if result:
|
|
return result
|
|
|
|
return _validated_result(value, None)
|
|
|
|
|
|
def _load_ble_interface():
|
|
"""Return :class:`meshtastic.ble_interface.BLEInterface` when available.
|
|
|
|
Returns:
|
|
The resolved BLE interface class.
|
|
|
|
Raises:
|
|
RuntimeError: If the BLE dependencies are not installed.
|
|
"""
|
|
|
|
global BLEInterface
|
|
if BLEInterface is not None:
|
|
return BLEInterface
|
|
|
|
try:
|
|
from meshtastic.ble_interface import BLEInterface as _resolved_interface
|
|
except ImportError as exc: # pragma: no cover - exercised in non-BLE envs
|
|
raise RuntimeError(
|
|
"BLE interface requested but the Meshtastic BLE dependencies are not installed. "
|
|
"Install the 'meshtastic[ble]' extra to enable BLE support."
|
|
) from exc
|
|
BLEInterface = _resolved_interface
|
|
try:
|
|
import sys
|
|
|
|
for module_name in ("data.mesh_ingestor", "data.mesh"):
|
|
mesh_module = sys.modules.get(module_name)
|
|
if mesh_module is not None:
|
|
setattr(mesh_module, "BLEInterface", BLEInterface)
|
|
except Exception: # pragma: no cover - defensive only
|
|
pass
|
|
return _resolved_interface
|
|
|
|
|
|
def _create_serial_interface(port: str) -> tuple[object, str]:
|
|
"""Return an appropriate mesh interface for ``port``.
|
|
|
|
Parameters:
|
|
port: User-supplied port string which may represent serial, BLE or TCP.
|
|
|
|
Returns:
|
|
``(interface, resolved_target)`` describing the created interface.
|
|
"""
|
|
|
|
port_value = (port or "").strip()
|
|
if port_value.lower() in {"", "mock", "none", "null", "disabled"}:
|
|
config._debug_log(
|
|
"Using dummy serial interface",
|
|
context="interfaces.serial",
|
|
port=port_value,
|
|
)
|
|
return _DummySerialInterface(), "mock"
|
|
ble_target = _parse_ble_target(port_value)
|
|
if ble_target:
|
|
config._debug_log(
|
|
"Using BLE interface",
|
|
context="interfaces.ble",
|
|
address=ble_target,
|
|
)
|
|
return _load_ble_interface()(address=ble_target), ble_target
|
|
network_target = _parse_network_target(port_value)
|
|
if network_target:
|
|
host, tcp_port = network_target
|
|
config._debug_log(
|
|
"Using TCP interface",
|
|
context="interfaces.tcp",
|
|
host=host,
|
|
port=tcp_port,
|
|
)
|
|
return (
|
|
TCPInterface(hostname=host, portNumber=tcp_port),
|
|
f"tcp://{host}:{tcp_port}",
|
|
)
|
|
config._debug_log(
|
|
"Using serial interface",
|
|
context="interfaces.serial",
|
|
port=port_value,
|
|
)
|
|
return SerialInterface(devPath=port_value), port_value
|
|
|
|
|
|
class NoAvailableMeshInterface(RuntimeError):
|
|
"""Raised when no default mesh interface can be created."""
|
|
|
|
|
|
def _default_serial_targets() -> list[str]:
|
|
"""Return candidate serial device paths for auto-discovery."""
|
|
|
|
candidates: list[str] = []
|
|
seen: set[str] = set()
|
|
for pattern in _DEFAULT_SERIAL_PATTERNS:
|
|
for path in sorted(glob.glob(pattern)):
|
|
if path not in seen:
|
|
candidates.append(path)
|
|
seen.add(path)
|
|
if "/dev/ttyACM0" not in seen:
|
|
candidates.append("/dev/ttyACM0")
|
|
return candidates
|
|
|
|
|
|
def _create_default_interface() -> tuple[object, str]:
|
|
"""Attempt to create the default mesh interface, raising on failure.
|
|
|
|
Returns:
|
|
``(interface, resolved_target)`` for the discovered connection.
|
|
|
|
Raises:
|
|
NoAvailableMeshInterface: When no usable connection can be created.
|
|
"""
|
|
|
|
errors: list[tuple[str, Exception]] = []
|
|
for candidate in _default_serial_targets():
|
|
try:
|
|
return _create_serial_interface(candidate)
|
|
except Exception as exc: # pragma: no cover - hardware dependent
|
|
errors.append((candidate, exc))
|
|
config._debug_log(
|
|
"Failed to open serial candidate",
|
|
context="interfaces.auto_discovery",
|
|
target=candidate,
|
|
error_class=exc.__class__.__name__,
|
|
error_message=str(exc),
|
|
)
|
|
try:
|
|
return _create_serial_interface(_DEFAULT_TCP_TARGET)
|
|
except Exception as exc: # pragma: no cover - network dependent
|
|
errors.append((_DEFAULT_TCP_TARGET, exc))
|
|
config._debug_log(
|
|
"Failed to open TCP fallback",
|
|
context="interfaces.auto_discovery",
|
|
target=_DEFAULT_TCP_TARGET,
|
|
error_class=exc.__class__.__name__,
|
|
error_message=str(exc),
|
|
)
|
|
if errors:
|
|
summary = "; ".join(f"{target}: {error}" for target, error in errors)
|
|
raise NoAvailableMeshInterface(
|
|
f"no mesh interface available ({summary})"
|
|
) from errors[-1][1]
|
|
raise NoAvailableMeshInterface("no mesh interface available")
|
|
|
|
|
|
__all__ = [
|
|
"BLEInterface",
|
|
"NoAvailableMeshInterface",
|
|
"_ensure_channel_metadata",
|
|
"_ensure_radio_metadata",
|
|
"_extract_host_node_id",
|
|
"_DummySerialInterface",
|
|
"_DEFAULT_TCP_PORT",
|
|
"_DEFAULT_TCP_TARGET",
|
|
"_create_default_interface",
|
|
"_create_serial_interface",
|
|
"_default_serial_targets",
|
|
"_load_ble_interface",
|
|
"_parse_ble_target",
|
|
"_parse_network_target",
|
|
"SerialInterface",
|
|
"TCPInterface",
|
|
]
|