diff --git a/config.yaml.example b/config.yaml.example index ee1bcaf..d6b6799 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -396,7 +396,10 @@ mqtt_brokers: # use_jwt_auth: true|false # Does this endpoint require JWT auth. Mutually Exclusive with Username & Password fields # username: "" # Username for basic auth. If empty or missing, uses anonymous access # password: "" # Password for basic auth. Required if username is set - # format: letsmesh|mqtt + # format: meshcoretomqtt|letsmesh|waev|mqtt + # meshcoretomqtt - canonical open-source MC2MQTT topic structure + # letsmesh, waev - MC2MQTT family flavors (same topic structure, network-specific identity) + # mqtt - legacy pyMC_Repeater local-broker convention (custom topic, singular 'packet') # retain_status: true|false # Sets MQTT "retain" on status messages so they remain on the broker when disconnected. Also enforces a QOS of 1 (guaranteed delivery) # tls: # enabled: true|false # Enable TLS. If the endpoint's certificate is self-signed, the Root CA should be added to the OS's certificate store. @@ -419,21 +422,34 @@ mqtt_brokers: # - TRACE # Don't publish trace packets # - RAW_CUSTOM # Don't publish custom raw packets - # Example of using the US and EU LetsMesh endpoints + # Bundled network presets (recommended). Endpoints ship with the package, + # so URL/audience updates arrive via `pip install -U`. Available presets: + # waev, letsmesh. + # # brokers: - # - name: US West (LetsMesh v1) - # host: mqtt-us-v1.letsmesh.net - # port: 443 - # audience: mqtt-us-v1.letsmesh.net - # use_jwt_auth: true + # - preset: waev + # - preset: letsmesh + # + # Override a preset broker by listing it again with the same name AFTER + # the preset entry. Later entries win on name collision. + # + # brokers: + # - preset: waev + # - name: waev-b + # enabled: false + # + # Mix presets with fully custom brokers in the same list: + # + # brokers: + # - preset: waev + # - name: my-lan-mqtt # enabled: true - - # - name: Europe (LetsMesh v1) - # host: mqtt-eu-v1.letsmesh.net - # port: 443 - # audience: mqtt-eu-v1.letsmesh.net - # use_jwt_auth: true - # enabled: true + # host: mqtt.lan + # port: 1883 + # transport: tcp + # format: mqtt + # username: repeater + # password: secret # pyMC_Glass control-plane integration (optional) glass: diff --git a/pyproject.toml b/pyproject.toml index fa8869d..b189948 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,7 @@ repeater = [ "web/html/assets/**/*", "web/*.yaml", "web/*.html", + "presets/*.yaml", ] [tool.black] diff --git a/repeater/data_acquisition/mqtt_handler.py b/repeater/data_acquisition/mqtt_handler.py index e0b0117..c1a78ad 100644 --- a/repeater/data_acquisition/mqtt_handler.py +++ b/repeater/data_acquisition/mqtt_handler.py @@ -5,7 +5,7 @@ import logging import string import threading from datetime import datetime, timedelta -from typing import Callable, Dict, List, Optional +from typing import Any, Callable, Dict, List, Optional import paho.mqtt.client as mqtt from nacl.signing import SigningKey @@ -18,6 +18,7 @@ except Exception: UTC = timezone.utc from repeater import __version__, config +from repeater.presets import get_preset # Try to import paho-mqtt error code mappings try: @@ -25,6 +26,7 @@ try: HAS_REASON_CODES = True except ImportError: + ReasonCode = None HAS_REASON_CODES = False logger = logging.getLogger("MQTTHandler") @@ -36,30 +38,118 @@ logger = logging.getLogger("MQTTHandler") def b64url(x: bytes) -> str: return base64.urlsafe_b64encode(x).rstrip(b"=").decode() -LETSMESH_BROKERS = [ - { - "name": "Europe (LetsMesh v1)", - "host": "mqtt-eu-v1.letsmesh.net", - "port": 443, - "audience": "mqtt-eu-v1.letsmesh.net", - "use_jwt_auth": True, - "tls": { - "enabled": True, - "insecure": False, - }, - }, - { - "name": "US West (LetsMesh v1)", - "host": "mqtt-us-v1.letsmesh.net", - "port": 443, - "audience": "mqtt-us-v1.letsmesh.net", - "use_jwt_auth": True, - "tls": { - "enabled": True, - "insecure": False, - }, - }, -] + +def _summarize_payload_for_log(payload: Any, message: Optional[str] = None) -> str: + """Return a compact single-line payload summary for debug logging.""" + if message is None: + try: + message = json.dumps(payload) + except Exception: + message = str(payload) + + if isinstance(payload, list): + item_bits = [] + if payload and isinstance(payload[0], dict): + item_bits.append(_summarize_payload_for_log(payload[0])) + return f"items={len(payload)}, bytes={len(message.encode('utf-8'))}" + ( + f", first={{ {item_bits[0]} }}" if item_bits else "" + ) + + if not isinstance(payload, dict): + return f"type={type(payload).__name__}, bytes={len(message.encode('utf-8'))}" + + summary_fields = [] + for key in ("type", "packet_type", "route", "origin", "status", "hash", "RSSI", "SNR"): + value = payload.get(key) + if value not in (None, ""): + summary_fields.append(f"{key}={value}") + + if "len" in payload: + summary_fields.append(f"len={payload['len']}") + if "payload_len" in payload: + summary_fields.append(f"payload_len={payload['payload_len']}") + if "raw" in payload: + summary_fields.append(f"raw_bytes={len(str(payload['raw'])) // 2}") + + if not summary_fields: + keys = ",".join(sorted(payload.keys())[:6]) + if len(payload) > 6: + keys += ",..." + summary_fields.append(f"keys={keys}") + + summary_fields.append(f"bytes={len(message.encode('utf-8'))}") + return ", ".join(summary_fields) + + +def _truncate_middle(text: str, prefix: int = 24, suffix: int = 16) -> str: + """Truncate long strings by keeping the beginning and end visible.""" + if len(text) <= prefix + suffix + 5: + return text + return f"{text[:prefix]} ... {text[-suffix:]}" + + +# -------------------------------------------------------------------- +# Format family +# -------------------------------------------------------------------- +# Format values that belong to the MeshCoreToMQTT (MC2MQTT) protocol family. +# All MC2MQTT brokers share the topic structure: meshcore/{IATA}/{PUBLIC_KEY}/... +# Adding a new MC2MQTT-compatible platform = add its format value here AND +# drop a matching presets/.yaml. The legacy "mqtt" format is NOT in this +# tuple - it speaks the operator-defined custom topic structure instead. +MC2MQTT_FORMATS = ("meshcoretomqtt", "letsmesh", "waev") + + +# -------------------------------------------------------------------- +# Preset expansion helpers (used during broker-list assembly below). +# +# A user's brokers: list may contain a mix of: +# * preset references: {preset: } -> expands to N broker dicts +# * full broker dicts: {name, host, port, ...} -> used as-is +# +# Expansion is two passes so the rules are obvious and order-independent +# in spirit, but order-dependent in practice (later wins on name collision): +# Pass 1 - replace every {preset: ...} entry in place with the bundled +# broker list. Unknown preset is dropped with a warning. +# Pass 2 - walk left-to-right; if an entry's name already appeared +# earlier, shallow-merge the LATER entry onto the EARLIER one +# and drop the duplicate. Place override entries AFTER preset +# entries to make them win. +# -------------------------------------------------------------------- +def _expand_preset_entries(brokers: List[dict]) -> List[dict]: + """Pass 1: replace every {preset: } entry with the preset's brokers.""" + expanded: List[dict] = [] + for entry in brokers: + if isinstance(entry, dict) and "preset" in entry and "name" not in entry: + preset_name = entry["preset"] + preset = get_preset(preset_name) + preset_brokers = preset.get("brokers", []) if preset else [] + if not preset_brokers: + logger.warning(f"Unknown or empty broker preset '{preset_name}' - skipping") + continue + logger.info(f"Expanded preset '{preset_name}' into {len(preset_brokers)} broker(s)") + expanded.extend(preset_brokers) + else: + expanded.append(entry) + return expanded + + +def _merge_overrides_by_name(brokers: List[dict]) -> List[dict]: + """Pass 2: collapse duplicates by name; the LATER entry's fields win.""" + by_index: Dict[str, int] = {} + result: List[dict] = [] + for entry in brokers: + if not isinstance(entry, dict): + result.append(entry) + continue + name = entry.get("name") + if name and name in by_index: + # Shallow-merge: later entry's keys overwrite earlier entry's keys. + result[by_index[name]] = {**result[by_index[name]], **entry} + else: + if name: + by_index[name] = len(result) + result.append(entry) + return result # ==================================================================== @@ -101,8 +191,10 @@ class _BrokerConnection: self._reconnect_attempts = 0 self._reconnect_timer = None self._max_reconnect_delay = 300 # 5 minutes max + self._keepalive = broker.get("keepalive", 30) # default tighter than paho's 60s to beat NAT/proxy timeouts self._jwt_refresh_timer = None self._shutdown_requested = False + self._last_jwt_claims = None self.transport = broker.get('transport', 'websockets') self.use_jwt_auth = broker.get('use_jwt_auth', False) @@ -117,6 +209,8 @@ class _BrokerConnection: client_id = f"meshcore_{self.public_key}_{broker['host']}_{self.format}" self.client = mqtt.Client(client_id=client_id, transport=self.transport) + if hasattr(self.client, "on_pre_connect"): + self.client.on_pre_connect = self._on_pre_connect self.client.on_connect = self._on_connect self.client.on_disconnect = self._on_disconnect @@ -130,11 +224,13 @@ class _BrokerConnection: if self.base_topic is None: if self.format == "mqtt": + # Custom MQTT family: operator-defined topic, not meshcore convention. self.base_topic = f"meshcore/repeater/{self.node_name}" - elif self.format == "letsmesh": + elif self.format in MC2MQTT_FORMATS: + # MC2MQTT family: canonical meshcoretomqtt topic structure. self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}" else: - logger.warning(f"Unknown broker format '{self.format}' for {self.broker['name']}, using default base topic") + logger.warning(f"Unknown broker format '{self.format}' for {self.broker['name']}, defaulting to MC2MQTT topic structure") self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}" from pymc_core.protocol.utils import PAYLOAD_TYPES @@ -170,6 +266,13 @@ class _BrokerConnection: payload["email"] = "" payload["owner"] = "" + self._last_jwt_claims = { + "aud": payload.get("aud"), + "iat": payload.get("iat"), + "exp": payload.get("exp"), + "public_key_suffix": self.public_key[-12:], + } + # Encode header and payload (compact JSON - no spaces) header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode()) payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode()) @@ -198,6 +301,10 @@ class _BrokerConnection: logger.info(f"Connected to {self.broker['name']}") self._running = True self._reconnect_attempts = 0 # Reset counter on success + # Successful connect can race with a previously scheduled timer; cancel it. + if self._reconnect_timer: + self._reconnect_timer.cancel() + self._reconnect_timer = None if self.use_jwt_auth: self._schedule_jwt_refresh() # Schedule proactive JWT refresh if self._on_connect_callback: @@ -205,7 +312,14 @@ class _BrokerConnection: else: error_msg = get_mqtt_error_message(rc, is_disconnect=False) logger.error(f"Failed to connect to {self.broker['name']}: {error_msg}") - self._schedule_reconnect() + self._schedule_reconnect(reason=error_msg) + + def _on_pre_connect(self, client, userdata): + """Refresh credentials before each connect/reconnect attempt.""" + if self._shutdown_requested: + return + if self.use_jwt_auth: + self._set_credentials() def _on_disconnect(self, client, userdata, rc): """MQTT disconnection callback""" @@ -220,7 +334,13 @@ class _BrokerConnection: if rc != 0: # Unexpected disconnect error_msg = get_mqtt_error_message(rc, is_disconnect=True) - logger.warning(f"Disconnected from {self.broker['name']} (rc={rc}): {error_msg}") + if was_running: + logger.warning(f"Disconnected from {self.broker['name']} (rc={rc}): {error_msg}") + else: + logger.debug( + f"Duplicate disconnect callback from {self.broker['name']} while already disconnected " + f"(rc={rc}): {error_msg}" + ) if was_running: # Only reconnect if we were intentionally connected self._schedule_reconnect(reason=error_msg) else: @@ -253,6 +373,13 @@ class _BrokerConnection: if self._shutdown_requested: return + # Timer has fired; clear handle so state reflects reality. + self._reconnect_timer = None + + if self._running: + logger.debug(f"Skipping reconnect to {self.broker['name']} - already connected") + return + try: logger.info(f"Attempting reconnection to {self.broker['name']} (reason: {reason})...") @@ -265,7 +392,7 @@ class _BrokerConnection: self._set_credentials() # Reconnect and restart loop - self.client.connect(self.broker["host"], self.broker["port"], keepalive=60) + self.client.connect(self.broker["host"], self.broker["port"], keepalive=self._keepalive) self.client.loop_start() self._loop_running = True except Exception as e: @@ -280,9 +407,10 @@ class _BrokerConnection: token = self._generate_jwt() username = f"v1_{self.public_key}" self.client.username_pw_set(username=username, password=token) - logger.debug(f"Credentials set for {self.broker['name']}") - logger.debug(f"Using username: {username}") - logger.debug(f"Public key: {self.public_key[:16]}...{self.public_key[-16:]}") + logger.debug( + f"Credentials set for {self.broker['name']}: " + f"user=v1_{self.public_key[:8]}...{self.public_key[-8:]}" + ) elif self.username and self.password: logger.info(f"Using provided credentials for {self.broker['name']} (username: {self.username})") self.client.username_pw_set(username=self.username, password=self.password) @@ -330,7 +458,7 @@ class _BrokerConnection: f"({protocol}://{self.broker['host']}:{self.broker['port']}) ..." ) - self.client.connect(self.broker["host"], self.broker["port"], keepalive=60) + self.client.connect(self.broker["host"], self.broker["port"], keepalive=self._keepalive) self.client.loop_start() self._loop_running = True @@ -363,9 +491,13 @@ class _BrokerConnection: retain = self.retain_status qos = 1 if self.retain_status else 0 - logger.debug(f"Publishing to topic '{self.base_topic}/{subtopic}' with payload: [{payload}]. Running={self._running}. Retain={retain}, QoS={qos}") + full_topic = f"{self.base_topic}/{subtopic}" + logger.debug( + f"Publishing topic='{_truncate_middle(full_topic)}', bytes={len(payload.encode('utf-8'))}, " + f"running={self._running}, retain={retain}, qos={qos}" + ) if self._running: - result = self.client.publish(f"{self.base_topic}/{subtopic}", payload, retain=retain, qos=qos) + result = self.client.publish(full_topic, payload, retain=retain, qos=qos) return result else: logger.warning(f"Cannot publish to {self.broker['name']} - not connected") @@ -488,12 +620,38 @@ class MeshCoreToMqttPusher: imported_letsmesh_configs = self.convert_letsmesh_to_broker_config(letsmesh_config) brokers.extend(imported_letsmesh_configs) + # Expand any {preset: } entries (from user config OR legacy + # migrators) into concrete broker dicts, then collapse duplicates so + # later overrides win. Result feeds the validation loop unchanged. + brokers = _expand_preset_entries(brokers) + brokers = _merge_overrides_by_name(brokers) + + # Known MC2MQTT hostnames that must never use format: mqtt. + # If a user's saved config has the wrong format (common after manual editing + # or an old UI version), auto-correct it so diagnostic topics aren't sent + # to brokers that will reject them and close the connection (rc=16). + _MC2MQTT_HOSTS = { + "letsmesh.net", + "waev.app", + "meshcoretomqtt", + } + self.brokers = [] if brokers: for broker_config in brokers: if all(k in broker_config for k in ["name", "host", "port", "enabled"]): + host = broker_config.get("host", "") + fmt = broker_config.get("format", "") + if fmt == "mqtt" and any(mc2 in host for mc2 in _MC2MQTT_HOSTS): + corrected = broker_config.get("name", host) + logger.warning( + f"Broker '{corrected}' has format=mqtt but host '{host}' is a MC2MQTT " + f"endpoint — auto-correcting to format=letsmesh. " + f"Update your config.yaml to silence this warning." + ) + broker_config = {**broker_config, "format": "letsmesh"} self.brokers.append(broker_config) - logger.info(f"Added broker: {broker_config['name']}") + logger.info(f"Added broker: {broker_config['name']} (format={broker_config.get('format', 'unknown')})") else: logger.warning(f"Skipping invalid broker config: {broker_config}") @@ -552,62 +710,47 @@ class MeshCoreToMqttPusher: } def convert_letsmesh_to_broker_config(self, letsmesh_cfg: dict) -> List[dict]: - """Convert LetsMesh config format to internal broker config format""" - - brokers = [] - + """Migrate the legacy ``letsmesh:`` config block into preset-style entries. + + Emits a list that the preset-expansion pass will turn into the same + broker dicts the old hard-coded migrator used to produce. Specifically: + + * ``broker_index == -1`` -> both LetsMesh brokers (the preset default). + * ``broker_index == 0`` -> Europe only (US disabled via override). + * ``broker_index == 1`` -> US only (Europe disabled via override). + * ``additional_brokers`` -> appended verbatim after the preset reference. + * ``enabled: false`` -> overrides every preset broker as disabled. + """ enabled = letsmesh_cfg.get("enabled", False) + idx = letsmesh_cfg.get("broker_index", -1) - idx = letsmesh_cfg.get("broker_index", None) - if idx == 0 or idx == 1: - broker_info = LETSMESH_BROKERS[idx] - logger.info(f"Imported LetsMesh broker from 'letsmesh' config: {broker_info['name']}") - brokers.append({ - "enabled": enabled, - "name": broker_info["name"], - "host": broker_info["host"], - "port": broker_info["port"], - "audience": broker_info["audience"], - "use_jwt_auth": True, - "transport": "websockets", - "format": "letsmesh", - "base_topic": None, - "retain_status": False, - "tls": { - "enabled": True, - "insecure": False, - }, - }) - elif idx < 0: - if idx == -1: - brokers.extend({ - "enabled": enabled, - "name": broker_info["name"], - "host": broker_info["host"], - "port": broker_info["port"], - "audience": broker_info["audience"], - "use_jwt_auth": True, - "transport": "websockets", - "format": "letsmesh", - "base_topic": None, - "retain_status": False, - "tls": { - "enabled": True, - "insecure": False, - }, - } for broker_info in LETSMESH_BROKERS) + # Resolve preset broker names so we can build accurate "disable" overrides. + preset_brokers = get_preset("letsmesh").get("brokers", []) + eu_name = preset_brokers[0]["name"] if len(preset_brokers) > 0 else "Europe (LetsMesh v1)" + us_name = preset_brokers[1]["name"] if len(preset_brokers) > 1 else "US West (LetsMesh v1)" - additional = letsmesh_cfg.get("additional_brokers", []) - for add_broker in additional: - logger.info(f"Imported additional LetsMesh broker from 'letsmesh' config: {add_broker['name']}") - brokers.append({ + entries: List[dict] = [{"preset": "letsmesh"}] + + # Honor broker_index by disabling the broker the legacy user did not pick. + if idx == 0: + entries.append({"name": us_name, "enabled": False}) + elif idx == 1: + entries.append({"name": eu_name, "enabled": False}) + + # Honor the legacy enabled flag by overriding every preset broker. + if not enabled: + for b in preset_brokers: + entries.append({"name": b["name"], "enabled": False}) + + # Append any user-defined additional brokers as full entries. + for add_broker in letsmesh_cfg.get("additional_brokers", []): + logger.info(f"Imported additional LetsMesh broker from 'letsmesh' config: {add_broker.get('name')}") + entries.append({ "enabled": enabled, "name": add_broker["name"], "host": add_broker["host"], "port": add_broker["port"], "audience": add_broker["audience"], - "use_jwt_auth": True, - "transport": "websockets", "use_jwt_auth": add_broker.get("use_jwt_auth", True), "transport": add_broker.get("transport", "websockets"), "format": "letsmesh", @@ -616,10 +759,10 @@ class MeshCoreToMqttPusher: "tls": { "enabled": add_broker.get("tls", {}).get("enabled", True), "insecure": add_broker.get("tls", {}).get("insecure", False), - } + }, }) - return brokers + return entries def _on_broker_connected(self, broker_name: str): """Callback when a broker connects""" @@ -787,7 +930,7 @@ class MeshCoreToMqttPusher: message = json.dumps(payload) # _BrokerConnection now handles topic prefixing, so we only log the subtopic here - logger.debug(f"Publishing to topic '{subtopic}' with payload: {message}") + logger.debug(f"Publishing topic='{subtopic}', {_summarize_payload_for_log(payload, message)}") packet_type = payload.get("type") @@ -802,7 +945,7 @@ class MeshCoreToMqttPusher: results.append((conn.broker["name"], result)) logger.debug(f"Published to {conn.broker['name']} -- {subtopic}") elif conn.enabled == False: - results.append((conn.broker["name"], "Skipped due to being disabled")) # Indicate skipped due to format mismatch + results.append((conn.broker["name"], "Skipped due to being disabled")) if not results: logger.warning(f"No active broker connections for publishing to {subtopic}") @@ -811,25 +954,35 @@ class MeshCoreToMqttPusher: def publish_mqtt(self, payload: dict, subtopic: str, retain: bool = False, qos: int = 0): - """Publish message to all connected brokers""" + """Publish message to brokers using the legacy custom-MQTT format only. + + This path exists for diagnostic streams (advert, noise_floor, crc_errors) + that were originally only published to ``format: mqtt`` brokers. MC2MQTT + family brokers (letsmesh, waev, meshcoretomqtt) intentionally skip these + topics today. + """ message = json.dumps(payload) # _BrokerConnection now handles topic prefixing, so we only log the subtopic here - logger.debug(f"Publishing to topic '{subtopic}' with payload: {message}") + logger.debug(f"Publishing topic='{subtopic}', {_summarize_payload_for_log(payload, message)}") results = [] with self._lock: for conn in self.connections: if conn.enabled and conn.is_connected(): if conn.format != "mqtt": - logger.debug(f"Skipped publishing to {conn.broker['name']} (wrong format)") - results.append((conn.broker["name"], None)) # Indicate skipped due to format mismatch + # Custom-MQTT-only path; MC2MQTT brokers are intentionally skipped here. + logger.debug( + f"Skipped publishing to {conn.broker['name']} " + f"(intentional: publish_mqtt only targets legacy mqtt format; broker format={conn.format})" + ) + results.append((conn.broker["name"], None)) continue result = conn.publish(subtopic, message, retain=retain, qos=qos) results.append((conn.broker["name"], result)) - logger.debug(f"Published to {conn.broker['name']} -- {subtopic}") + logger.debug(f"Published to {conn.broker['name']} (format={conn.format}) -- {subtopic}") elif conn.enabled == False: - results.append((conn.broker["name"], "Skipped due to being disabled")) # Indicate skipped due to format mismatch + results.append((conn.broker["name"], "Skipped due to being disabled")) if not results: logger.warning(f"No active broker connections for publishing to {subtopic}") @@ -893,7 +1046,7 @@ def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str: 4: "Bad username or password", 5: "Not authorized", 7: "Connection lost / network error", - 16: "Connection lost / protocol error", + 16: "The connection was lost.", 17: "Client timeout", # MQTT v5 codes 4: "Disconnect with Will message", @@ -926,7 +1079,7 @@ def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str: 162: "Wildcard subscriptions not supported", } - if HAS_REASON_CODES: + if HAS_REASON_CODES and ReasonCode is not None: try: reason = ReasonCode(mqtt.CONNACK if not is_disconnect else mqtt.DISCONNECT, identifier=rc) @@ -938,5 +1091,13 @@ def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str: if _fallback is None: logger.debug(f"Could not decode reason code {rc}: {e}") + if is_disconnect: + try: + paho_error = mqtt.error_string(rc) + if paho_error and paho_error != "Unknown error.": + return paho_error + except Exception: + pass + error_dict = disconnect_errors if is_disconnect else connect_errors return error_dict.get(rc, f"Unknown error code {rc}") diff --git a/repeater/presets/__init__.py b/repeater/presets/__init__.py new file mode 100644 index 0000000..432c92e --- /dev/null +++ b/repeater/presets/__init__.py @@ -0,0 +1,60 @@ +"""Bundled MQTT broker presets. + +Each sibling ``*.yaml`` file in this package defines a named preset - a +ready-to-use list of broker dicts for a known MeshCoreToMQTT (MC2MQTT) +network. Presets ship with the package, so a ``pip install -U`` is enough +to pick up new endpoints without editing user config. + +Public API: + get_preset(name) -> dict (the parsed YAML, or {} if unknown) + list_presets() -> sorted list of preset names + +The loader is lazy: nothing is read or parsed at import time. The first +call discovers sibling YAML files via ``importlib.resources`` and caches +the parsed dicts for the lifetime of the process. +""" + +from __future__ import annotations + +import logging +from importlib.resources import files +from pathlib import Path +from typing import Dict, List + +import yaml + +logger = logging.getLogger("Presets") + +# Cache of parsed presets, keyed by name (e.g. "waev"). Populated on first +# call to _load_all(); never cleared. +_CACHE: Dict[str, dict] = {} +_LOADED: bool = False + + +def _load_all() -> Dict[str, dict]: + """Discover and parse every bundled ``*.yaml`` file once.""" + global _LOADED + if _LOADED: + return _CACHE + for resource in files(__package__).iterdir(): + # importlib.resources.Traversable: only consider real files ending in .yaml + name = getattr(resource, "name", "") + if not name.endswith(".yaml"): + continue + try: + with resource.open("r", encoding="utf-8") as f: + _CACHE[Path(name).stem] = yaml.safe_load(f) or {} + except Exception as e: # pragma: no cover - defensive + logger.warning(f"Failed to load preset '{name}': {e}") + _LOADED = True + return _CACHE + + +def get_preset(name: str) -> dict: + """Return the parsed preset dict, or ``{}`` if no such preset exists.""" + return _load_all().get(name, {}) + + +def list_presets() -> List[str]: + """Return the sorted list of bundled preset names.""" + return sorted(_load_all().keys()) diff --git a/repeater/presets/letsmesh.yaml b/repeater/presets/letsmesh.yaml new file mode 100644 index 0000000..f35ba8e --- /dev/null +++ b/repeater/presets/letsmesh.yaml @@ -0,0 +1,37 @@ +# LetsMesh MC2MQTT broker preset. +# +# LetsMesh is the operator of the public MeshCore Packet Analyzer +# infrastructure. These brokers speak the MeshCoreToMQTT (MC2MQTT) +# protocol with the "letsmesh" format flavor, which today shares the +# canonical MC2MQTT topic structure (meshcore/{IATA}/{PUBLIC_KEY}/...). +# +# Reference all LetsMesh endpoints with: brokers: [{preset: letsmesh}] +# +# Note: order matters for backward compatibility with the legacy +# letsmesh.broker_index field. Index 0 is Europe, index 1 is US West. +brokers: + - name: "Europe (LetsMesh v1)" + enabled: true + host: mqtt-eu-v1.letsmesh.net + port: 443 + transport: "websockets" + audience: "mqtt-eu-v1.letsmesh.net" + use_jwt_auth: true + format: letsmesh + retain_status: false + tls: + enabled: true + insecure: false + + - name: "US West (LetsMesh v1)" + enabled: true + host: mqtt-us-v1.letsmesh.net + port: 443 + transport: "websockets" + audience: "mqtt-us-v1.letsmesh.net" + use_jwt_auth: true + format: letsmesh + retain_status: false + tls: + enabled: true + insecure: false diff --git a/repeater/presets/waev.yaml b/repeater/presets/waev.yaml new file mode 100644 index 0000000..fe4f7a4 --- /dev/null +++ b/repeater/presets/waev.yaml @@ -0,0 +1,35 @@ +# Waev MC2MQTT broker preset. +# +# Waev is a real-time telemetry, monitoring, and analytics platform for +# MeshCore radio networks. These brokers speak the MeshCoreToMQTT +# (MC2MQTT) protocol with the "waev" format flavor, which today shares +# the canonical MC2MQTT topic structure (meshcore/{IATA}/{PUBLIC_KEY}/...) +# and is reserved for future Waev-specific deviations. +# +# Reference all Waev endpoints with: brokers: [{preset: waev}] +brokers: + - name: "waev-a" + enabled: true + host: mqtt-a.waev.app + port: 443 + transport: "websockets" + audience: "mqtt-a.waev.app" + use_jwt_auth: true + format: waev + retain_status: true + tls: + enabled: true + insecure: false + + - name: "waev-b" + enabled: true + host: mqtt-b.waev.app + port: 443 + transport: "websockets" + audience: "mqtt-b.waev.app" + use_jwt_auth: true + format: waev + retain_status: true + tls: + enabled: true + insecure: false diff --git a/repeater/web/api_endpoints.py b/repeater/web/api_endpoints.py index 397d0f0..974026c 100644 --- a/repeater/web/api_endpoints.py +++ b/repeater/web/api_endpoints.py @@ -1209,6 +1209,13 @@ class APIEndpoints: for i, b in enumerate(brokers): if not isinstance(b, dict): return self._error(f"Broker at index {i} must be an object") + + # Bundled preset reference: {preset: }. Pass through + # unchanged - the MQTT handler expands it on next start. + if "preset" in b and "name" not in b: + validated.append({"preset": str(b["preset"]).strip()}) + continue + for field in ("name", "host", "port", "format"): if not b.get(field, ""): return self._error(f"Broker at index {i} missing required field: {field}") diff --git a/tests/test_presets.py b/tests/test_presets.py new file mode 100644 index 0000000..4253bff --- /dev/null +++ b/tests/test_presets.py @@ -0,0 +1,246 @@ +"""Tests for the bundled MQTT broker preset system. + +Locks the public contract documented in `config.yaml.example` and the +behavior contract in the feat/generalized-mqtt PR. +""" + +import logging + +import pytest + +from repeater.data_acquisition.mqtt_handler import ( + MC2MQTT_FORMATS, + MeshCoreToMqttPusher, + _BrokerConnection, + _expand_preset_entries, + _merge_overrides_by_name, + _summarize_payload_for_log, + _truncate_middle, + get_mqtt_error_message, +) +from repeater.presets import get_preset, list_presets + + +# -------------------------------------------------------------------- +# Preset loader contract +# -------------------------------------------------------------------- +def test_list_presets_returns_bundled_names(): + """The shipped wheel must contain at least 'waev' and 'letsmesh'.""" + names = list_presets() + assert "waev" in names + assert "letsmesh" in names + + +def test_get_preset_waev_has_two_brokers(): + """Waev preset shape: top-level 'brokers' list with two MC2MQTT entries.""" + preset = get_preset("waev") + brokers = preset.get("brokers", []) + assert len(brokers) == 2 + for b in brokers: + assert "name" in b + assert "host" in b + assert b.get("format") == "waev" + + +def test_get_preset_unknown_returns_empty_dict(): + """Unknown preset names resolve to {} - no exception.""" + assert get_preset("definitely-not-a-real-preset") == {} + + +# -------------------------------------------------------------------- +# Pass 1: preset expansion +# -------------------------------------------------------------------- +def test_expand_preset_entries_inlines_bundled_brokers(): + """A {preset: waev} entry expands to the two Waev broker dicts.""" + expanded = _expand_preset_entries([{"preset": "waev"}]) + assert len(expanded) == 2 + names = [b["name"] for b in expanded] + assert "waev-a" in names + assert "waev-b" in names + + +def test_expand_preset_entries_drops_unknown_preset_with_warning(caplog): + """An unknown preset is dropped; the daemon does not crash.""" + with caplog.at_level(logging.WARNING, logger="MQTTHandler"): + expanded = _expand_preset_entries([{"preset": "bogus"}]) + assert expanded == [] + assert any("bogus" in record.message for record in caplog.records) + + +# -------------------------------------------------------------------- +# Pass 2: override-by-name merge +# -------------------------------------------------------------------- +def test_merge_overrides_by_name_disables_one_preset_broker(): + """Override AFTER preset wins: documented happy-path.""" + pre_expanded = _expand_preset_entries([{"preset": "waev"}]) + merged = _merge_overrides_by_name(pre_expanded + [{"name": "waev-b", "enabled": False}]) + assert len(merged) == 2 + by_name = {b["name"]: b for b in merged} + assert by_name["waev-a"]["enabled"] is True + assert by_name["waev-b"]["enabled"] is False + + +def test_merge_overrides_by_name_later_wins_documented_rule(): + """Override BEFORE preset is overwritten - locks the documented rule. + + The preset-expanded entry comes after the user's override in this case, + so the preset wins and the user's `enabled: False` is silently lost. This + is the published rule ("place override entries AFTER preset entries"); + this test exists so a future refactor can't quietly flip it. + """ + user_first = [{"name": "waev-b", "enabled": False}] + pipeline = _merge_overrides_by_name(user_first + _expand_preset_entries([{"preset": "waev"}])) + by_name = {b["name"]: b for b in pipeline} + # Preset wins - waev-b is enabled despite the user trying to disable it earlier. + assert by_name["waev-b"]["enabled"] is True + + +# -------------------------------------------------------------------- +# MC2MQTT family parity in topic resolution +# -------------------------------------------------------------------- +def _make_broker_connection(format_value: str) -> _BrokerConnection: + """Build a minimal _BrokerConnection for topic-structure assertions.""" + broker = { + "name": f"test-{format_value}", + "host": "test.example", + "port": 443, + "format": format_value, + "enabled": True, + } + return _BrokerConnection( + broker=broker, + local_identity=object(), + public_key="ABCD" * 16, # 64-char hex stand-in + iata_code="LAX", + jwt_expiry_minutes=10, + email="", + owner="", + broker_index=0, + node_name="testnode", + ) + + +def test_mc2mqtt_formats_share_topic_structure(): + """Every MC2MQTT family member resolves to the canonical topic prefix.""" + expected_mc2mqtt = "meshcore/LAX/" + ("ABCD" * 16) + for fmt in MC2MQTT_FORMATS: + conn = _make_broker_connection(fmt) + assert conn.base_topic == expected_mc2mqtt, f"format '{fmt}' should be MC2MQTT family" + + # Legacy custom-MQTT format uses a different (operator-defined) prefix. + legacy = _make_broker_connection("mqtt") + assert legacy.base_topic == "meshcore/repeater/testnode" + + +# -------------------------------------------------------------------- +# Legacy `letsmesh:` block migration +# -------------------------------------------------------------------- +@pytest.mark.parametrize( + "broker_index, expected_disabled_names", + [ + (-1, set()), # both brokers enabled (preset default) + (0, {"US West (LetsMesh v1)"}), # EU only - US disabled + (1, {"Europe (LetsMesh v1)"}), # US only - EU disabled + ], +) +def test_legacy_letsmesh_block_migrates_to_preset_for_each_broker_index( + broker_index, expected_disabled_names +): + """Legacy letsmesh.broker_index produces the same broker set as before. + + The new migrator emits {preset: letsmesh} plus disable overrides; running + that through the expansion+merge pipeline must preserve the legacy + enabled/disabled topology. + """ + legacy_cfg = {"enabled": True, "broker_index": broker_index} + # Call the unbound method - it doesn't read instance state. + entries = MeshCoreToMqttPusher.convert_letsmesh_to_broker_config( + MeshCoreToMqttPusher.__new__(MeshCoreToMqttPusher), legacy_cfg + ) + + expanded = _expand_preset_entries(entries) + merged = _merge_overrides_by_name(expanded) + + # Always two LetsMesh brokers come out of the pipeline. + assert len(merged) == 2 + by_name = {b["name"]: b for b in merged} + for name, broker in by_name.items(): + if name in expected_disabled_names: + assert broker["enabled"] is False, f"{name} should be disabled for index {broker_index}" + else: + assert broker["enabled"] is True, f"{name} should be enabled for index {broker_index}" + + +def test_disconnect_error_message_uses_paho_legacy_connection_lost_string(): + """Legacy paho disconnect rc=16 should not be mislabeled as a protocol error.""" + assert get_mqtt_error_message(16, is_disconnect=True) == "The connection was lost." + + +def test_disconnect_error_message_preserves_mqtt_v5_reason_codes(): + """Real MQTT v5 disconnect reason codes should still decode to their reason names.""" + assert get_mqtt_error_message(130, is_disconnect=True) == "Protocol error (code 130)" + + +def test_connect_failure_schedules_reconnect_with_actual_error_reason(monkeypatch): + """Reconnect logs should reflect the connect failure, not the default reason string.""" + conn = _make_broker_connection("letsmesh") + captured = {} + + def fake_schedule_reconnect(reason="connection lost"): + captured["reason"] = reason + + monkeypatch.setattr(conn, "_schedule_reconnect", fake_schedule_reconnect) + + conn._on_connect(client=None, userdata=None, flags=None, rc=5) + + assert captured["reason"] == "Not authorized (JWT signature/format invalid)" + + +def test_on_pre_connect_refreshes_jwt_credentials(monkeypatch): + """JWT credentials should be refreshed on each (re)connect attempt.""" + conn = _make_broker_connection("letsmesh") + conn.use_jwt_auth = True + + called = {"count": 0} + + def fake_set_credentials(): + called["count"] += 1 + + monkeypatch.setattr(conn, "_set_credentials", fake_set_credentials) + + conn._on_pre_connect(client=None, userdata=None) + + assert called["count"] == 1 + + +def test_payload_summary_omits_full_raw_dump_for_packet_logs(): + """MQTT debug logging should summarize packet payloads instead of dumping JSON blobs.""" + payload = { + "type": "PACKET", + "packet_type": "4", + "route": "F", + "origin": "NWTBASE02", + "len": "120", + "payload_len": "115", + "raw": "aa" * 120, + "hash": "DD63C8077B5912FC", + } + + summary = _summarize_payload_for_log(payload) + + assert "type=PACKET" in summary + assert "route=F" in summary + assert "raw_bytes=120" in summary + assert '"raw"' not in summary + assert "aa" * 20 not in summary + + +def test_truncate_middle_preserves_topic_prefix_and_suffix(): + """Long MQTT topics should keep both routing context and the final path segment visible.""" + topic = "meshcore/BOH/BEEF2F7F8632ADE3461D42D1653A0229310E424C37324A6768071A629DFDAA32/packets" + + truncated = _truncate_middle(topic) + + assert truncated.startswith("meshcore/BOH/BEEF2F7F863") + assert truncated.endswith("9DFDAA32/packets") + assert " ... " in truncated