Merge pull request #231 from pyMC-dev/pr-227

Pr 227
This commit is contained in:
Lloyd
2026-05-06 16:13:30 +01:00
committed by GitHub
8 changed files with 674 additions and 111 deletions
+30 -14
View File
@@ -396,7 +396,10 @@ mqtt_brokers:
# use_jwt_auth: true|false # Does this endpoint require JWT auth. Mutually Exclusive with Username & Password fields
# username: "" # Username for basic auth. If empty or missing, uses anonymous access
# password: "" # Password for basic auth. Required if username is set
# format: letsmesh|mqtt
# format: meshcoretomqtt|letsmesh|waev|mqtt
# meshcoretomqtt - canonical open-source MC2MQTT topic structure
# letsmesh, waev - MC2MQTT family flavors (same topic structure, network-specific identity)
# mqtt - legacy pyMC_Repeater local-broker convention (custom topic, singular 'packet')
# retain_status: true|false # Sets MQTT "retain" on status messages so they remain on the broker when disconnected. Also enforces a QOS of 1 (guaranteed delivery)
# tls:
# enabled: true|false # Enable TLS. If the endpoint's certificate is self-signed, the Root CA should be added to the OS's certificate store.
@@ -419,21 +422,34 @@ mqtt_brokers:
# - TRACE # Don't publish trace packets
# - RAW_CUSTOM # Don't publish custom raw packets
# Example of using the US and EU LetsMesh endpoints
# Bundled network presets (recommended). Endpoints ship with the package,
# so URL/audience updates arrive via `pip install -U`. Available presets:
# waev, letsmesh.
#
# brokers:
# - name: US West (LetsMesh v1)
# host: mqtt-us-v1.letsmesh.net
# port: 443
# audience: mqtt-us-v1.letsmesh.net
# use_jwt_auth: true
# - preset: waev
# - preset: letsmesh
#
# Override a preset broker by listing it again with the same name AFTER
# the preset entry. Later entries win on name collision.
#
# brokers:
# - preset: waev
# - name: waev-b
# enabled: false
#
# Mix presets with fully custom brokers in the same list:
#
# brokers:
# - preset: waev
# - name: my-lan-mqtt
# enabled: true
# - name: Europe (LetsMesh v1)
# host: mqtt-eu-v1.letsmesh.net
# port: 443
# audience: mqtt-eu-v1.letsmesh.net
# use_jwt_auth: true
# enabled: true
# host: mqtt.lan
# port: 1883
# transport: tcp
# format: mqtt
# username: repeater
# password: secret
# pyMC_Glass control-plane integration (optional)
glass:
+1
View File
@@ -75,6 +75,7 @@ repeater = [
"web/html/assets/**/*",
"web/*.yaml",
"web/*.html",
"presets/*.yaml",
]
[tool.black]
+258 -97
View File
@@ -5,7 +5,7 @@ import logging
import string
import threading
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional
import paho.mqtt.client as mqtt
from nacl.signing import SigningKey
@@ -18,6 +18,7 @@ except Exception:
UTC = timezone.utc
from repeater import __version__, config
from repeater.presets import get_preset
# Try to import paho-mqtt error code mappings
try:
@@ -25,6 +26,7 @@ try:
HAS_REASON_CODES = True
except ImportError:
ReasonCode = None
HAS_REASON_CODES = False
logger = logging.getLogger("MQTTHandler")
@@ -36,30 +38,118 @@ logger = logging.getLogger("MQTTHandler")
def b64url(x: bytes) -> str:
return base64.urlsafe_b64encode(x).rstrip(b"=").decode()
LETSMESH_BROKERS = [
{
"name": "Europe (LetsMesh v1)",
"host": "mqtt-eu-v1.letsmesh.net",
"port": 443,
"audience": "mqtt-eu-v1.letsmesh.net",
"use_jwt_auth": True,
"tls": {
"enabled": True,
"insecure": False,
},
},
{
"name": "US West (LetsMesh v1)",
"host": "mqtt-us-v1.letsmesh.net",
"port": 443,
"audience": "mqtt-us-v1.letsmesh.net",
"use_jwt_auth": True,
"tls": {
"enabled": True,
"insecure": False,
},
},
]
def _summarize_payload_for_log(payload: Any, message: Optional[str] = None) -> str:
"""Return a compact single-line payload summary for debug logging."""
if message is None:
try:
message = json.dumps(payload)
except Exception:
message = str(payload)
if isinstance(payload, list):
item_bits = []
if payload and isinstance(payload[0], dict):
item_bits.append(_summarize_payload_for_log(payload[0]))
return f"items={len(payload)}, bytes={len(message.encode('utf-8'))}" + (
f", first={{ {item_bits[0]} }}" if item_bits else ""
)
if not isinstance(payload, dict):
return f"type={type(payload).__name__}, bytes={len(message.encode('utf-8'))}"
summary_fields = []
for key in ("type", "packet_type", "route", "origin", "status", "hash", "RSSI", "SNR"):
value = payload.get(key)
if value not in (None, ""):
summary_fields.append(f"{key}={value}")
if "len" in payload:
summary_fields.append(f"len={payload['len']}")
if "payload_len" in payload:
summary_fields.append(f"payload_len={payload['payload_len']}")
if "raw" in payload:
summary_fields.append(f"raw_bytes={len(str(payload['raw'])) // 2}")
if not summary_fields:
keys = ",".join(sorted(payload.keys())[:6])
if len(payload) > 6:
keys += ",..."
summary_fields.append(f"keys={keys}")
summary_fields.append(f"bytes={len(message.encode('utf-8'))}")
return ", ".join(summary_fields)
def _truncate_middle(text: str, prefix: int = 24, suffix: int = 16) -> str:
"""Truncate long strings by keeping the beginning and end visible."""
if len(text) <= prefix + suffix + 5:
return text
return f"{text[:prefix]} ... {text[-suffix:]}"
# --------------------------------------------------------------------
# Format family
# --------------------------------------------------------------------
# Format values that belong to the MeshCoreToMQTT (MC2MQTT) protocol family.
# All MC2MQTT brokers share the topic structure: meshcore/{IATA}/{PUBLIC_KEY}/...
# Adding a new MC2MQTT-compatible platform = add its format value here AND
# drop a matching presets/<name>.yaml. The legacy "mqtt" format is NOT in this
# tuple - it speaks the operator-defined custom topic structure instead.
MC2MQTT_FORMATS = ("meshcoretomqtt", "letsmesh", "waev")
# --------------------------------------------------------------------
# Preset expansion helpers (used during broker-list assembly below).
#
# A user's brokers: list may contain a mix of:
# * preset references: {preset: <name>} -> expands to N broker dicts
# * full broker dicts: {name, host, port, ...} -> used as-is
#
# Expansion is two passes so the rules are obvious and order-independent
# in spirit, but order-dependent in practice (later wins on name collision):
# Pass 1 - replace every {preset: ...} entry in place with the bundled
# broker list. Unknown preset is dropped with a warning.
# Pass 2 - walk left-to-right; if an entry's name already appeared
# earlier, shallow-merge the LATER entry onto the EARLIER one
# and drop the duplicate. Place override entries AFTER preset
# entries to make them win.
# --------------------------------------------------------------------
def _expand_preset_entries(brokers: List[dict]) -> List[dict]:
"""Pass 1: replace every {preset: <name>} entry with the preset's brokers."""
expanded: List[dict] = []
for entry in brokers:
if isinstance(entry, dict) and "preset" in entry and "name" not in entry:
preset_name = entry["preset"]
preset = get_preset(preset_name)
preset_brokers = preset.get("brokers", []) if preset else []
if not preset_brokers:
logger.warning(f"Unknown or empty broker preset '{preset_name}' - skipping")
continue
logger.info(f"Expanded preset '{preset_name}' into {len(preset_brokers)} broker(s)")
expanded.extend(preset_brokers)
else:
expanded.append(entry)
return expanded
def _merge_overrides_by_name(brokers: List[dict]) -> List[dict]:
"""Pass 2: collapse duplicates by name; the LATER entry's fields win."""
by_index: Dict[str, int] = {}
result: List[dict] = []
for entry in brokers:
if not isinstance(entry, dict):
result.append(entry)
continue
name = entry.get("name")
if name and name in by_index:
# Shallow-merge: later entry's keys overwrite earlier entry's keys.
result[by_index[name]] = {**result[by_index[name]], **entry}
else:
if name:
by_index[name] = len(result)
result.append(entry)
return result
# ====================================================================
@@ -101,8 +191,10 @@ class _BrokerConnection:
self._reconnect_attempts = 0
self._reconnect_timer = None
self._max_reconnect_delay = 300 # 5 minutes max
self._keepalive = broker.get("keepalive", 30) # default tighter than paho's 60s to beat NAT/proxy timeouts
self._jwt_refresh_timer = None
self._shutdown_requested = False
self._last_jwt_claims = None
self.transport = broker.get('transport', 'websockets')
self.use_jwt_auth = broker.get('use_jwt_auth', False)
@@ -117,6 +209,8 @@ class _BrokerConnection:
client_id = f"meshcore_{self.public_key}_{broker['host']}_{self.format}"
self.client = mqtt.Client(client_id=client_id, transport=self.transport)
if hasattr(self.client, "on_pre_connect"):
self.client.on_pre_connect = self._on_pre_connect
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
@@ -130,11 +224,13 @@ class _BrokerConnection:
if self.base_topic is None:
if self.format == "mqtt":
# Custom MQTT family: operator-defined topic, not meshcore convention.
self.base_topic = f"meshcore/repeater/{self.node_name}"
elif self.format == "letsmesh":
elif self.format in MC2MQTT_FORMATS:
# MC2MQTT family: canonical meshcoretomqtt topic structure.
self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}"
else:
logger.warning(f"Unknown broker format '{self.format}' for {self.broker['name']}, using default base topic")
logger.warning(f"Unknown broker format '{self.format}' for {self.broker['name']}, defaulting to MC2MQTT topic structure")
self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}"
from pymc_core.protocol.utils import PAYLOAD_TYPES
@@ -170,6 +266,13 @@ class _BrokerConnection:
payload["email"] = ""
payload["owner"] = ""
self._last_jwt_claims = {
"aud": payload.get("aud"),
"iat": payload.get("iat"),
"exp": payload.get("exp"),
"public_key_suffix": self.public_key[-12:],
}
# Encode header and payload (compact JSON - no spaces)
header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode())
payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode())
@@ -198,6 +301,10 @@ class _BrokerConnection:
logger.info(f"Connected to {self.broker['name']}")
self._running = True
self._reconnect_attempts = 0 # Reset counter on success
# Successful connect can race with a previously scheduled timer; cancel it.
if self._reconnect_timer:
self._reconnect_timer.cancel()
self._reconnect_timer = None
if self.use_jwt_auth:
self._schedule_jwt_refresh() # Schedule proactive JWT refresh
if self._on_connect_callback:
@@ -205,7 +312,14 @@ class _BrokerConnection:
else:
error_msg = get_mqtt_error_message(rc, is_disconnect=False)
logger.error(f"Failed to connect to {self.broker['name']}: {error_msg}")
self._schedule_reconnect()
self._schedule_reconnect(reason=error_msg)
def _on_pre_connect(self, client, userdata):
"""Refresh credentials before each connect/reconnect attempt."""
if self._shutdown_requested:
return
if self.use_jwt_auth:
self._set_credentials()
def _on_disconnect(self, client, userdata, rc):
"""MQTT disconnection callback"""
@@ -220,7 +334,13 @@ class _BrokerConnection:
if rc != 0: # Unexpected disconnect
error_msg = get_mqtt_error_message(rc, is_disconnect=True)
logger.warning(f"Disconnected from {self.broker['name']} (rc={rc}): {error_msg}")
if was_running:
logger.warning(f"Disconnected from {self.broker['name']} (rc={rc}): {error_msg}")
else:
logger.debug(
f"Duplicate disconnect callback from {self.broker['name']} while already disconnected "
f"(rc={rc}): {error_msg}"
)
if was_running: # Only reconnect if we were intentionally connected
self._schedule_reconnect(reason=error_msg)
else:
@@ -253,6 +373,13 @@ class _BrokerConnection:
if self._shutdown_requested:
return
# Timer has fired; clear handle so state reflects reality.
self._reconnect_timer = None
if self._running:
logger.debug(f"Skipping reconnect to {self.broker['name']} - already connected")
return
try:
logger.info(f"Attempting reconnection to {self.broker['name']} (reason: {reason})...")
@@ -265,7 +392,7 @@ class _BrokerConnection:
self._set_credentials()
# Reconnect and restart loop
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
self.client.connect(self.broker["host"], self.broker["port"], keepalive=self._keepalive)
self.client.loop_start()
self._loop_running = True
except Exception as e:
@@ -280,9 +407,10 @@ class _BrokerConnection:
token = self._generate_jwt()
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
logger.debug(f"Credentials set for {self.broker['name']}")
logger.debug(f"Using username: {username}")
logger.debug(f"Public key: {self.public_key[:16]}...{self.public_key[-16:]}")
logger.debug(
f"Credentials set for {self.broker['name']}: "
f"user=v1_{self.public_key[:8]}...{self.public_key[-8:]}"
)
elif self.username and self.password:
logger.info(f"Using provided credentials for {self.broker['name']} (username: {self.username})")
self.client.username_pw_set(username=self.username, password=self.password)
@@ -330,7 +458,7 @@ class _BrokerConnection:
f"({protocol}://{self.broker['host']}:{self.broker['port']}) ..."
)
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
self.client.connect(self.broker["host"], self.broker["port"], keepalive=self._keepalive)
self.client.loop_start()
self._loop_running = True
@@ -363,9 +491,13 @@ class _BrokerConnection:
retain = self.retain_status
qos = 1 if self.retain_status else 0
logger.debug(f"Publishing to topic '{self.base_topic}/{subtopic}' with payload: [{payload}]. Running={self._running}. Retain={retain}, QoS={qos}")
full_topic = f"{self.base_topic}/{subtopic}"
logger.debug(
f"Publishing topic='{_truncate_middle(full_topic)}', bytes={len(payload.encode('utf-8'))}, "
f"running={self._running}, retain={retain}, qos={qos}"
)
if self._running:
result = self.client.publish(f"{self.base_topic}/{subtopic}", payload, retain=retain, qos=qos)
result = self.client.publish(full_topic, payload, retain=retain, qos=qos)
return result
else:
logger.warning(f"Cannot publish to {self.broker['name']} - not connected")
@@ -488,12 +620,38 @@ class MeshCoreToMqttPusher:
imported_letsmesh_configs = self.convert_letsmesh_to_broker_config(letsmesh_config)
brokers.extend(imported_letsmesh_configs)
# Expand any {preset: <name>} entries (from user config OR legacy
# migrators) into concrete broker dicts, then collapse duplicates so
# later overrides win. Result feeds the validation loop unchanged.
brokers = _expand_preset_entries(brokers)
brokers = _merge_overrides_by_name(brokers)
# Known MC2MQTT hostnames that must never use format: mqtt.
# If a user's saved config has the wrong format (common after manual editing
# or an old UI version), auto-correct it so diagnostic topics aren't sent
# to brokers that will reject them and close the connection (rc=16).
_MC2MQTT_HOSTS = {
"letsmesh.net",
"waev.app",
"meshcoretomqtt",
}
self.brokers = []
if brokers:
for broker_config in brokers:
if all(k in broker_config for k in ["name", "host", "port", "enabled"]):
host = broker_config.get("host", "")
fmt = broker_config.get("format", "")
if fmt == "mqtt" and any(mc2 in host for mc2 in _MC2MQTT_HOSTS):
corrected = broker_config.get("name", host)
logger.warning(
f"Broker '{corrected}' has format=mqtt but host '{host}' is a MC2MQTT "
f"endpoint — auto-correcting to format=letsmesh. "
f"Update your config.yaml to silence this warning."
)
broker_config = {**broker_config, "format": "letsmesh"}
self.brokers.append(broker_config)
logger.info(f"Added broker: {broker_config['name']}")
logger.info(f"Added broker: {broker_config['name']} (format={broker_config.get('format', 'unknown')})")
else:
logger.warning(f"Skipping invalid broker config: {broker_config}")
@@ -552,62 +710,47 @@ class MeshCoreToMqttPusher:
}
def convert_letsmesh_to_broker_config(self, letsmesh_cfg: dict) -> List[dict]:
"""Convert LetsMesh config format to internal broker config format"""
brokers = []
"""Migrate the legacy ``letsmesh:`` config block into preset-style entries.
Emits a list that the preset-expansion pass will turn into the same
broker dicts the old hard-coded migrator used to produce. Specifically:
* ``broker_index == -1`` -> both LetsMesh brokers (the preset default).
* ``broker_index == 0`` -> Europe only (US disabled via override).
* ``broker_index == 1`` -> US only (Europe disabled via override).
* ``additional_brokers`` -> appended verbatim after the preset reference.
* ``enabled: false`` -> overrides every preset broker as disabled.
"""
enabled = letsmesh_cfg.get("enabled", False)
idx = letsmesh_cfg.get("broker_index", -1)
idx = letsmesh_cfg.get("broker_index", None)
if idx == 0 or idx == 1:
broker_info = LETSMESH_BROKERS[idx]
logger.info(f"Imported LetsMesh broker from 'letsmesh' config: {broker_info['name']}")
brokers.append({
"enabled": enabled,
"name": broker_info["name"],
"host": broker_info["host"],
"port": broker_info["port"],
"audience": broker_info["audience"],
"use_jwt_auth": True,
"transport": "websockets",
"format": "letsmesh",
"base_topic": None,
"retain_status": False,
"tls": {
"enabled": True,
"insecure": False,
},
})
elif idx < 0:
if idx == -1:
brokers.extend({
"enabled": enabled,
"name": broker_info["name"],
"host": broker_info["host"],
"port": broker_info["port"],
"audience": broker_info["audience"],
"use_jwt_auth": True,
"transport": "websockets",
"format": "letsmesh",
"base_topic": None,
"retain_status": False,
"tls": {
"enabled": True,
"insecure": False,
},
} for broker_info in LETSMESH_BROKERS)
# Resolve preset broker names so we can build accurate "disable" overrides.
preset_brokers = get_preset("letsmesh").get("brokers", [])
eu_name = preset_brokers[0]["name"] if len(preset_brokers) > 0 else "Europe (LetsMesh v1)"
us_name = preset_brokers[1]["name"] if len(preset_brokers) > 1 else "US West (LetsMesh v1)"
additional = letsmesh_cfg.get("additional_brokers", [])
for add_broker in additional:
logger.info(f"Imported additional LetsMesh broker from 'letsmesh' config: {add_broker['name']}")
brokers.append({
entries: List[dict] = [{"preset": "letsmesh"}]
# Honor broker_index by disabling the broker the legacy user did not pick.
if idx == 0:
entries.append({"name": us_name, "enabled": False})
elif idx == 1:
entries.append({"name": eu_name, "enabled": False})
# Honor the legacy enabled flag by overriding every preset broker.
if not enabled:
for b in preset_brokers:
entries.append({"name": b["name"], "enabled": False})
# Append any user-defined additional brokers as full entries.
for add_broker in letsmesh_cfg.get("additional_brokers", []):
logger.info(f"Imported additional LetsMesh broker from 'letsmesh' config: {add_broker.get('name')}")
entries.append({
"enabled": enabled,
"name": add_broker["name"],
"host": add_broker["host"],
"port": add_broker["port"],
"audience": add_broker["audience"],
"use_jwt_auth": True,
"transport": "websockets",
"use_jwt_auth": add_broker.get("use_jwt_auth", True),
"transport": add_broker.get("transport", "websockets"),
"format": "letsmesh",
@@ -616,10 +759,10 @@ class MeshCoreToMqttPusher:
"tls": {
"enabled": add_broker.get("tls", {}).get("enabled", True),
"insecure": add_broker.get("tls", {}).get("insecure", False),
}
},
})
return brokers
return entries
def _on_broker_connected(self, broker_name: str):
"""Callback when a broker connects"""
@@ -787,7 +930,7 @@ class MeshCoreToMqttPusher:
message = json.dumps(payload)
# _BrokerConnection now handles topic prefixing, so we only log the subtopic here
logger.debug(f"Publishing to topic '{subtopic}' with payload: {message}")
logger.debug(f"Publishing topic='{subtopic}', {_summarize_payload_for_log(payload, message)}")
packet_type = payload.get("type")
@@ -802,7 +945,7 @@ class MeshCoreToMqttPusher:
results.append((conn.broker["name"], result))
logger.debug(f"Published to {conn.broker['name']} -- {subtopic}")
elif conn.enabled == False:
results.append((conn.broker["name"], "Skipped due to being disabled")) # Indicate skipped due to format mismatch
results.append((conn.broker["name"], "Skipped due to being disabled"))
if not results:
logger.warning(f"No active broker connections for publishing to {subtopic}")
@@ -811,25 +954,35 @@ class MeshCoreToMqttPusher:
def publish_mqtt(self, payload: dict, subtopic: str, retain: bool = False, qos: int = 0):
"""Publish message to all connected brokers"""
"""Publish message to brokers using the legacy custom-MQTT format only.
This path exists for diagnostic streams (advert, noise_floor, crc_errors)
that were originally only published to ``format: mqtt`` brokers. MC2MQTT
family brokers (letsmesh, waev, meshcoretomqtt) intentionally skip these
topics today.
"""
message = json.dumps(payload)
# _BrokerConnection now handles topic prefixing, so we only log the subtopic here
logger.debug(f"Publishing to topic '{subtopic}' with payload: {message}")
logger.debug(f"Publishing topic='{subtopic}', {_summarize_payload_for_log(payload, message)}")
results = []
with self._lock:
for conn in self.connections:
if conn.enabled and conn.is_connected():
if conn.format != "mqtt":
logger.debug(f"Skipped publishing to {conn.broker['name']} (wrong format)")
results.append((conn.broker["name"], None)) # Indicate skipped due to format mismatch
# Custom-MQTT-only path; MC2MQTT brokers are intentionally skipped here.
logger.debug(
f"Skipped publishing to {conn.broker['name']} "
f"(intentional: publish_mqtt only targets legacy mqtt format; broker format={conn.format})"
)
results.append((conn.broker["name"], None))
continue
result = conn.publish(subtopic, message, retain=retain, qos=qos)
results.append((conn.broker["name"], result))
logger.debug(f"Published to {conn.broker['name']} -- {subtopic}")
logger.debug(f"Published to {conn.broker['name']} (format={conn.format}) -- {subtopic}")
elif conn.enabled == False:
results.append((conn.broker["name"], "Skipped due to being disabled")) # Indicate skipped due to format mismatch
results.append((conn.broker["name"], "Skipped due to being disabled"))
if not results:
logger.warning(f"No active broker connections for publishing to {subtopic}")
@@ -893,7 +1046,7 @@ def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str:
4: "Bad username or password",
5: "Not authorized",
7: "Connection lost / network error",
16: "Connection lost / protocol error",
16: "The connection was lost.",
17: "Client timeout",
# MQTT v5 codes
4: "Disconnect with Will message",
@@ -926,7 +1079,7 @@ def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str:
162: "Wildcard subscriptions not supported",
}
if HAS_REASON_CODES:
if HAS_REASON_CODES and ReasonCode is not None:
try:
reason = ReasonCode(mqtt.CONNACK if not is_disconnect else mqtt.DISCONNECT, identifier=rc)
@@ -938,5 +1091,13 @@ def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str:
if _fallback is None:
logger.debug(f"Could not decode reason code {rc}: {e}")
if is_disconnect:
try:
paho_error = mqtt.error_string(rc)
if paho_error and paho_error != "Unknown error.":
return paho_error
except Exception:
pass
error_dict = disconnect_errors if is_disconnect else connect_errors
return error_dict.get(rc, f"Unknown error code {rc}")
+60
View File
@@ -0,0 +1,60 @@
"""Bundled MQTT broker presets.
Each sibling ``*.yaml`` file in this package defines a named preset - a
ready-to-use list of broker dicts for a known MeshCoreToMQTT (MC2MQTT)
network. Presets ship with the package, so a ``pip install -U`` is enough
to pick up new endpoints without editing user config.
Public API:
get_preset(name) -> dict (the parsed YAML, or {} if unknown)
list_presets() -> sorted list of preset names
The loader is lazy: nothing is read or parsed at import time. The first
call discovers sibling YAML files via ``importlib.resources`` and caches
the parsed dicts for the lifetime of the process.
"""
from __future__ import annotations
import logging
from importlib.resources import files
from pathlib import Path
from typing import Dict, List
import yaml
logger = logging.getLogger("Presets")
# Cache of parsed presets, keyed by name (e.g. "waev"). Populated on first
# call to _load_all(); never cleared.
_CACHE: Dict[str, dict] = {}
_LOADED: bool = False
def _load_all() -> Dict[str, dict]:
"""Discover and parse every bundled ``*.yaml`` file once."""
global _LOADED
if _LOADED:
return _CACHE
for resource in files(__package__).iterdir():
# importlib.resources.Traversable: only consider real files ending in .yaml
name = getattr(resource, "name", "")
if not name.endswith(".yaml"):
continue
try:
with resource.open("r", encoding="utf-8") as f:
_CACHE[Path(name).stem] = yaml.safe_load(f) or {}
except Exception as e: # pragma: no cover - defensive
logger.warning(f"Failed to load preset '{name}': {e}")
_LOADED = True
return _CACHE
def get_preset(name: str) -> dict:
"""Return the parsed preset dict, or ``{}`` if no such preset exists."""
return _load_all().get(name, {})
def list_presets() -> List[str]:
"""Return the sorted list of bundled preset names."""
return sorted(_load_all().keys())
+37
View File
@@ -0,0 +1,37 @@
# LetsMesh MC2MQTT broker preset.
#
# LetsMesh is the operator of the public MeshCore Packet Analyzer
# infrastructure. These brokers speak the MeshCoreToMQTT (MC2MQTT)
# protocol with the "letsmesh" format flavor, which today shares the
# canonical MC2MQTT topic structure (meshcore/{IATA}/{PUBLIC_KEY}/...).
#
# Reference all LetsMesh endpoints with: brokers: [{preset: letsmesh}]
#
# Note: order matters for backward compatibility with the legacy
# letsmesh.broker_index field. Index 0 is Europe, index 1 is US West.
brokers:
- name: "Europe (LetsMesh v1)"
enabled: true
host: mqtt-eu-v1.letsmesh.net
port: 443
transport: "websockets"
audience: "mqtt-eu-v1.letsmesh.net"
use_jwt_auth: true
format: letsmesh
retain_status: false
tls:
enabled: true
insecure: false
- name: "US West (LetsMesh v1)"
enabled: true
host: mqtt-us-v1.letsmesh.net
port: 443
transport: "websockets"
audience: "mqtt-us-v1.letsmesh.net"
use_jwt_auth: true
format: letsmesh
retain_status: false
tls:
enabled: true
insecure: false
+35
View File
@@ -0,0 +1,35 @@
# Waev MC2MQTT broker preset.
#
# Waev is a real-time telemetry, monitoring, and analytics platform for
# MeshCore radio networks. These brokers speak the MeshCoreToMQTT
# (MC2MQTT) protocol with the "waev" format flavor, which today shares
# the canonical MC2MQTT topic structure (meshcore/{IATA}/{PUBLIC_KEY}/...)
# and is reserved for future Waev-specific deviations.
#
# Reference all Waev endpoints with: brokers: [{preset: waev}]
brokers:
- name: "waev-a"
enabled: true
host: mqtt-a.waev.app
port: 443
transport: "websockets"
audience: "mqtt-a.waev.app"
use_jwt_auth: true
format: waev
retain_status: true
tls:
enabled: true
insecure: false
- name: "waev-b"
enabled: true
host: mqtt-b.waev.app
port: 443
transport: "websockets"
audience: "mqtt-b.waev.app"
use_jwt_auth: true
format: waev
retain_status: true
tls:
enabled: true
insecure: false
+7
View File
@@ -1209,6 +1209,13 @@ class APIEndpoints:
for i, b in enumerate(brokers):
if not isinstance(b, dict):
return self._error(f"Broker at index {i} must be an object")
# Bundled preset reference: {preset: <name>}. Pass through
# unchanged - the MQTT handler expands it on next start.
if "preset" in b and "name" not in b:
validated.append({"preset": str(b["preset"]).strip()})
continue
for field in ("name", "host", "port", "format"):
if not b.get(field, ""):
return self._error(f"Broker at index {i} missing required field: {field}")
+246
View File
@@ -0,0 +1,246 @@
"""Tests for the bundled MQTT broker preset system.
Locks the public contract documented in `config.yaml.example` and the
behavior contract in the feat/generalized-mqtt PR.
"""
import logging
import pytest
from repeater.data_acquisition.mqtt_handler import (
MC2MQTT_FORMATS,
MeshCoreToMqttPusher,
_BrokerConnection,
_expand_preset_entries,
_merge_overrides_by_name,
_summarize_payload_for_log,
_truncate_middle,
get_mqtt_error_message,
)
from repeater.presets import get_preset, list_presets
# --------------------------------------------------------------------
# Preset loader contract
# --------------------------------------------------------------------
def test_list_presets_returns_bundled_names():
"""The shipped wheel must contain at least 'waev' and 'letsmesh'."""
names = list_presets()
assert "waev" in names
assert "letsmesh" in names
def test_get_preset_waev_has_two_brokers():
"""Waev preset shape: top-level 'brokers' list with two MC2MQTT entries."""
preset = get_preset("waev")
brokers = preset.get("brokers", [])
assert len(brokers) == 2
for b in brokers:
assert "name" in b
assert "host" in b
assert b.get("format") == "waev"
def test_get_preset_unknown_returns_empty_dict():
"""Unknown preset names resolve to {} - no exception."""
assert get_preset("definitely-not-a-real-preset") == {}
# --------------------------------------------------------------------
# Pass 1: preset expansion
# --------------------------------------------------------------------
def test_expand_preset_entries_inlines_bundled_brokers():
"""A {preset: waev} entry expands to the two Waev broker dicts."""
expanded = _expand_preset_entries([{"preset": "waev"}])
assert len(expanded) == 2
names = [b["name"] for b in expanded]
assert "waev-a" in names
assert "waev-b" in names
def test_expand_preset_entries_drops_unknown_preset_with_warning(caplog):
"""An unknown preset is dropped; the daemon does not crash."""
with caplog.at_level(logging.WARNING, logger="MQTTHandler"):
expanded = _expand_preset_entries([{"preset": "bogus"}])
assert expanded == []
assert any("bogus" in record.message for record in caplog.records)
# --------------------------------------------------------------------
# Pass 2: override-by-name merge
# --------------------------------------------------------------------
def test_merge_overrides_by_name_disables_one_preset_broker():
"""Override AFTER preset wins: documented happy-path."""
pre_expanded = _expand_preset_entries([{"preset": "waev"}])
merged = _merge_overrides_by_name(pre_expanded + [{"name": "waev-b", "enabled": False}])
assert len(merged) == 2
by_name = {b["name"]: b for b in merged}
assert by_name["waev-a"]["enabled"] is True
assert by_name["waev-b"]["enabled"] is False
def test_merge_overrides_by_name_later_wins_documented_rule():
"""Override BEFORE preset is overwritten - locks the documented rule.
The preset-expanded entry comes after the user's override in this case,
so the preset wins and the user's `enabled: False` is silently lost. This
is the published rule ("place override entries AFTER preset entries");
this test exists so a future refactor can't quietly flip it.
"""
user_first = [{"name": "waev-b", "enabled": False}]
pipeline = _merge_overrides_by_name(user_first + _expand_preset_entries([{"preset": "waev"}]))
by_name = {b["name"]: b for b in pipeline}
# Preset wins - waev-b is enabled despite the user trying to disable it earlier.
assert by_name["waev-b"]["enabled"] is True
# --------------------------------------------------------------------
# MC2MQTT family parity in topic resolution
# --------------------------------------------------------------------
def _make_broker_connection(format_value: str) -> _BrokerConnection:
"""Build a minimal _BrokerConnection for topic-structure assertions."""
broker = {
"name": f"test-{format_value}",
"host": "test.example",
"port": 443,
"format": format_value,
"enabled": True,
}
return _BrokerConnection(
broker=broker,
local_identity=object(),
public_key="ABCD" * 16, # 64-char hex stand-in
iata_code="LAX",
jwt_expiry_minutes=10,
email="",
owner="",
broker_index=0,
node_name="testnode",
)
def test_mc2mqtt_formats_share_topic_structure():
"""Every MC2MQTT family member resolves to the canonical topic prefix."""
expected_mc2mqtt = "meshcore/LAX/" + ("ABCD" * 16)
for fmt in MC2MQTT_FORMATS:
conn = _make_broker_connection(fmt)
assert conn.base_topic == expected_mc2mqtt, f"format '{fmt}' should be MC2MQTT family"
# Legacy custom-MQTT format uses a different (operator-defined) prefix.
legacy = _make_broker_connection("mqtt")
assert legacy.base_topic == "meshcore/repeater/testnode"
# --------------------------------------------------------------------
# Legacy `letsmesh:` block migration
# --------------------------------------------------------------------
@pytest.mark.parametrize(
"broker_index, expected_disabled_names",
[
(-1, set()), # both brokers enabled (preset default)
(0, {"US West (LetsMesh v1)"}), # EU only - US disabled
(1, {"Europe (LetsMesh v1)"}), # US only - EU disabled
],
)
def test_legacy_letsmesh_block_migrates_to_preset_for_each_broker_index(
broker_index, expected_disabled_names
):
"""Legacy letsmesh.broker_index produces the same broker set as before.
The new migrator emits {preset: letsmesh} plus disable overrides; running
that through the expansion+merge pipeline must preserve the legacy
enabled/disabled topology.
"""
legacy_cfg = {"enabled": True, "broker_index": broker_index}
# Call the unbound method - it doesn't read instance state.
entries = MeshCoreToMqttPusher.convert_letsmesh_to_broker_config(
MeshCoreToMqttPusher.__new__(MeshCoreToMqttPusher), legacy_cfg
)
expanded = _expand_preset_entries(entries)
merged = _merge_overrides_by_name(expanded)
# Always two LetsMesh brokers come out of the pipeline.
assert len(merged) == 2
by_name = {b["name"]: b for b in merged}
for name, broker in by_name.items():
if name in expected_disabled_names:
assert broker["enabled"] is False, f"{name} should be disabled for index {broker_index}"
else:
assert broker["enabled"] is True, f"{name} should be enabled for index {broker_index}"
def test_disconnect_error_message_uses_paho_legacy_connection_lost_string():
"""Legacy paho disconnect rc=16 should not be mislabeled as a protocol error."""
assert get_mqtt_error_message(16, is_disconnect=True) == "The connection was lost."
def test_disconnect_error_message_preserves_mqtt_v5_reason_codes():
"""Real MQTT v5 disconnect reason codes should still decode to their reason names."""
assert get_mqtt_error_message(130, is_disconnect=True) == "Protocol error (code 130)"
def test_connect_failure_schedules_reconnect_with_actual_error_reason(monkeypatch):
"""Reconnect logs should reflect the connect failure, not the default reason string."""
conn = _make_broker_connection("letsmesh")
captured = {}
def fake_schedule_reconnect(reason="connection lost"):
captured["reason"] = reason
monkeypatch.setattr(conn, "_schedule_reconnect", fake_schedule_reconnect)
conn._on_connect(client=None, userdata=None, flags=None, rc=5)
assert captured["reason"] == "Not authorized (JWT signature/format invalid)"
def test_on_pre_connect_refreshes_jwt_credentials(monkeypatch):
"""JWT credentials should be refreshed on each (re)connect attempt."""
conn = _make_broker_connection("letsmesh")
conn.use_jwt_auth = True
called = {"count": 0}
def fake_set_credentials():
called["count"] += 1
monkeypatch.setattr(conn, "_set_credentials", fake_set_credentials)
conn._on_pre_connect(client=None, userdata=None)
assert called["count"] == 1
def test_payload_summary_omits_full_raw_dump_for_packet_logs():
"""MQTT debug logging should summarize packet payloads instead of dumping JSON blobs."""
payload = {
"type": "PACKET",
"packet_type": "4",
"route": "F",
"origin": "NWTBASE02",
"len": "120",
"payload_len": "115",
"raw": "aa" * 120,
"hash": "DD63C8077B5912FC",
}
summary = _summarize_payload_for_log(payload)
assert "type=PACKET" in summary
assert "route=F" in summary
assert "raw_bytes=120" in summary
assert '"raw"' not in summary
assert "aa" * 20 not in summary
def test_truncate_middle_preserves_topic_prefix_and_suffix():
"""Long MQTT topics should keep both routing context and the final path segment visible."""
topic = "meshcore/BOH/BEEF2F7F8632ADE3461D42D1653A0229310E424C37324A6768071A629DFDAA32/packets"
truncated = _truncate_middle(topic)
assert truncated.startswith("meshcore/BOH/BEEF2F7F863")
assert truncated.endswith("9DFDAA32/packets")
assert " ... " in truncated