feat: bundled MC2MQTT broker presets (waev, letsmesh) + format family

Introduces a 'set format and forget' workflow for MQTT brokers. Users
reference a bundled preset by name inside the existing brokers: list,
and the package supplies the endpoints, audiences, and TLS settings.
Endpoint changes ship via 'pip install -U' instead of manual edits.

What changes
- New repeater/presets/ package with a tiny lazy YAML loader and two
  bundled presets: waev (mqtt-{a,b}.waev.app) and letsmesh (EU + US).
- New format-family constant MC2MQTT_FORMATS = ('meshcoretomqtt',
  'letsmesh', 'waev') replaces the inline tuple in topic resolution.
  The legacy 'mqtt' format keeps its custom-topic semantics unchanged.
- Two-pass broker assembly in mqtt_handler.py: pass 1 expands every
  {preset: <name>} entry inline; pass 2 collapses duplicates by name
  with later-wins semantics. Place override entries AFTER preset
  entries.
- Hard-coded LETSMESH_BROKERS constant deleted; its data now lives in
  repeater/presets/letsmesh.yaml.
- convert_letsmesh_to_broker_config() collapsed from ~70 to ~25 lines
  by emitting {preset: letsmesh} plus disable overrides for unwanted
  brokers. Honors broker_index in (-1, 0, 1), additional_brokers, and
  enabled flag exactly as before.
- update_mqtt_config API endpoint accepts {preset: <name>} entries and
  passes them through unchanged so the web UI can author them when the
  frontend is updated.
- config.yaml.example documents the preset entry shape, the override
  rule, and the format family hierarchy.
- pyproject.toml ships presets/*.yaml as package data.

How to use
  mqtt_brokers:
    iata_code: "LAX"
    brokers:
      - preset: waev

  # Override a single preset broker:
  brokers:
    - preset: waev
    - name: waev-b
      enabled: false

Tests
- tests/test_presets.py: 9 tests covering loader, expand/merge,
  MC2MQTT topic-family parity, and parametrized legacy migration.

Co-Authored-By: Oz <oz-agent@warp.dev>
This commit is contained in:
Daniel Duran
2026-05-02 15:32:22 -07:00
parent 8bbef50e37
commit dfacfeade8
8 changed files with 455 additions and 91 deletions
+30 -14
View File
@@ -396,7 +396,10 @@ mqtt_brokers:
# use_jwt_auth: true|false # Does this endpoint require JWT auth. Mutually Exclusive with Username & Password fields
# username: "" # Username for basic auth. If empty or missing, uses anonymous access
# password: "" # Password for basic auth. Required if username is set
# format: letsmesh|mqtt
# format: meshcoretomqtt|letsmesh|waev|mqtt
# meshcoretomqtt - canonical open-source MC2MQTT topic structure
# letsmesh, waev - MC2MQTT family flavors (same topic structure, network-specific identity)
# mqtt - legacy pyMC_Repeater local-broker convention (custom topic, singular 'packet')
# retain_status: true|false # Sets MQTT "retain" on status messages so they remain on the broker when disconnected. Also enforces a QOS of 1 (guaranteed delivery)
# tls:
# enabled: true|false # Enable TLS. If the endpoint's certificate is self-signed, the Root CA should be added to the OS's certificate store.
@@ -419,21 +422,34 @@ mqtt_brokers:
# - TRACE # Don't publish trace packets
# - RAW_CUSTOM # Don't publish custom raw packets
# Example of using the US and EU LetsMesh endpoints
# Bundled network presets (recommended). Endpoints ship with the package,
# so URL/audience updates arrive via `pip install -U`. Available presets:
# waev, letsmesh.
#
# brokers:
# - name: US West (LetsMesh v1)
# host: mqtt-us-v1.letsmesh.net
# port: 443
# audience: mqtt-us-v1.letsmesh.net
# use_jwt_auth: true
# - preset: waev
# - preset: letsmesh
#
# Override a preset broker by listing it again with the same name AFTER
# the preset entry. Later entries win on name collision.
#
# brokers:
# - preset: waev
# - name: waev-b
# enabled: false
#
# Mix presets with fully custom brokers in the same list:
#
# brokers:
# - preset: waev
# - name: my-lan-mqtt
# enabled: true
# - name: Europe (LetsMesh v1)
# host: mqtt-eu-v1.letsmesh.net
# port: 443
# audience: mqtt-eu-v1.letsmesh.net
# use_jwt_auth: true
# enabled: true
# host: mqtt.lan
# port: 1883
# transport: tcp
# format: mqtt
# username: repeater
# password: secret
# pyMC_Glass control-plane integration (optional)
glass:
+1
View File
@@ -75,6 +75,7 @@ repeater = [
"web/html/assets/**/*",
"web/*.yaml",
"web/*.html",
"presets/*.yaml",
]
[tool.black]
+117 -77
View File
@@ -18,6 +18,7 @@ except Exception:
UTC = timezone.utc
from repeater import __version__, config
from repeater.presets import get_preset
# Try to import paho-mqtt error code mappings
try:
@@ -36,30 +37,69 @@ logger = logging.getLogger("MQTTHandler")
def b64url(x: bytes) -> str:
return base64.urlsafe_b64encode(x).rstrip(b"=").decode()
LETSMESH_BROKERS = [
{
"name": "Europe (LetsMesh v1)",
"host": "mqtt-eu-v1.letsmesh.net",
"port": 443,
"audience": "mqtt-eu-v1.letsmesh.net",
"use_jwt_auth": True,
"tls": {
"enabled": True,
"insecure": False,
},
},
{
"name": "US West (LetsMesh v1)",
"host": "mqtt-us-v1.letsmesh.net",
"port": 443,
"audience": "mqtt-us-v1.letsmesh.net",
"use_jwt_auth": True,
"tls": {
"enabled": True,
"insecure": False,
},
},
]
# --------------------------------------------------------------------
# Format family
# --------------------------------------------------------------------
# Format values that belong to the MeshCoreToMQTT (MC2MQTT) protocol family.
# All MC2MQTT brokers share the topic structure: meshcore/{IATA}/{PUBLIC_KEY}/...
# Adding a new MC2MQTT-compatible platform = add its format value here AND
# drop a matching presets/<name>.yaml. The legacy "mqtt" format is NOT in this
# tuple - it speaks the operator-defined custom topic structure instead.
MC2MQTT_FORMATS = ("meshcoretomqtt", "letsmesh", "waev")
# --------------------------------------------------------------------
# Preset expansion helpers (used during broker-list assembly below).
#
# A user's brokers: list may contain a mix of:
# * preset references: {preset: <name>} -> expands to N broker dicts
# * full broker dicts: {name, host, port, ...} -> used as-is
#
# Expansion is two passes so the rules are obvious and order-independent
# in spirit, but order-dependent in practice (later wins on name collision):
# Pass 1 - replace every {preset: ...} entry in place with the bundled
# broker list. Unknown preset is dropped with a warning.
# Pass 2 - walk left-to-right; if an entry's name already appeared
# earlier, shallow-merge the LATER entry onto the EARLIER one
# and drop the duplicate. Place override entries AFTER preset
# entries to make them win.
# --------------------------------------------------------------------
def _expand_preset_entries(brokers: List[dict]) -> List[dict]:
"""Pass 1: replace every {preset: <name>} entry with the preset's brokers."""
expanded: List[dict] = []
for entry in brokers:
if isinstance(entry, dict) and "preset" in entry and "name" not in entry:
preset_name = entry["preset"]
preset = get_preset(preset_name)
preset_brokers = preset.get("brokers", []) if preset else []
if not preset_brokers:
logger.warning(f"Unknown or empty broker preset '{preset_name}' - skipping")
continue
logger.info(f"Expanded preset '{preset_name}' into {len(preset_brokers)} broker(s)")
expanded.extend(preset_brokers)
else:
expanded.append(entry)
return expanded
def _merge_overrides_by_name(brokers: List[dict]) -> List[dict]:
"""Pass 2: collapse duplicates by name; the LATER entry's fields win."""
by_index: Dict[str, int] = {}
result: List[dict] = []
for entry in brokers:
if not isinstance(entry, dict):
result.append(entry)
continue
name = entry.get("name")
if name and name in by_index:
# Shallow-merge: later entry's keys overwrite earlier entry's keys.
result[by_index[name]] = {**result[by_index[name]], **entry}
else:
if name:
by_index[name] = len(result)
result.append(entry)
return result
# ====================================================================
@@ -130,11 +170,13 @@ class _BrokerConnection:
if self.base_topic is None:
if self.format == "mqtt":
# Custom MQTT family: operator-defined topic, not meshcore convention.
self.base_topic = f"meshcore/repeater/{self.node_name}"
elif self.format == "letsmesh":
elif self.format in MC2MQTT_FORMATS:
# MC2MQTT family: canonical meshcoretomqtt topic structure.
self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}"
else:
logger.warning(f"Unknown broker format '{self.format}' for {self.broker['name']}, using default base topic")
logger.warning(f"Unknown broker format '{self.format}' for {self.broker['name']}, defaulting to MC2MQTT topic structure")
self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}"
from pymc_core.protocol.utils import PAYLOAD_TYPES
@@ -488,6 +530,12 @@ class MeshCoreToMqttPusher:
imported_letsmesh_configs = self.convert_letsmesh_to_broker_config(letsmesh_config)
brokers.extend(imported_letsmesh_configs)
# Expand any {preset: <name>} entries (from user config OR legacy
# migrators) into concrete broker dicts, then collapse duplicates so
# later overrides win. Result feeds the validation loop unchanged.
brokers = _expand_preset_entries(brokers)
brokers = _merge_overrides_by_name(brokers)
self.brokers = []
if brokers:
for broker_config in brokers:
@@ -552,62 +600,47 @@ class MeshCoreToMqttPusher:
}
def convert_letsmesh_to_broker_config(self, letsmesh_cfg: dict) -> List[dict]:
"""Convert LetsMesh config format to internal broker config format"""
brokers = []
"""Migrate the legacy ``letsmesh:`` config block into preset-style entries.
Emits a list that the preset-expansion pass will turn into the same
broker dicts the old hard-coded migrator used to produce. Specifically:
* ``broker_index == -1`` -> both LetsMesh brokers (the preset default).
* ``broker_index == 0`` -> Europe only (US disabled via override).
* ``broker_index == 1`` -> US only (Europe disabled via override).
* ``additional_brokers`` -> appended verbatim after the preset reference.
* ``enabled: false`` -> overrides every preset broker as disabled.
"""
enabled = letsmesh_cfg.get("enabled", False)
idx = letsmesh_cfg.get("broker_index", -1)
idx = letsmesh_cfg.get("broker_index", None)
if idx == 0 or idx == 1:
broker_info = LETSMESH_BROKERS[idx]
logger.info(f"Imported LetsMesh broker from 'letsmesh' config: {broker_info['name']}")
brokers.append({
"enabled": enabled,
"name": broker_info["name"],
"host": broker_info["host"],
"port": broker_info["port"],
"audience": broker_info["audience"],
"use_jwt_auth": True,
"transport": "websockets",
"format": "letsmesh",
"base_topic": None,
"retain_status": False,
"tls": {
"enabled": True,
"insecure": False,
},
})
elif idx < 0:
if idx == -1:
brokers.extend({
"enabled": enabled,
"name": broker_info["name"],
"host": broker_info["host"],
"port": broker_info["port"],
"audience": broker_info["audience"],
"use_jwt_auth": True,
"transport": "websockets",
"format": "letsmesh",
"base_topic": None,
"retain_status": False,
"tls": {
"enabled": True,
"insecure": False,
},
} for broker_info in LETSMESH_BROKERS)
# Resolve preset broker names so we can build accurate "disable" overrides.
preset_brokers = get_preset("letsmesh").get("brokers", [])
eu_name = preset_brokers[0]["name"] if len(preset_brokers) > 0 else "Europe (LetsMesh v1)"
us_name = preset_brokers[1]["name"] if len(preset_brokers) > 1 else "US West (LetsMesh v1)"
additional = letsmesh_cfg.get("additional_brokers", [])
for add_broker in additional:
logger.info(f"Imported additional LetsMesh broker from 'letsmesh' config: {add_broker['name']}")
brokers.append({
entries: List[dict] = [{"preset": "letsmesh"}]
# Honor broker_index by disabling the broker the legacy user did not pick.
if idx == 0:
entries.append({"name": us_name, "enabled": False})
elif idx == 1:
entries.append({"name": eu_name, "enabled": False})
# Honor the legacy enabled flag by overriding every preset broker.
if not enabled:
for b in preset_brokers:
entries.append({"name": b["name"], "enabled": False})
# Append any user-defined additional brokers as full entries.
for add_broker in letsmesh_cfg.get("additional_brokers", []):
logger.info(f"Imported additional LetsMesh broker from 'letsmesh' config: {add_broker.get('name')}")
entries.append({
"enabled": enabled,
"name": add_broker["name"],
"host": add_broker["host"],
"port": add_broker["port"],
"audience": add_broker["audience"],
"use_jwt_auth": True,
"transport": "websockets",
"use_jwt_auth": add_broker.get("use_jwt_auth", True),
"transport": add_broker.get("transport", "websockets"),
"format": "letsmesh",
@@ -616,10 +649,10 @@ class MeshCoreToMqttPusher:
"tls": {
"enabled": add_broker.get("tls", {}).get("enabled", True),
"insecure": add_broker.get("tls", {}).get("insecure", False),
}
},
})
return brokers
return entries
def _on_broker_connected(self, broker_name: str):
"""Callback when a broker connects"""
@@ -811,7 +844,13 @@ class MeshCoreToMqttPusher:
def publish_mqtt(self, payload: dict, subtopic: str, retain: bool = False, qos: int = 0):
"""Publish message to all connected brokers"""
"""Publish message to brokers using the legacy custom-MQTT format only.
This path exists for diagnostic streams (advert, noise_floor, crc_errors)
that were originally only published to ``format: mqtt`` brokers. MC2MQTT
family brokers (letsmesh, waev, meshcoretomqtt) intentionally skip these
topics today.
"""
message = json.dumps(payload)
# _BrokerConnection now handles topic prefixing, so we only log the subtopic here
@@ -822,6 +861,7 @@ class MeshCoreToMqttPusher:
for conn in self.connections:
if conn.enabled and conn.is_connected():
if conn.format != "mqtt":
# Custom-MQTT-only path; MC2MQTT brokers are intentionally skipped here.
logger.debug(f"Skipped publishing to {conn.broker['name']} (wrong format)")
results.append((conn.broker["name"], None)) # Indicate skipped due to format mismatch
continue
+60
View File
@@ -0,0 +1,60 @@
"""Bundled MQTT broker presets.
Each sibling ``*.yaml`` file in this package defines a named preset - a
ready-to-use list of broker dicts for a known MeshCoreToMQTT (MC2MQTT)
network. Presets ship with the package, so a ``pip install -U`` is enough
to pick up new endpoints without editing user config.
Public API:
get_preset(name) -> dict (the parsed YAML, or {} if unknown)
list_presets() -> sorted list of preset names
The loader is lazy: nothing is read or parsed at import time. The first
call discovers sibling YAML files via ``importlib.resources`` and caches
the parsed dicts for the lifetime of the process.
"""
from __future__ import annotations
import logging
from importlib.resources import files
from pathlib import Path
from typing import Dict, List
import yaml
logger = logging.getLogger("Presets")
# Cache of parsed presets, keyed by name (e.g. "waev"). Populated on first
# call to _load_all(); never cleared.
_CACHE: Dict[str, dict] = {}
_LOADED: bool = False
def _load_all() -> Dict[str, dict]:
"""Discover and parse every bundled ``*.yaml`` file once."""
global _LOADED
if _LOADED:
return _CACHE
for resource in files(__package__).iterdir():
# importlib.resources.Traversable: only consider real files ending in .yaml
name = getattr(resource, "name", "")
if not name.endswith(".yaml"):
continue
try:
with resource.open("r", encoding="utf-8") as f:
_CACHE[Path(name).stem] = yaml.safe_load(f) or {}
except Exception as e: # pragma: no cover - defensive
logger.warning(f"Failed to load preset '{name}': {e}")
_LOADED = True
return _CACHE
def get_preset(name: str) -> dict:
"""Return the parsed preset dict, or ``{}`` if no such preset exists."""
return _load_all().get(name, {})
def list_presets() -> List[str]:
"""Return the sorted list of bundled preset names."""
return sorted(_load_all().keys())
+37
View File
@@ -0,0 +1,37 @@
# LetsMesh MC2MQTT broker preset.
#
# LetsMesh is the operator of the public MeshCore Packet Analyzer
# infrastructure. These brokers speak the MeshCoreToMQTT (MC2MQTT)
# protocol with the "letsmesh" format flavor, which today shares the
# canonical MC2MQTT topic structure (meshcore/{IATA}/{PUBLIC_KEY}/...).
#
# Reference all LetsMesh endpoints with: brokers: [{preset: letsmesh}]
#
# Note: order matters for backward compatibility with the legacy
# letsmesh.broker_index field. Index 0 is Europe, index 1 is US West.
brokers:
- name: "Europe (LetsMesh v1)"
enabled: true
host: mqtt-eu-v1.letsmesh.net
port: 443
transport: "websockets"
audience: "mqtt-eu-v1.letsmesh.net"
use_jwt_auth: true
format: letsmesh
retain_status: false
tls:
enabled: true
insecure: false
- name: "US West (LetsMesh v1)"
enabled: true
host: mqtt-us-v1.letsmesh.net
port: 443
transport: "websockets"
audience: "mqtt-us-v1.letsmesh.net"
use_jwt_auth: true
format: letsmesh
retain_status: false
tls:
enabled: true
insecure: false
+35
View File
@@ -0,0 +1,35 @@
# Waev MC2MQTT broker preset.
#
# Waev is a real-time telemetry, monitoring, and analytics platform for
# MeshCore radio networks. These brokers speak the MeshCoreToMQTT
# (MC2MQTT) protocol with the "waev" format flavor, which today shares
# the canonical MC2MQTT topic structure (meshcore/{IATA}/{PUBLIC_KEY}/...)
# and is reserved for future Waev-specific deviations.
#
# Reference all Waev endpoints with: brokers: [{preset: waev}]
brokers:
- name: "waev-a"
enabled: true
host: mqtt-a.waev.app
port: 443
transport: "websockets"
audience: "mqtt-a.waev.app"
use_jwt_auth: true
format: waev
retain_status: true
tls:
enabled: true
insecure: false
- name: "waev-b"
enabled: true
host: mqtt-b.waev.app
port: 443
transport: "websockets"
audience: "mqtt-b.waev.app"
use_jwt_auth: true
format: waev
retain_status: true
tls:
enabled: true
insecure: false
+7
View File
@@ -1224,6 +1224,13 @@ class APIEndpoints:
for i, b in enumerate(brokers):
if not isinstance(b, dict):
return self._error(f"Broker at index {i} must be an object")
# Bundled preset reference: {preset: <name>}. Pass through
# unchanged - the MQTT handler expands it on next start.
if "preset" in b and "name" not in b:
validated.append({"preset": str(b["preset"]).strip()})
continue
for field in ("name", "host", "port", "format"):
if not b.get(field, ""):
return self._error(f"Broker at index {i} missing required field: {field}")
+168
View File
@@ -0,0 +1,168 @@
"""Tests for the bundled MQTT broker preset system.
Locks the public contract documented in `config.yaml.example` and the
behavior contract in the feat/generalized-mqtt PR.
"""
import logging
import pytest
from repeater.data_acquisition.mqtt_handler import (
MC2MQTT_FORMATS,
MeshCoreToMqttPusher,
_BrokerConnection,
_expand_preset_entries,
_merge_overrides_by_name,
)
from repeater.presets import get_preset, list_presets
# --------------------------------------------------------------------
# Preset loader contract
# --------------------------------------------------------------------
def test_list_presets_returns_bundled_names():
"""The shipped wheel must contain at least 'waev' and 'letsmesh'."""
names = list_presets()
assert "waev" in names
assert "letsmesh" in names
def test_get_preset_waev_has_two_brokers():
"""Waev preset shape: top-level 'brokers' list with two MC2MQTT entries."""
preset = get_preset("waev")
brokers = preset.get("brokers", [])
assert len(brokers) == 2
for b in brokers:
assert "name" in b
assert "host" in b
assert b.get("format") == "waev"
def test_get_preset_unknown_returns_empty_dict():
"""Unknown preset names resolve to {} - no exception."""
assert get_preset("definitely-not-a-real-preset") == {}
# --------------------------------------------------------------------
# Pass 1: preset expansion
# --------------------------------------------------------------------
def test_expand_preset_entries_inlines_bundled_brokers():
"""A {preset: waev} entry expands to the two Waev broker dicts."""
expanded = _expand_preset_entries([{"preset": "waev"}])
assert len(expanded) == 2
names = [b["name"] for b in expanded]
assert "waev-a" in names
assert "waev-b" in names
def test_expand_preset_entries_drops_unknown_preset_with_warning(caplog):
"""An unknown preset is dropped; the daemon does not crash."""
with caplog.at_level(logging.WARNING, logger="MQTTHandler"):
expanded = _expand_preset_entries([{"preset": "bogus"}])
assert expanded == []
assert any("bogus" in record.message for record in caplog.records)
# --------------------------------------------------------------------
# Pass 2: override-by-name merge
# --------------------------------------------------------------------
def test_merge_overrides_by_name_disables_one_preset_broker():
"""Override AFTER preset wins: documented happy-path."""
pre_expanded = _expand_preset_entries([{"preset": "waev"}])
merged = _merge_overrides_by_name(pre_expanded + [{"name": "waev-b", "enabled": False}])
assert len(merged) == 2
by_name = {b["name"]: b for b in merged}
assert by_name["waev-a"]["enabled"] is True
assert by_name["waev-b"]["enabled"] is False
def test_merge_overrides_by_name_later_wins_documented_rule():
"""Override BEFORE preset is overwritten - locks the documented rule.
The preset-expanded entry comes after the user's override in this case,
so the preset wins and the user's `enabled: False` is silently lost. This
is the published rule ("place override entries AFTER preset entries");
this test exists so a future refactor can't quietly flip it.
"""
user_first = [{"name": "waev-b", "enabled": False}]
pipeline = _merge_overrides_by_name(user_first + _expand_preset_entries([{"preset": "waev"}]))
by_name = {b["name"]: b for b in pipeline}
# Preset wins - waev-b is enabled despite the user trying to disable it earlier.
assert by_name["waev-b"]["enabled"] is True
# --------------------------------------------------------------------
# MC2MQTT family parity in topic resolution
# --------------------------------------------------------------------
def _make_broker_connection(format_value: str) -> _BrokerConnection:
"""Build a minimal _BrokerConnection for topic-structure assertions."""
broker = {
"name": f"test-{format_value}",
"host": "test.example",
"port": 443,
"format": format_value,
"enabled": True,
}
return _BrokerConnection(
broker=broker,
local_identity=object(),
public_key="ABCD" * 16, # 64-char hex stand-in
iata_code="LAX",
jwt_expiry_minutes=10,
email="",
owner="",
broker_index=0,
node_name="testnode",
)
def test_mc2mqtt_formats_share_topic_structure():
"""Every MC2MQTT family member resolves to the canonical topic prefix."""
expected_mc2mqtt = "meshcore/LAX/" + ("ABCD" * 16)
for fmt in MC2MQTT_FORMATS:
conn = _make_broker_connection(fmt)
assert conn.base_topic == expected_mc2mqtt, f"format '{fmt}' should be MC2MQTT family"
# Legacy custom-MQTT format uses a different (operator-defined) prefix.
legacy = _make_broker_connection("mqtt")
assert legacy.base_topic == "meshcore/repeater/testnode"
# --------------------------------------------------------------------
# Legacy `letsmesh:` block migration
# --------------------------------------------------------------------
@pytest.mark.parametrize(
"broker_index, expected_disabled_names",
[
(-1, set()), # both brokers enabled (preset default)
(0, {"US West (LetsMesh v1)"}), # EU only - US disabled
(1, {"Europe (LetsMesh v1)"}), # US only - EU disabled
],
)
def test_legacy_letsmesh_block_migrates_to_preset_for_each_broker_index(
broker_index, expected_disabled_names
):
"""Legacy letsmesh.broker_index produces the same broker set as before.
The new migrator emits {preset: letsmesh} plus disable overrides; running
that through the expansion+merge pipeline must preserve the legacy
enabled/disabled topology.
"""
legacy_cfg = {"enabled": True, "broker_index": broker_index}
# Call the unbound method - it doesn't read instance state.
entries = MeshCoreToMqttPusher.convert_letsmesh_to_broker_config(
MeshCoreToMqttPusher.__new__(MeshCoreToMqttPusher), legacy_cfg
)
expanded = _expand_preset_entries(entries)
merged = _merge_overrides_by_name(expanded)
# Always two LetsMesh brokers come out of the pipeline.
assert len(merged) == 2
by_name = {b["name"]: b for b in merged}
for name, broker in by_name.items():
if name in expected_disabled_names:
assert broker["enabled"] is False, f"{name} should be disabled for index {broker_index}"
else:
assert broker["enabled"] is True, f"{name} should be enabled for index {broker_index}"