diff --git a/config.yaml.example b/config.yaml.example index ee1bcaf..d6b6799 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -396,7 +396,10 @@ mqtt_brokers: # use_jwt_auth: true|false # Does this endpoint require JWT auth. Mutually Exclusive with Username & Password fields # username: "" # Username for basic auth. If empty or missing, uses anonymous access # password: "" # Password for basic auth. Required if username is set - # format: letsmesh|mqtt + # format: meshcoretomqtt|letsmesh|waev|mqtt + # meshcoretomqtt - canonical open-source MC2MQTT topic structure + # letsmesh, waev - MC2MQTT family flavors (same topic structure, network-specific identity) + # mqtt - legacy pyMC_Repeater local-broker convention (custom topic, singular 'packet') # retain_status: true|false # Sets MQTT "retain" on status messages so they remain on the broker when disconnected. Also enforces a QOS of 1 (guaranteed delivery) # tls: # enabled: true|false # Enable TLS. If the endpoint's certificate is self-signed, the Root CA should be added to the OS's certificate store. @@ -419,21 +422,34 @@ mqtt_brokers: # - TRACE # Don't publish trace packets # - RAW_CUSTOM # Don't publish custom raw packets - # Example of using the US and EU LetsMesh endpoints + # Bundled network presets (recommended). Endpoints ship with the package, + # so URL/audience updates arrive via `pip install -U`. Available presets: + # waev, letsmesh. + # # brokers: - # - name: US West (LetsMesh v1) - # host: mqtt-us-v1.letsmesh.net - # port: 443 - # audience: mqtt-us-v1.letsmesh.net - # use_jwt_auth: true + # - preset: waev + # - preset: letsmesh + # + # Override a preset broker by listing it again with the same name AFTER + # the preset entry. Later entries win on name collision. + # + # brokers: + # - preset: waev + # - name: waev-b + # enabled: false + # + # Mix presets with fully custom brokers in the same list: + # + # brokers: + # - preset: waev + # - name: my-lan-mqtt # enabled: true - - # - name: Europe (LetsMesh v1) - # host: mqtt-eu-v1.letsmesh.net - # port: 443 - # audience: mqtt-eu-v1.letsmesh.net - # use_jwt_auth: true - # enabled: true + # host: mqtt.lan + # port: 1883 + # transport: tcp + # format: mqtt + # username: repeater + # password: secret # pyMC_Glass control-plane integration (optional) glass: diff --git a/pyproject.toml b/pyproject.toml index fa8869d..b189948 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,6 +75,7 @@ repeater = [ "web/html/assets/**/*", "web/*.yaml", "web/*.html", + "presets/*.yaml", ] [tool.black] diff --git a/repeater/data_acquisition/mqtt_handler.py b/repeater/data_acquisition/mqtt_handler.py index e0b0117..453d9b1 100644 --- a/repeater/data_acquisition/mqtt_handler.py +++ b/repeater/data_acquisition/mqtt_handler.py @@ -18,6 +18,7 @@ except Exception: UTC = timezone.utc from repeater import __version__, config +from repeater.presets import get_preset # Try to import paho-mqtt error code mappings try: @@ -36,30 +37,69 @@ logger = logging.getLogger("MQTTHandler") def b64url(x: bytes) -> str: return base64.urlsafe_b64encode(x).rstrip(b"=").decode() -LETSMESH_BROKERS = [ - { - "name": "Europe (LetsMesh v1)", - "host": "mqtt-eu-v1.letsmesh.net", - "port": 443, - "audience": "mqtt-eu-v1.letsmesh.net", - "use_jwt_auth": True, - "tls": { - "enabled": True, - "insecure": False, - }, - }, - { - "name": "US West (LetsMesh v1)", - "host": "mqtt-us-v1.letsmesh.net", - "port": 443, - "audience": "mqtt-us-v1.letsmesh.net", - "use_jwt_auth": True, - "tls": { - "enabled": True, - "insecure": False, - }, - }, -] + +# -------------------------------------------------------------------- +# Format family +# -------------------------------------------------------------------- +# Format values that belong to the MeshCoreToMQTT (MC2MQTT) protocol family. +# All MC2MQTT brokers share the topic structure: meshcore/{IATA}/{PUBLIC_KEY}/... +# Adding a new MC2MQTT-compatible platform = add its format value here AND +# drop a matching presets/.yaml. The legacy "mqtt" format is NOT in this +# tuple - it speaks the operator-defined custom topic structure instead. +MC2MQTT_FORMATS = ("meshcoretomqtt", "letsmesh", "waev") + + +# -------------------------------------------------------------------- +# Preset expansion helpers (used during broker-list assembly below). +# +# A user's brokers: list may contain a mix of: +# * preset references: {preset: } -> expands to N broker dicts +# * full broker dicts: {name, host, port, ...} -> used as-is +# +# Expansion is two passes so the rules are obvious and order-independent +# in spirit, but order-dependent in practice (later wins on name collision): +# Pass 1 - replace every {preset: ...} entry in place with the bundled +# broker list. Unknown preset is dropped with a warning. +# Pass 2 - walk left-to-right; if an entry's name already appeared +# earlier, shallow-merge the LATER entry onto the EARLIER one +# and drop the duplicate. Place override entries AFTER preset +# entries to make them win. +# -------------------------------------------------------------------- +def _expand_preset_entries(brokers: List[dict]) -> List[dict]: + """Pass 1: replace every {preset: } entry with the preset's brokers.""" + expanded: List[dict] = [] + for entry in brokers: + if isinstance(entry, dict) and "preset" in entry and "name" not in entry: + preset_name = entry["preset"] + preset = get_preset(preset_name) + preset_brokers = preset.get("brokers", []) if preset else [] + if not preset_brokers: + logger.warning(f"Unknown or empty broker preset '{preset_name}' - skipping") + continue + logger.info(f"Expanded preset '{preset_name}' into {len(preset_brokers)} broker(s)") + expanded.extend(preset_brokers) + else: + expanded.append(entry) + return expanded + + +def _merge_overrides_by_name(brokers: List[dict]) -> List[dict]: + """Pass 2: collapse duplicates by name; the LATER entry's fields win.""" + by_index: Dict[str, int] = {} + result: List[dict] = [] + for entry in brokers: + if not isinstance(entry, dict): + result.append(entry) + continue + name = entry.get("name") + if name and name in by_index: + # Shallow-merge: later entry's keys overwrite earlier entry's keys. + result[by_index[name]] = {**result[by_index[name]], **entry} + else: + if name: + by_index[name] = len(result) + result.append(entry) + return result # ==================================================================== @@ -130,11 +170,13 @@ class _BrokerConnection: if self.base_topic is None: if self.format == "mqtt": + # Custom MQTT family: operator-defined topic, not meshcore convention. self.base_topic = f"meshcore/repeater/{self.node_name}" - elif self.format == "letsmesh": + elif self.format in MC2MQTT_FORMATS: + # MC2MQTT family: canonical meshcoretomqtt topic structure. self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}" else: - logger.warning(f"Unknown broker format '{self.format}' for {self.broker['name']}, using default base topic") + logger.warning(f"Unknown broker format '{self.format}' for {self.broker['name']}, defaulting to MC2MQTT topic structure") self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}" from pymc_core.protocol.utils import PAYLOAD_TYPES @@ -488,6 +530,12 @@ class MeshCoreToMqttPusher: imported_letsmesh_configs = self.convert_letsmesh_to_broker_config(letsmesh_config) brokers.extend(imported_letsmesh_configs) + # Expand any {preset: } entries (from user config OR legacy + # migrators) into concrete broker dicts, then collapse duplicates so + # later overrides win. Result feeds the validation loop unchanged. + brokers = _expand_preset_entries(brokers) + brokers = _merge_overrides_by_name(brokers) + self.brokers = [] if brokers: for broker_config in brokers: @@ -552,62 +600,47 @@ class MeshCoreToMqttPusher: } def convert_letsmesh_to_broker_config(self, letsmesh_cfg: dict) -> List[dict]: - """Convert LetsMesh config format to internal broker config format""" - - brokers = [] - + """Migrate the legacy ``letsmesh:`` config block into preset-style entries. + + Emits a list that the preset-expansion pass will turn into the same + broker dicts the old hard-coded migrator used to produce. Specifically: + + * ``broker_index == -1`` -> both LetsMesh brokers (the preset default). + * ``broker_index == 0`` -> Europe only (US disabled via override). + * ``broker_index == 1`` -> US only (Europe disabled via override). + * ``additional_brokers`` -> appended verbatim after the preset reference. + * ``enabled: false`` -> overrides every preset broker as disabled. + """ enabled = letsmesh_cfg.get("enabled", False) + idx = letsmesh_cfg.get("broker_index", -1) - idx = letsmesh_cfg.get("broker_index", None) - if idx == 0 or idx == 1: - broker_info = LETSMESH_BROKERS[idx] - logger.info(f"Imported LetsMesh broker from 'letsmesh' config: {broker_info['name']}") - brokers.append({ - "enabled": enabled, - "name": broker_info["name"], - "host": broker_info["host"], - "port": broker_info["port"], - "audience": broker_info["audience"], - "use_jwt_auth": True, - "transport": "websockets", - "format": "letsmesh", - "base_topic": None, - "retain_status": False, - "tls": { - "enabled": True, - "insecure": False, - }, - }) - elif idx < 0: - if idx == -1: - brokers.extend({ - "enabled": enabled, - "name": broker_info["name"], - "host": broker_info["host"], - "port": broker_info["port"], - "audience": broker_info["audience"], - "use_jwt_auth": True, - "transport": "websockets", - "format": "letsmesh", - "base_topic": None, - "retain_status": False, - "tls": { - "enabled": True, - "insecure": False, - }, - } for broker_info in LETSMESH_BROKERS) + # Resolve preset broker names so we can build accurate "disable" overrides. + preset_brokers = get_preset("letsmesh").get("brokers", []) + eu_name = preset_brokers[0]["name"] if len(preset_brokers) > 0 else "Europe (LetsMesh v1)" + us_name = preset_brokers[1]["name"] if len(preset_brokers) > 1 else "US West (LetsMesh v1)" - additional = letsmesh_cfg.get("additional_brokers", []) - for add_broker in additional: - logger.info(f"Imported additional LetsMesh broker from 'letsmesh' config: {add_broker['name']}") - brokers.append({ + entries: List[dict] = [{"preset": "letsmesh"}] + + # Honor broker_index by disabling the broker the legacy user did not pick. + if idx == 0: + entries.append({"name": us_name, "enabled": False}) + elif idx == 1: + entries.append({"name": eu_name, "enabled": False}) + + # Honor the legacy enabled flag by overriding every preset broker. + if not enabled: + for b in preset_brokers: + entries.append({"name": b["name"], "enabled": False}) + + # Append any user-defined additional brokers as full entries. + for add_broker in letsmesh_cfg.get("additional_brokers", []): + logger.info(f"Imported additional LetsMesh broker from 'letsmesh' config: {add_broker.get('name')}") + entries.append({ "enabled": enabled, "name": add_broker["name"], "host": add_broker["host"], "port": add_broker["port"], "audience": add_broker["audience"], - "use_jwt_auth": True, - "transport": "websockets", "use_jwt_auth": add_broker.get("use_jwt_auth", True), "transport": add_broker.get("transport", "websockets"), "format": "letsmesh", @@ -616,10 +649,10 @@ class MeshCoreToMqttPusher: "tls": { "enabled": add_broker.get("tls", {}).get("enabled", True), "insecure": add_broker.get("tls", {}).get("insecure", False), - } + }, }) - return brokers + return entries def _on_broker_connected(self, broker_name: str): """Callback when a broker connects""" @@ -811,7 +844,13 @@ class MeshCoreToMqttPusher: def publish_mqtt(self, payload: dict, subtopic: str, retain: bool = False, qos: int = 0): - """Publish message to all connected brokers""" + """Publish message to brokers using the legacy custom-MQTT format only. + + This path exists for diagnostic streams (advert, noise_floor, crc_errors) + that were originally only published to ``format: mqtt`` brokers. MC2MQTT + family brokers (letsmesh, waev, meshcoretomqtt) intentionally skip these + topics today. + """ message = json.dumps(payload) # _BrokerConnection now handles topic prefixing, so we only log the subtopic here @@ -822,6 +861,7 @@ class MeshCoreToMqttPusher: for conn in self.connections: if conn.enabled and conn.is_connected(): if conn.format != "mqtt": + # Custom-MQTT-only path; MC2MQTT brokers are intentionally skipped here. logger.debug(f"Skipped publishing to {conn.broker['name']} (wrong format)") results.append((conn.broker["name"], None)) # Indicate skipped due to format mismatch continue diff --git a/repeater/presets/__init__.py b/repeater/presets/__init__.py new file mode 100644 index 0000000..432c92e --- /dev/null +++ b/repeater/presets/__init__.py @@ -0,0 +1,60 @@ +"""Bundled MQTT broker presets. + +Each sibling ``*.yaml`` file in this package defines a named preset - a +ready-to-use list of broker dicts for a known MeshCoreToMQTT (MC2MQTT) +network. Presets ship with the package, so a ``pip install -U`` is enough +to pick up new endpoints without editing user config. + +Public API: + get_preset(name) -> dict (the parsed YAML, or {} if unknown) + list_presets() -> sorted list of preset names + +The loader is lazy: nothing is read or parsed at import time. The first +call discovers sibling YAML files via ``importlib.resources`` and caches +the parsed dicts for the lifetime of the process. +""" + +from __future__ import annotations + +import logging +from importlib.resources import files +from pathlib import Path +from typing import Dict, List + +import yaml + +logger = logging.getLogger("Presets") + +# Cache of parsed presets, keyed by name (e.g. "waev"). Populated on first +# call to _load_all(); never cleared. +_CACHE: Dict[str, dict] = {} +_LOADED: bool = False + + +def _load_all() -> Dict[str, dict]: + """Discover and parse every bundled ``*.yaml`` file once.""" + global _LOADED + if _LOADED: + return _CACHE + for resource in files(__package__).iterdir(): + # importlib.resources.Traversable: only consider real files ending in .yaml + name = getattr(resource, "name", "") + if not name.endswith(".yaml"): + continue + try: + with resource.open("r", encoding="utf-8") as f: + _CACHE[Path(name).stem] = yaml.safe_load(f) or {} + except Exception as e: # pragma: no cover - defensive + logger.warning(f"Failed to load preset '{name}': {e}") + _LOADED = True + return _CACHE + + +def get_preset(name: str) -> dict: + """Return the parsed preset dict, or ``{}`` if no such preset exists.""" + return _load_all().get(name, {}) + + +def list_presets() -> List[str]: + """Return the sorted list of bundled preset names.""" + return sorted(_load_all().keys()) diff --git a/repeater/presets/letsmesh.yaml b/repeater/presets/letsmesh.yaml new file mode 100644 index 0000000..f35ba8e --- /dev/null +++ b/repeater/presets/letsmesh.yaml @@ -0,0 +1,37 @@ +# LetsMesh MC2MQTT broker preset. +# +# LetsMesh is the operator of the public MeshCore Packet Analyzer +# infrastructure. These brokers speak the MeshCoreToMQTT (MC2MQTT) +# protocol with the "letsmesh" format flavor, which today shares the +# canonical MC2MQTT topic structure (meshcore/{IATA}/{PUBLIC_KEY}/...). +# +# Reference all LetsMesh endpoints with: brokers: [{preset: letsmesh}] +# +# Note: order matters for backward compatibility with the legacy +# letsmesh.broker_index field. Index 0 is Europe, index 1 is US West. +brokers: + - name: "Europe (LetsMesh v1)" + enabled: true + host: mqtt-eu-v1.letsmesh.net + port: 443 + transport: "websockets" + audience: "mqtt-eu-v1.letsmesh.net" + use_jwt_auth: true + format: letsmesh + retain_status: false + tls: + enabled: true + insecure: false + + - name: "US West (LetsMesh v1)" + enabled: true + host: mqtt-us-v1.letsmesh.net + port: 443 + transport: "websockets" + audience: "mqtt-us-v1.letsmesh.net" + use_jwt_auth: true + format: letsmesh + retain_status: false + tls: + enabled: true + insecure: false diff --git a/repeater/presets/waev.yaml b/repeater/presets/waev.yaml new file mode 100644 index 0000000..fe4f7a4 --- /dev/null +++ b/repeater/presets/waev.yaml @@ -0,0 +1,35 @@ +# Waev MC2MQTT broker preset. +# +# Waev is a real-time telemetry, monitoring, and analytics platform for +# MeshCore radio networks. These brokers speak the MeshCoreToMQTT +# (MC2MQTT) protocol with the "waev" format flavor, which today shares +# the canonical MC2MQTT topic structure (meshcore/{IATA}/{PUBLIC_KEY}/...) +# and is reserved for future Waev-specific deviations. +# +# Reference all Waev endpoints with: brokers: [{preset: waev}] +brokers: + - name: "waev-a" + enabled: true + host: mqtt-a.waev.app + port: 443 + transport: "websockets" + audience: "mqtt-a.waev.app" + use_jwt_auth: true + format: waev + retain_status: true + tls: + enabled: true + insecure: false + + - name: "waev-b" + enabled: true + host: mqtt-b.waev.app + port: 443 + transport: "websockets" + audience: "mqtt-b.waev.app" + use_jwt_auth: true + format: waev + retain_status: true + tls: + enabled: true + insecure: false diff --git a/repeater/web/api_endpoints.py b/repeater/web/api_endpoints.py index cbadbb0..851d2f6 100644 --- a/repeater/web/api_endpoints.py +++ b/repeater/web/api_endpoints.py @@ -1224,6 +1224,13 @@ class APIEndpoints: for i, b in enumerate(brokers): if not isinstance(b, dict): return self._error(f"Broker at index {i} must be an object") + + # Bundled preset reference: {preset: }. Pass through + # unchanged - the MQTT handler expands it on next start. + if "preset" in b and "name" not in b: + validated.append({"preset": str(b["preset"]).strip()}) + continue + for field in ("name", "host", "port", "format"): if not b.get(field, ""): return self._error(f"Broker at index {i} missing required field: {field}") diff --git a/tests/test_presets.py b/tests/test_presets.py new file mode 100644 index 0000000..76c7fa9 --- /dev/null +++ b/tests/test_presets.py @@ -0,0 +1,168 @@ +"""Tests for the bundled MQTT broker preset system. + +Locks the public contract documented in `config.yaml.example` and the +behavior contract in the feat/generalized-mqtt PR. +""" + +import logging + +import pytest + +from repeater.data_acquisition.mqtt_handler import ( + MC2MQTT_FORMATS, + MeshCoreToMqttPusher, + _BrokerConnection, + _expand_preset_entries, + _merge_overrides_by_name, +) +from repeater.presets import get_preset, list_presets + + +# -------------------------------------------------------------------- +# Preset loader contract +# -------------------------------------------------------------------- +def test_list_presets_returns_bundled_names(): + """The shipped wheel must contain at least 'waev' and 'letsmesh'.""" + names = list_presets() + assert "waev" in names + assert "letsmesh" in names + + +def test_get_preset_waev_has_two_brokers(): + """Waev preset shape: top-level 'brokers' list with two MC2MQTT entries.""" + preset = get_preset("waev") + brokers = preset.get("brokers", []) + assert len(brokers) == 2 + for b in brokers: + assert "name" in b + assert "host" in b + assert b.get("format") == "waev" + + +def test_get_preset_unknown_returns_empty_dict(): + """Unknown preset names resolve to {} - no exception.""" + assert get_preset("definitely-not-a-real-preset") == {} + + +# -------------------------------------------------------------------- +# Pass 1: preset expansion +# -------------------------------------------------------------------- +def test_expand_preset_entries_inlines_bundled_brokers(): + """A {preset: waev} entry expands to the two Waev broker dicts.""" + expanded = _expand_preset_entries([{"preset": "waev"}]) + assert len(expanded) == 2 + names = [b["name"] for b in expanded] + assert "waev-a" in names + assert "waev-b" in names + + +def test_expand_preset_entries_drops_unknown_preset_with_warning(caplog): + """An unknown preset is dropped; the daemon does not crash.""" + with caplog.at_level(logging.WARNING, logger="MQTTHandler"): + expanded = _expand_preset_entries([{"preset": "bogus"}]) + assert expanded == [] + assert any("bogus" in record.message for record in caplog.records) + + +# -------------------------------------------------------------------- +# Pass 2: override-by-name merge +# -------------------------------------------------------------------- +def test_merge_overrides_by_name_disables_one_preset_broker(): + """Override AFTER preset wins: documented happy-path.""" + pre_expanded = _expand_preset_entries([{"preset": "waev"}]) + merged = _merge_overrides_by_name(pre_expanded + [{"name": "waev-b", "enabled": False}]) + assert len(merged) == 2 + by_name = {b["name"]: b for b in merged} + assert by_name["waev-a"]["enabled"] is True + assert by_name["waev-b"]["enabled"] is False + + +def test_merge_overrides_by_name_later_wins_documented_rule(): + """Override BEFORE preset is overwritten - locks the documented rule. + + The preset-expanded entry comes after the user's override in this case, + so the preset wins and the user's `enabled: False` is silently lost. This + is the published rule ("place override entries AFTER preset entries"); + this test exists so a future refactor can't quietly flip it. + """ + user_first = [{"name": "waev-b", "enabled": False}] + pipeline = _merge_overrides_by_name(user_first + _expand_preset_entries([{"preset": "waev"}])) + by_name = {b["name"]: b for b in pipeline} + # Preset wins - waev-b is enabled despite the user trying to disable it earlier. + assert by_name["waev-b"]["enabled"] is True + + +# -------------------------------------------------------------------- +# MC2MQTT family parity in topic resolution +# -------------------------------------------------------------------- +def _make_broker_connection(format_value: str) -> _BrokerConnection: + """Build a minimal _BrokerConnection for topic-structure assertions.""" + broker = { + "name": f"test-{format_value}", + "host": "test.example", + "port": 443, + "format": format_value, + "enabled": True, + } + return _BrokerConnection( + broker=broker, + local_identity=object(), + public_key="ABCD" * 16, # 64-char hex stand-in + iata_code="LAX", + jwt_expiry_minutes=10, + email="", + owner="", + broker_index=0, + node_name="testnode", + ) + + +def test_mc2mqtt_formats_share_topic_structure(): + """Every MC2MQTT family member resolves to the canonical topic prefix.""" + expected_mc2mqtt = "meshcore/LAX/" + ("ABCD" * 16) + for fmt in MC2MQTT_FORMATS: + conn = _make_broker_connection(fmt) + assert conn.base_topic == expected_mc2mqtt, f"format '{fmt}' should be MC2MQTT family" + + # Legacy custom-MQTT format uses a different (operator-defined) prefix. + legacy = _make_broker_connection("mqtt") + assert legacy.base_topic == "meshcore/repeater/testnode" + + +# -------------------------------------------------------------------- +# Legacy `letsmesh:` block migration +# -------------------------------------------------------------------- +@pytest.mark.parametrize( + "broker_index, expected_disabled_names", + [ + (-1, set()), # both brokers enabled (preset default) + (0, {"US West (LetsMesh v1)"}), # EU only - US disabled + (1, {"Europe (LetsMesh v1)"}), # US only - EU disabled + ], +) +def test_legacy_letsmesh_block_migrates_to_preset_for_each_broker_index( + broker_index, expected_disabled_names +): + """Legacy letsmesh.broker_index produces the same broker set as before. + + The new migrator emits {preset: letsmesh} plus disable overrides; running + that through the expansion+merge pipeline must preserve the legacy + enabled/disabled topology. + """ + legacy_cfg = {"enabled": True, "broker_index": broker_index} + # Call the unbound method - it doesn't read instance state. + entries = MeshCoreToMqttPusher.convert_letsmesh_to_broker_config( + MeshCoreToMqttPusher.__new__(MeshCoreToMqttPusher), legacy_cfg + ) + + expanded = _expand_preset_entries(entries) + merged = _merge_overrides_by_name(expanded) + + # Always two LetsMesh brokers come out of the pipeline. + assert len(merged) == 2 + by_name = {b["name"]: b for b in merged} + for name, broker in by_name.items(): + if name in expected_disabled_names: + assert broker["enabled"] is False, f"{name} should be disabled for index {broker_index}" + else: + assert broker["enabled"] is True, f"{name} should be enabled for index {broker_index}"