Files
Lloyd 45a44eb47b Refactor test cases and base code for consistency and readability
- Updated byte representations in tests to use lowercase hex format for consistency.
- Reformatted code for better readability, including line breaks and indentation adjustments.
- Consolidated multiple lines into single lines where appropriate to enhance clarity.
- Ensured that all test cases maintain consistent formatting and style across the test suite.
2026-05-27 20:15:10 +01:00

1171 lines
47 KiB
Python

import base64
import binascii
import json
import logging
import threading
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, List, Optional
import paho.mqtt.client as mqtt
from repeater import __version__
from repeater.presets import get_preset
# Try to import paho-mqtt error code mappings
try:
from paho.mqtt.reasoncodes import ReasonCode
HAS_REASON_CODES = True
except ImportError:
ReasonCode = None
HAS_REASON_CODES = False
# Module logger shared by every broker connection and the pusher.
logger = logging.getLogger(name="MQTTHandler")

# TRACE sits below DEBUG (10): it is reserved for high-frequency per-broker
# publish chatter, so DEBUG stays readable unless deep diagnostics are wanted.
TRACE_LEVEL = 5
logging.addLevelName(level=TRACE_LEVEL, levelName="TRACE")
def _trace(message: str):
    """Emit *message* at the custom TRACE level; a no-op unless TRACE is enabled."""
    if not logger.isEnabledFor(TRACE_LEVEL):
        return
    logger.log(TRACE_LEVEL, message)
# --------------------------------------------------------------------
# Helper: Base64URL without padding
# --------------------------------------------------------------------
def b64url(x: bytes) -> str:
    """Encode *x* as URL-safe base64 text with the trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(x).decode()
    return encoded.rstrip("=")
def _summarize_payload_for_log(payload: Any, message: Optional[str] = None) -> str:
"""Return a compact single-line payload summary for debug logging."""
if message is None:
try:
message = json.dumps(payload)
except Exception:
message = str(payload)
if isinstance(payload, list):
item_bits = []
if payload and isinstance(payload[0], dict):
item_bits.append(_summarize_payload_for_log(payload[0]))
return f"items={len(payload)}, bytes={len(message.encode('utf-8'))}" + (
f", first={{ {item_bits[0]} }}" if item_bits else ""
)
if not isinstance(payload, dict):
return f"type={type(payload).__name__}, bytes={len(message.encode('utf-8'))}"
summary_fields = []
for key in ("type", "packet_type", "route", "origin", "status", "hash", "RSSI", "SNR"):
value = payload.get(key)
if value not in (None, ""):
summary_fields.append(f"{key}={value}")
if "len" in payload:
summary_fields.append(f"len={payload['len']}")
if "payload_len" in payload:
summary_fields.append(f"payload_len={payload['payload_len']}")
if "raw" in payload:
summary_fields.append(f"raw_bytes={len(str(payload['raw'])) // 2}")
if not summary_fields:
keys = ",".join(sorted(payload.keys())[:6])
if len(payload) > 6:
keys += ",..."
summary_fields.append(f"keys={keys}")
summary_fields.append(f"bytes={len(message.encode('utf-8'))}")
return ", ".join(summary_fields)
def _truncate_middle(text: str, prefix: int = 24, suffix: int = 16) -> str:
"""Truncate long strings by keeping the beginning and end visible."""
if len(text) <= prefix + suffix + 5:
return text
return f"{text[:prefix]} ... {text[-suffix:]}"
# --------------------------------------------------------------------
# Format family
# --------------------------------------------------------------------
# Format values belonging to the MeshCoreToMQTT (MC2MQTT) protocol family.
# Every MC2MQTT broker uses the topic layout meshcore/{IATA}/{PUBLIC_KEY}/...
# To support another MC2MQTT-compatible platform, add its format value here
# AND ship a matching presets/<name>.yaml. The legacy "mqtt" format is
# deliberately absent: it uses the operator-defined custom topic structure.
MC2MQTT_FORMATS = (
    "meshcoretomqtt",
    "letsmesh",
    "waev",
)
# --------------------------------------------------------------------
# Preset expansion helpers (used during broker-list assembly below).
#
# A user's brokers: list may contain a mix of:
# * preset references: {preset: <name>} -> expands to N broker dicts
# * full broker dicts: {name, host, port, ...} -> used as-is
#
# Expansion runs in two passes so each rule stays simple. Note that the
# result depends on entry order, because the later entry wins on a name collision:
# Pass 1 - replace every {preset: ...} entry in place with the bundled
# broker list. Unknown preset is dropped with a warning.
# Pass 2 - walk left-to-right; if an entry's name already appeared
# earlier, shallow-merge the LATER entry onto the EARLIER one
# and drop the duplicate. Place override entries AFTER preset
# entries to make them win.
# --------------------------------------------------------------------
def _expand_preset_entries(brokers: List[dict]) -> List[dict]:
"""Pass 1: replace every {preset: <name>} entry with the preset's brokers."""
expanded: List[dict] = []
for entry in brokers:
if isinstance(entry, dict) and "preset" in entry and "name" not in entry:
preset_name = entry["preset"]
preset = get_preset(preset_name)
preset_brokers = preset.get("brokers", []) if preset else []
if not preset_brokers:
logger.warning(f"Unknown or empty broker preset '{preset_name}' - skipping")
continue
logger.info(f"Expanded preset '{preset_name}' into {len(preset_brokers)} broker(s)")
expanded.extend(preset_brokers)
else:
expanded.append(entry)
return expanded
def _merge_overrides_by_name(brokers: List[dict]) -> List[dict]:
"""Pass 2: collapse duplicates by name; the LATER entry's fields win."""
by_index: Dict[str, int] = {}
result: List[dict] = []
for entry in brokers:
if not isinstance(entry, dict):
result.append(entry)
continue
name = entry.get("name")
if name and name in by_index:
# Shallow-merge: later entry's keys overwrite earlier entry's keys.
result[by_index[name]] = {**result[by_index[name]], **entry}
else:
if name:
by_index[name] = len(result)
result.append(entry)
return result
# ====================================================================
# Single Broker Connection Manager
# ====================================================================
class _BrokerConnection:
"""
Manages a single MQTT broker connection with independent lifecycle.
Internal class - not exposed publicly.
"""
def __init__(
self,
broker: dict,
local_identity,
public_key: str,
iata_code: str,
jwt_expiry_minutes: int,
email: str,
owner: str,
broker_index: int,
node_name: str,
on_connect_callback: Optional[Callable] = None,
on_disconnect_callback: Optional[Callable] = None,
):
self.broker = broker
self.local_identity = local_identity
self.public_key = public_key.upper()
self.iata_code = iata_code
self.jwt_expiry_minutes = jwt_expiry_minutes
self.email = email
self.owner = owner
self.node_name = node_name
self.broker_index = broker_index
self._on_connect_callback = on_connect_callback
self._on_disconnect_callback = on_disconnect_callback
self._connect_time = None
self._running = False
self._reconnect_attempts = 0
self._reconnect_timer = None
self._max_reconnect_delay = 300 # 5 minutes max
self._keepalive = broker.get(
"keepalive", 30
) # default tighter than paho's 60s to beat NAT/proxy timeouts
self._jwt_refresh_timer = None
self._shutdown_requested = False
self._last_jwt_claims = None
self.transport = broker.get("transport", "websockets")
self.use_jwt_auth = broker.get("use_jwt_auth", False)
self.username = broker.get("username", None)
self.password = broker.get("password", None)
self.format = broker.get("format", "letsmesh")
self.tls = broker.get(
"tls",
{
"enabled": False,
"insecure": False,
},
)
client_id = f"meshcore_{self.public_key}_{broker['host']}_{self.format}"
client_kwargs = {
"client_id": client_id,
"transport": self.transport,
}
# Prefer callback API v2 when available (paho-mqtt>=2.x) to avoid
# deprecation warnings from the legacy callback API v1.
callback_api = getattr(mqtt, "CallbackAPIVersion", None)
if callback_api is not None and hasattr(callback_api, "VERSION2"):
client_kwargs["callback_api_version"] = callback_api.VERSION2
try:
self.client = mqtt.Client(**client_kwargs)
except TypeError:
# Backward-compatibility fallback for older paho versions.
client_kwargs.pop("callback_api_version", None)
self.client = mqtt.Client(**client_kwargs)
if hasattr(self.client, "on_pre_connect"):
self.client.on_pre_connect = self._on_pre_connect
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
# If None, will be use defaults depending on the format value
self.base_topic = broker.get("base_topic", None)
self.enabled = broker.get("enabled", False)
self.retain_status = broker.get("retain_status", False)
self._tls_verified = False
if self.base_topic is None:
if self.format == "mqtt":
# Custom MQTT family: operator-defined topic, not meshcore convention.
self.base_topic = f"meshcore/repeater/{self.node_name}"
elif self.format in MC2MQTT_FORMATS:
# MC2MQTT family: canonical meshcoretomqtt topic structure.
self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}"
else:
logger.warning(
f"Unknown broker format '{self.format}' for {self.broker['name']}, defaulting to MC2MQTT topic structure"
)
self.base_topic = f"meshcore/{self.iata_code}/{self.public_key}"
from pymc_core.protocol.utils import PAYLOAD_TYPES
disallowed_types = broker.get("disallowed_packet_types", [])
type_name_map = {name: code for code, name in PAYLOAD_TYPES.items()}
self.disallowed_types = [type_name_map.get(name.upper(), None) for name in disallowed_types]
self.disallowed_types = [
val for val in self.disallowed_types if val is not None
] # Filter out invalid names
def _generate_jwt(self) -> str:
"""Generate MeshCore-style Ed25519 JWT token"""
now = datetime.now(timezone.utc)
header = {"alg": "Ed25519", "typ": "JWT"}
payload = {
"publicKey": self.public_key.upper(),
"aud": self.broker["audience"],
"iat": int(now.timestamp()),
"exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp()),
}
if "audience" in self.broker:
payload["aud"] = self.broker["audience"]
# Only include email/owner for verified TLS connections
if (
self.tls
and self.tls.get("enabled", False)
and self._tls_verified
and (self.email or self.owner)
):
payload["email"] = self.email
payload["owner"] = self.owner
else:
payload["email"] = ""
payload["owner"] = ""
self._last_jwt_claims = {
"aud": payload.get("aud"),
"iat": payload.get("iat"),
"exp": payload.get("exp"),
"public_key_suffix": self.public_key[-12:],
}
# Encode header and payload (compact JSON - no spaces)
header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode())
payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode())
signing_input = f"{header_b64}.{payload_b64}".encode()
# Sign using LocalIdentity (supports both standard and firmware keys)
try:
signature = self.local_identity.sign(signing_input)
except Exception as e:
logger.error(f"JWT signing failed for {self.broker['name']}: {e}")
logger.error(f" - public_key: {self.public_key}")
logger.error(f" - signing_input length: {len(signing_input)}")
raise
signature_hex = binascii.hexlify(signature).decode()
token = f"{header_b64}.{payload_b64}.{signature_hex}"
logger.debug(f"JWT token generated for {self.broker['name']}: {token[:50]}...")
return token
def _on_connect(self, client, userdata, flags, rc, properties=None):
"""MQTT connection callback"""
rc_value = int(getattr(rc, "value", rc)) if rc is not None else -1
if rc_value == 0:
logger.info(f"Connected to {self.broker['name']}")
self._running = True
self._reconnect_attempts = 0 # Reset counter on success
# Successful connect can race with a previously scheduled timer; cancel it.
if self._reconnect_timer:
self._reconnect_timer.cancel()
self._reconnect_timer = None
if self.use_jwt_auth:
self._schedule_jwt_refresh() # Schedule proactive JWT refresh
if self._on_connect_callback:
self._on_connect_callback(self.broker["name"])
else:
error_msg = get_mqtt_error_message(rc_value, is_disconnect=False)
logger.error(f"Failed to connect to {self.broker['name']}: {error_msg}")
self._schedule_reconnect(reason=error_msg)
def _on_pre_connect(self, client, userdata):
"""Refresh credentials before each connect/reconnect attempt."""
if self._shutdown_requested:
return
if self.use_jwt_auth:
self._set_credentials()
def _on_disconnect(self, client, userdata, rc, *extra):
"""MQTT disconnection callback"""
# Callback API v2 passes: (client, userdata, disconnect_flags, reason_code, properties)
# while API v1 passes: (client, userdata, rc). Normalize to integer rc.
if not isinstance(rc, (int, float)) and extra:
rc = extra[0]
rc_value = int(getattr(rc, "value", rc)) if rc is not None else -1
was_running = self._running
self._running = False
if self._shutdown_requested:
logger.info(f"Clean disconnect from {self.broker['name']}")
if self._on_disconnect_callback:
self._on_disconnect_callback(self.broker["name"])
return
if rc_value != 0: # Unexpected disconnect
error_msg = get_mqtt_error_message(rc_value, is_disconnect=True)
if was_running:
logger.warning(
f"Disconnected from {self.broker['name']} (rc={rc_value}): {error_msg}"
)
else:
logger.debug(
f"Duplicate disconnect callback from {self.broker['name']} while already disconnected "
f"(rc={rc_value}): {error_msg}"
)
if was_running: # Only reconnect if we were intentionally connected
self._schedule_reconnect(reason=error_msg)
else:
logger.info(f"Clean disconnect from {self.broker['name']}")
if self._on_disconnect_callback:
self._on_disconnect_callback(self.broker["name"])
def _schedule_reconnect(self, reason: str = "connection lost"):
"""Schedule reconnection with exponential backoff"""
if self._shutdown_requested:
return
if self._reconnect_timer:
self._reconnect_timer.cancel()
# Exponential backoff: 5s, 10s, 20s, 40s, 80s, up to max
delay = min(5 * (2**self._reconnect_attempts), self._max_reconnect_delay)
self._reconnect_attempts += 1
logger.info(
f"Scheduling reconnect to {self.broker['name']} in {delay}s (attempt {self._reconnect_attempts}, reason: {reason})"
)
self._reconnect_timer = threading.Timer(delay, lambda: self._attempt_reconnect(reason))
self._reconnect_timer.daemon = True
self._reconnect_timer.start()
def _attempt_reconnect(self, reason: str = "connection lost"):
"""Attempt to reconnect to broker with fresh JWT"""
if self._shutdown_requested:
return
# Timer has fired; clear handle so state reflects reality.
self._reconnect_timer = None
if self._running:
logger.debug(f"Skipping reconnect to {self.broker['name']} - already connected")
return
try:
logger.info(f"Attempting reconnection to {self.broker['name']} (reason: {reason})...")
# Stop the loop if it's still running (websocket mode requires clean restart)
try:
self.client.loop_stop()
except Exception as e:
logger.debug(
f"loop_stop during reconnect was ignored for {self.broker['name']}: {e}"
)
self._set_credentials()
# Reconnect and restart loop
self.client.connect(self.broker["host"], self.broker["port"], keepalive=self._keepalive)
self.client.loop_start()
self._loop_running = True
except Exception as e:
logger.error(f"Reconnection failed for {self.broker['name']}: {e}")
self._schedule_reconnect() # Try again later
def _set_credentials(self):
"""Set credentials before connecting (CONNECT handshake only)"""
try:
if self.use_jwt_auth:
logger.debug(f"Generating JWT credentials for {self.broker['name']}...")
token = self._generate_jwt()
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
logger.debug(
f"Credentials set for {self.broker['name']}: "
f"user=v1_{self.public_key[:8]}...{self.public_key[-8:]}"
)
elif self.username and self.password:
logger.info(
f"Using provided credentials for {self.broker['name']} (username: {self.username})"
)
self.client.username_pw_set(username=self.username, password=self.password)
else:
logger.info(
f"No credentials set for {self.broker['name']} (JWT auth disabled and no username/password provided)"
)
self._connect_time = datetime.now(timezone.utc)
except Exception as e:
logger.error(f"Failed to set JWT credentials for {self.broker['name']}: {e}")
raise
def connect(self):
"""Establish connection to broker"""
self._shutdown_requested = False
# Conditional TLS setup
if not self.enabled:
logger.info(f"Connection to {self.broker['name']} is disabled in configuration")
return
if self.transport == "websockets":
protocol = "ws"
elif self.transport == "tcp":
protocol = "mqtt"
else:
raise ValueError(f"Invalid transport '{self.transport}' for {self.broker['name']}")
# Setup TLS independent of transport - MQTT over TLS can be used with both websockets and raw TCP
if self.tls and self.tls.get("enabled", False):
import ssl
self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS_CLIENT)
self.client.tls_insecure_set(self.tls.get("insecure", False))
self._tls_verified = True
# Ensure to update the protocol is we're running TLS on websockets
if self.transport == "websockets":
protocol = "wss"
# Set JWT credentials before CONNECT handshake
self._set_credentials()
logger.info(
f"Connecting to {self.broker['name']} "
f"({protocol}://{self.broker['host']}:{self.broker['port']}) ..."
)
self.client.connect(self.broker["host"], self.broker["port"], keepalive=self._keepalive)
self.client.loop_start()
self._loop_running = True
def disconnect(self):
"""Disconnect from broker"""
self._shutdown_requested = False
self._running = False
self._loop_running = False
# Cancel any pending timers
if self._reconnect_timer:
self._reconnect_timer.cancel()
self._reconnect_timer = None
if self._jwt_refresh_timer:
self._jwt_refresh_timer.cancel()
self._jwt_refresh_timer = None
self.client.loop_stop()
self.client.disconnect()
logger.info(f"Disconnected from {self.broker['name']}")
def publish(self, subtopic: str, payload: str, retain: bool = False, qos: int = 0):
"""Publish message to broker"""
# Legacy MQTT config uses singular "packet" topic, while LetsMesh uses "packets". Handle this for compatibility.
if self.format == "mqtt" and subtopic == "packets":
subtopic = "packet"
if (
subtopic == "status"
): # Override the status topic retain and qos settings based on broker configuration
retain = self.retain_status
qos = 1 if self.retain_status else 0
full_topic = f"{self.base_topic}/{subtopic}"
_trace(
f"Publishing topic='{_truncate_middle(full_topic)}', bytes={len(payload.encode('utf-8'))}, "
f"running={self._running}, retain={retain}, qos={qos}"
)
if self._running:
result = self.client.publish(full_topic, payload, retain=retain, qos=qos)
return result
else:
logger.warning(f"Cannot publish to {self.broker['name']} - not connected")
return None
def is_enabled(self) -> bool:
"""Check if connection is enabled"""
return self.enabled
def is_connected(self) -> bool:
"""Check if connection is active"""
return self._running
def has_pending_reconnect(self) -> bool:
"""Check if a reconnection is scheduled"""
return self._reconnect_timer is not None and self._reconnect_timer.is_alive()
def should_reconnect_for_token_expiry(self) -> bool:
"""Check if connection should be reconnected due to JWT expiry (at 80% of lifetime)"""
if not self._connect_time:
return False
elapsed = (datetime.now(timezone.utc) - self._connect_time).total_seconds()
expiry_seconds = self.jwt_expiry_minutes * 60
# Stagger refresh by 5% per broker to prevent simultaneous disconnects
# Broker 0: 80%, Broker 1: 85%, Broker 2: 90%, etc.
stagger_offset = self.broker_index * 0.05
refresh_threshold = 0.80 + stagger_offset
return elapsed >= expiry_seconds * refresh_threshold
def _schedule_jwt_refresh(self):
"""Schedule proactive JWT refresh before token expires"""
if self._jwt_refresh_timer:
self._jwt_refresh_timer.cancel()
expiry_seconds = self.jwt_expiry_minutes * 60
# Stagger refresh by 5% per broker to prevent simultaneous disconnects
# Broker 0: 80%, Broker 1: 85%, Broker 2: 90%, etc.
stagger_offset = self.broker_index * 0.05
refresh_threshold = 0.80 + stagger_offset
refresh_delay = expiry_seconds * refresh_threshold
_trace(
f"JWT refresh scheduled for {self.broker['name']} in {refresh_delay:.0f}s "
f"({refresh_threshold * 100:.0f}% of {self.jwt_expiry_minutes}min token lifetime)"
)
self._jwt_refresh_timer = threading.Timer(refresh_delay, self.reconnect_for_token_expiry)
self._jwt_refresh_timer.daemon = True
self._jwt_refresh_timer.start()
def reconnect_for_token_expiry(self):
"""Proactively reconnect with new JWT before current one expires"""
if not self._running:
return
logger.info(f"JWT token expiring soon for {self.broker['name']}, refreshing...")
self._running = False
self._jwt_refresh_timer = None
self._schedule_reconnect(reason="JWT token expiry")
self.client.disconnect()
# ====================================================================
# MeshCore → MQTT Publisher
# ====================================================================
class MeshCoreToMqttPusher:
def __init__(
    self,
    local_identity,
    config: dict,
    jwt_expiry_minutes: int = 10,
    stats_provider: Optional[Callable[[], dict]] = None,
):
    """Create the publisher and one ``_BrokerConnection`` per valid broker.

    Broker entries come from ``config["mqtt_brokers"]["brokers"]`` when that
    section exists; otherwise they are migrated from the legacy ``mqtt`` and
    ``letsmesh`` sections. Preset references are expanded, duplicate names
    are merged (later entry wins), and entries that are not dicts or lack any
    of ``name``/``host``/``port``/``enabled`` are skipped with a warning.

    Args:
        local_identity: Node identity; ``get_public_key()`` must return an
            object with ``.hex()`` (presumably bytes). Also passed to every
            broker connection for JWT signing.
        config: Full repeater configuration. Mutated: when it has no
            ``mqtt_brokers`` section, one built from the legacy sections is
            written back to ``config["mqtt_brokers"]``.
        jwt_expiry_minutes: Lifetime of generated JWT tokens.
        stats_provider: Optional callable returning live stats for status
            messages.
    """
    # Store local identity and derive the upper-case hex public key
    self.local_identity = local_identity
    public_key = local_identity.get_public_key().hex().upper()
    # Extract values from config
    from ..config import get_node_info

    node_info = get_node_info(config)
    self.iata_code = node_info["iata_code"]
    self.email = node_info.get("email", "")
    self.owner = node_info.get("owner", "")
    self.status_interval = node_info["status_interval"]
    self.node_name = node_info["node_name"]
    self.public_key = public_key
    self.jwt_expiry_minutes = jwt_expiry_minutes
    self.app_version = __version__
    self.radio_config = node_info["radio_config"]
    self.stats_provider = stats_provider
    self._status_task = None
    self._running = False
    self._shutdown_requested = False
    self._lock = threading.Lock()
    self._connect_timers: List[threading.Timer] = []

    # Initialize brokers list
    mqtt_brokers_config = config.get("mqtt_brokers", {})
    letsmesh_config = config.get("letsmesh", {})
    mqtt_config = config.get("mqtt", {})
    brokers = []
    if mqtt_brokers_config:
        # Pull in brokers from mqtt_brokers config; an empty YAML "brokers:"
        # key yields None, which is treated as an empty list.
        brokers.extend(mqtt_brokers_config.get("brokers") or [])
        if letsmesh_config or mqtt_config:
            logger.warning(
                "Multiple MQTT broker configurations found (mqtt_brokers, letsmesh, mqtt). Only mqtt_brokers will be used"
            )
    else:
        if mqtt_config:
            imported_mqtt_config = self.convert_mqtt_to_broker_config(mqtt_config)
            brokers.append(imported_mqtt_config)
        if letsmesh_config:
            imported_letsmesh_configs = self.convert_letsmesh_to_broker_config(letsmesh_config)
            brokers.extend(imported_letsmesh_configs)

    # Expand any {preset: <name>} entries (from user config OR legacy
    # migrators) into concrete broker dicts, then collapse duplicates so
    # later overrides win. Result feeds the validation loop unchanged.
    brokers = _expand_preset_entries(brokers)
    brokers = _merge_overrides_by_name(brokers)

    # Known MC2MQTT hostnames that must never use format: mqtt.
    # If a user's saved config has the wrong format (common after manual editing
    # or an old UI version), auto-correct it so diagnostic topics aren't sent
    # to brokers that will reject them and close the connection (rc=16).
    _MC2MQTT_HOSTS = {
        "letsmesh.net",
        "waev.app",
        "meshcoretomqtt",
    }
    required_keys = ("name", "host", "port", "enabled")
    self.brokers = []
    for broker_config in brokers:
        # Non-dict entries (e.g. a stray None or string in YAML) would make
        # the membership test crash or do substring matching; skip them.
        if not (
            isinstance(broker_config, dict) and all(k in broker_config for k in required_keys)
        ):
            logger.warning(f"Skipping invalid broker config: {broker_config}")
            continue
        host = broker_config.get("host", "")
        fmt = broker_config.get("format", "")
        if fmt == "mqtt" and any(mc2 in host for mc2 in _MC2MQTT_HOSTS):
            corrected = broker_config.get("name", host)
            logger.warning(
                f"Broker '{corrected}' has format=mqtt but host '{host}' is a MC2MQTT "
                f"endpoint — auto-correcting to format=letsmesh. "
                f"Update your config.yaml to silence this warning."
            )
            broker_config = {**broker_config, "format": "letsmesh"}
        self.brokers.append(broker_config)
        logger.info(
            f"Added broker: {broker_config['name']} (format={broker_config.get('format', 'unknown')})"
        )

    # Create broker connections; the list index drives connect/refresh staggering
    self.connections: List[_BrokerConnection] = []
    for idx, broker in enumerate(self.brokers):
        conn = _BrokerConnection(
            broker=broker,
            local_identity=self.local_identity,
            public_key=self.public_key,
            iata_code=self.iata_code,
            jwt_expiry_minutes=self.jwt_expiry_minutes,
            email=self.email,
            owner=self.owner,
            broker_index=idx,
            node_name=self.node_name,
            on_connect_callback=self._on_broker_connected,
            on_disconnect_callback=self._on_broker_disconnected,
        )
        self.connections.append(conn)
    logger.info(f"Initialized with {len(self.connections)} broker connection(s)")

    # Convert legacy configuration to the new mqtt_brokers layout
    if not mqtt_brokers_config:
        logger.info("Storing mqtt_brokers config from legacy mqtt/letsmesh configuration")
        mqtt_brokers_config = {
            "iata_code": self.iata_code,
            "status_interval": self.status_interval,
            "owner": self.owner,
            "email": self.email,
            "brokers": brokers,
        }
        # Update the configuration with the new configuration
        config["mqtt_brokers"] = mqtt_brokers_config
def convert_mqtt_to_broker_config(self, mqtt_cfg: dict) -> dict:
    """Translate a legacy ``mqtt:`` config block into a single broker dict.

    The legacy ``broker`` value becomes both the broker's name and host.
    ``use_websockets`` selects the transport, and JWT auth is always off
    because the legacy block has no JWT support. ``broker`` and ``port`` are
    required; a missing one raises ``KeyError``.
    """
    host = mqtt_cfg["broker"]
    logger.info(f"Imported MQTT broker from 'mqtt' config: {host}")
    if mqtt_cfg.get("use_websockets", False):
        transport = "websockets"
    else:
        transport = "tcp"
    return dict(
        enabled=mqtt_cfg.get("enabled", False),
        name=host,
        host=host,
        port=mqtt_cfg["port"],
        use_jwt_auth=False,
        username=mqtt_cfg.get("username", None),
        password=mqtt_cfg.get("password", None),
        transport=transport,
        tls=mqtt_cfg.get("tls", None),
        format="mqtt",
        base_topic=mqtt_cfg.get("base_topic", None),
    )
def convert_letsmesh_to_broker_config(self, letsmesh_cfg: dict) -> List[dict]:
    """Migrate the legacy ``letsmesh:`` config block into preset-style entries.

    Emits a list that the preset-expansion pass will turn into the same
    broker dicts the old hard-coded migrator used to produce. Specifically:

    * ``broker_index == -1`` -> both LetsMesh brokers (the preset default).
    * ``broker_index == 0`` -> Europe only (US disabled via override).
    * ``broker_index == 1`` -> US only (Europe disabled via override).
    * ``additional_brokers`` -> appended verbatim after the preset reference.
    * ``enabled: false`` -> overrides every preset broker as disabled.

    Each additional broker must provide ``name``, ``host``, ``port`` and
    ``audience`` (``KeyError`` otherwise). A missing ``letsmesh`` preset, or
    ``null`` values for ``additional_brokers``/``tls``, are tolerated.
    """
    enabled = letsmesh_cfg.get("enabled", False)
    idx = letsmesh_cfg.get("broker_index", -1)
    # Resolve preset broker names so we can build accurate "disable" overrides.
    # get_preset() may return a falsy value (the expansion pass guards this
    # too), in which case the hard-coded fallback names below are used.
    preset = get_preset("letsmesh") or {}
    preset_brokers = preset.get("brokers") or []
    eu_name = preset_brokers[0]["name"] if len(preset_brokers) > 0 else "Europe (LetsMesh v1)"
    us_name = preset_brokers[1]["name"] if len(preset_brokers) > 1 else "US West (LetsMesh v1)"
    entries: List[dict] = [{"preset": "letsmesh"}]
    # Honor broker_index by disabling the broker the legacy user did not pick.
    if idx == 0:
        entries.append({"name": us_name, "enabled": False})
    elif idx == 1:
        entries.append({"name": eu_name, "enabled": False})
    # Honor the legacy enabled flag by overriding every preset broker.
    if not enabled:
        for b in preset_brokers:
            entries.append({"name": b["name"], "enabled": False})
    # Append any user-defined additional brokers as full entries.
    for add_broker in letsmesh_cfg.get("additional_brokers") or []:
        logger.info(
            f"Imported additional LetsMesh broker from 'letsmesh' config: {add_broker.get('name')}"
        )
        tls_cfg = add_broker.get("tls") or {}
        entries.append(
            {
                "enabled": enabled,
                "name": add_broker["name"],
                "host": add_broker["host"],
                "port": add_broker["port"],
                "audience": add_broker["audience"],
                "use_jwt_auth": add_broker.get("use_jwt_auth", True),
                "transport": add_broker.get("transport", "websockets"),
                "format": "letsmesh",
                "base_topic": None,
                "retain_status": False,
                "tls": {
                    "enabled": tls_cfg.get("enabled", True),
                    "insecure": tls_cfg.get("insecure", False),
                },
            }
        )
    return entries
def _on_broker_connected(self, broker_name: str):
    """Connect callback invoked by a ``_BrokerConnection`` after a successful CONNECT.

    The first time any broker connects, and only when ``status_interval`` is
    positive, this publishes an initial "online" status and starts the
    background heartbeat thread. Later connections, and connections after a
    shutdown was requested, do nothing here.
    """
    if self._shutdown_requested:
        return
    if self._status_task or not (self.status_interval > 0):
        return
    self._running = True
    logger.info(f"Publishing initial status for {broker_name}...")
    self.publish_status(state="online", origin=self.node_name, radio_config=self.radio_config)
    heartbeat = threading.Thread(target=self._status_heartbeat_loop, daemon=True)
    self._status_task = heartbeat
    heartbeat.start()
    logger.info(f"Started status heartbeat (interval: {self.status_interval}s)")
def _on_broker_disconnected(self, broker_name: str):
    """Disconnect callback: log once every broker connection is down.

    Logs a warning when no broker is connected and none has a reconnect
    scheduled, or an info message when all are down but reconnects are
    pending. Nothing is logged while at least one broker is still connected.
    """
    if any(link.is_connected() for link in self.connections):
        return
    if any(link.has_pending_reconnect() for link in self.connections):
        logger.info("All brokers temporarily disconnected, reconnects pending")
    else:
        logger.warning("All broker connections lost with no pending reconnects")
def connect(self):
    """Establish connections to all configured brokers.

    The first enabled broker is connected immediately. Each further enabled
    broker is started from a daemon timer 30s after the previous one, so
    connection attempts are spread out. Disabled brokers do not take a
    stagger slot: they are passed straight to ``connect()``, which only logs
    that they are disabled. Timers left over from an earlier call are
    cancelled first so no broker is dialled twice. Per-broker failures are
    logged, not raised.
    """
    self._shutdown_requested = False
    for stale in self._connect_timers:
        stale.cancel()
    self._connect_timers = []
    stagger_slot = 0
    for conn in self.connections:
        if conn.is_enabled():
            delay = stagger_slot * 30
            stagger_slot += 1
        else:
            delay = 0
        try:
            if delay == 0:
                conn.connect()
            else:
                logger.info(f"Staggering connection to {conn.broker['name']} by {delay}s")
                # Bind conn as a default argument to avoid late-binding in the lambda.
                timer = threading.Timer(delay, lambda c=conn: self._delayed_connect(c))
                timer.daemon = True
                timer.start()
                self._connect_timers.append(timer)
        except Exception as e:
            logger.error(f"Failed to connect to {conn.broker['name']}: {e}")
def _delayed_connect(self, conn):
    """Timer target: connect *conn* unless a shutdown happened during the delay.

    Connection errors are logged rather than propagated, since nothing above
    a timer thread could handle them.
    """
    if not self._shutdown_requested:
        try:
            conn.connect()
        except Exception as err:
            logger.error(f"Failed to connect to {conn.broker['name']}: {err}")
def disconnect(self):
    """Shut down: cancel pending connects, announce offline, drop every broker.

    The shutdown flag is raised first so pending delayed-connect timers and
    connect callbacks turn into no-ops. The offline status is published
    before the brokers are disconnected. Failures are logged, never raised.
    """
    self._shutdown_requested = True
    self._cancel_connect_timers()
    # Clearing _running lets the heartbeat loop fall out.
    self._running = False
    try:
        self.publish_status(state="offline", origin=self.node_name, radio_config=self.radio_config)
    except Exception as exc:
        logger.debug(f"Failed to publish MQTT offline status during disconnect: {exc}")
    self._disconnect_all_connections()
    self._status_task = None
    logger.info("Disconnected from all brokers")


def _cancel_connect_timers(self):
    """Cancel every staggered-connect timer, then forget them."""
    for pending in self._connect_timers:
        try:
            pending.cancel()
        except Exception as exc:
            logger.debug(f"Error cancelling MQTT connect timer: {exc}")
    self._connect_timers = []


def _disconnect_all_connections(self):
    """Disconnect each broker, logging (not raising) individual failures."""
    for conn in self.connections:
        try:
            conn.disconnect()
        except Exception as err:
            logger.error(f"Error disconnecting from {conn.broker['name']}: {err}")
def _status_heartbeat_loop(self):
"""Background thread that publishes periodic status updates"""
import time
while self._running:
try:
# Publish status (JWT refresh now handled by individual broker timers)
self.publish_status(
state="online", origin=self.node_name, radio_config=self.radio_config
)
logger.debug(f"Status heartbeat sent (next in {self.status_interval}s)")
time.sleep(self.status_interval)
except Exception as e:
logger.error(f"Status heartbeat error: {e}")
time.sleep(self.status_interval)
# ----------------------------------------------------------------
# Packet helpers
# ----------------------------------------------------------------
def _process_packet(self, pkt: dict) -> dict:
return {
"timestamp": datetime.now(timezone.utc).isoformat(),
"origin_id": self.public_key,
**pkt,
}
def publish_packet(self, pkt: dict, subtopic="packets", retain=False):
    """Publish *pkt*, stamped with timestamp and origin id, on *subtopic*."""
    envelope = self._process_packet(pkt)
    return self.publish(subtopic, envelope, retain)
def publish_raw_data(self, raw_hex: str, subtopic="raw", retain=False):
    """Publish a hex string as a ``{"type": "raw", ...}`` packet.

    ``bytes`` is the decoded length, i.e. half the number of hex characters.
    """
    byte_count = len(raw_hex) // 2
    raw_packet = {"type": "raw", "data": raw_hex, "bytes": byte_count}
    return self.publish_packet(raw_packet, subtopic, retain)
def publish_status(
    self,
    state: str = "online",
    location: Optional[dict] = None,
    extra_stats: Optional[dict] = None,
    origin: Optional[str] = None,
    radio_config: Optional[str] = None,
):
    """
    Publish a retained device status/heartbeat message on the "status" subtopic (QoS 1).

    Args:
        state: Device state (online/offline)
        location: Optional dict with latitude/longitude; omitted when falsy.
        extra_stats: Optional additional statistics, merged last into "stats"
            so they override both live values and placeholders.
        origin: Node name/description; defaults to ``self.node_name``.
        radio_config: Radio configuration string (freq,bw,sf,cr); defaults
            to ``self.radio_config``.

    Returns:
        Whatever ``self.publish`` returns.
    """
    # Get live stats from provider if available
    if self.stats_provider:
        live_stats = self.stats_provider()
    else:
        live_stats = {"uptime_secs": 0, "packets_sent": 0, "packets_received": 0}

    stats = dict(live_stats)
    # "errors" and "queue_len" are placeholders for providers that don't report
    # them; setdefault keeps real values instead of overwriting them with 0.
    stats.setdefault("errors", 0)
    stats.setdefault("queue_len", 0)
    if extra_stats:
        stats.update(extra_stats)

    status = {
        "status": state,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "origin": origin or self.node_name,
        "origin_id": self.public_key,
        "model": "PyMC-Repeater",
        "firmware_version": self.app_version,
        "radio": radio_config or self.radio_config,
        "client_version": f"pyMC_repeater/{self.app_version}",
        "stats": stats,
    }
    if location:
        status["location"] = location
    return self.publish("status", status, retain=True, qos=1)
def publish(self, subtopic: str, payload: dict, retain: bool = False, qos: int = 0):
    """
    Publish a message to every enabled, connected broker.

    The payload is JSON-encoded once and handed to each broker connection,
    which applies its own topic prefix. Connected brokers whose
    ``disallowed_types`` contains ``payload["type"]`` are skipped.

    Args:
        subtopic: Topic suffix (prefixing is done per broker connection).
        payload: JSON-serialisable message body, normally a dict.
        retain: MQTT retain flag.
        qos: MQTT QoS level.

    Returns:
        List of ``(broker_name, result)`` tuples: the connection's publish
        result for each broker published to, and "Skipped due to being
        disabled" for each disabled broker. Brokers that filtered the packet
        type, and enabled brokers that are not connected, are not listed.
    """
    message = json.dumps(payload)
    # _BrokerConnection now handles topic prefixing, so we only log the subtopic here
    logger.debug(
        f"Publishing topic='{subtopic}', {_summarize_payload_for_log(payload, message)}"
    )
    # Payloads without a "type" key (e.g. status) and non-dict payloads are
    # never filtered by type.
    try:
        packet_type = payload.get("type")
    except AttributeError:
        packet_type = None
    results = []
    filtered_by_type = False
    with self._lock:
        for conn in self.connections:
            if conn.enabled and conn.is_connected():
                if packet_type in conn.disallowed_types:
                    filtered_by_type = True
                    # Packet types are not always ints (publish_raw_data sends
                    # "raw"); the ":02X" format spec raises ValueError for those.
                    type_label = (
                        f"0x{packet_type:02X}"
                        if isinstance(packet_type, int)
                        else repr(packet_type)
                    )
                    _trace(
                        f"Skipped publishing packet type {type_label} to "
                        f"{conn.broker['name']} (disallowed)"
                    )
                    continue
                result = conn.publish(subtopic, message, retain=retain, qos=qos)
                results.append((conn.broker["name"], result))
                _trace(f"Published to {conn.broker['name']} -- {subtopic}")
            elif not conn.enabled:
                results.append((conn.broker["name"], "Skipped due to being disabled"))
    # A connected broker that deliberately filtered this type is not a missing
    # connection; only warn when no broker was in a position to receive it.
    if not results and not filtered_by_type:
        logger.warning(f"No active broker connections for publishing to {subtopic}")
    return results
def publish_mqtt(self, payload: dict, subtopic: str, retain: bool = False, qos: int = 0):
    """
    Publish only to brokers configured with the legacy custom-MQTT format.

    This path carries diagnostic streams (advert, noise_floor, crc_errors)
    that were originally only published to ``format: mqtt`` brokers;
    MC2MQTT-family brokers (letsmesh, waev, meshcoretomqtt) are deliberately
    skipped here.

    Returns:
        List of ``(broker_name, result)`` tuples: the publish result for each
        legacy-format broker, ``None`` for connected brokers of any other
        format, and "Skipped due to being disabled" for disabled brokers.
        Enabled brokers that are not connected are not listed.
    """
    encoded = json.dumps(payload)
    # Each broker connection applies its own topic prefix; log the subtopic only.
    logger.debug(
        f"Publishing topic='{subtopic}', {_summarize_payload_for_log(payload, encoded)}"
    )
    outcomes = []
    with self._lock:
        for broker_conn in self.connections:
            if not broker_conn.enabled:
                outcomes.append((broker_conn.broker["name"], "Skipped due to being disabled"))
                continue
            if not broker_conn.is_connected():
                continue
            if broker_conn.format != "mqtt":
                # Legacy-format-only path: other formats are skipped on purpose.
                _trace(
                    f"Skipped publishing to {broker_conn.broker['name']} "
                    f"(intentional: publish_mqtt only targets legacy mqtt format; broker format={broker_conn.format})"
                )
                outcomes.append((broker_conn.broker["name"], None))
                continue
            published = broker_conn.publish(subtopic, encoded, retain=retain, qos=qos)
            outcomes.append((broker_conn.broker["name"], published))
            _trace(
                f"Published to {broker_conn.broker['name']} (format={broker_conn.format}) -- {subtopic}"
            )
    if not outcomes:
        logger.warning(f"No active broker connections for publishing to {subtopic}")
    return outcomes
# ====================================================================
# Helper Functions
# ====================================================================
def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str:
    """
    Get a human-readable message for an MQTT connect/disconnect return code.

    Lookup order:
      1. MQTT v5 reason codes (>= 128) are decoded with paho's ``ReasonCode``
         when it is importable, giving "<name> (code N)".
      2. The hand-maintained tables below: CONNACK return codes for
         on_connect, disconnect codes for on_disconnect (disconnect codes
         >= 128 get a "(code N)" suffix).
      3. ``paho.mqtt.client.error_string`` for anything still unmapped.
      4. "Unknown error code N".

    Args:
        rc: Return code from paho-mqtt. An object exposing an integer
            ``value`` attribute (e.g. a paho ``ReasonCode``) is reduced to
            that value first.
        is_disconnect: True if from on_disconnect, False if from on_connect

    Returns:
        Human-readable error message
    """
    # Accept reason-code / enum objects as well as plain ints.
    rc = getattr(rc, "value", rc)

    # Fallback to manual mappings - Extended with MQTT v5 codes
    connect_errors = {
        0: "Connection accepted",
        1: "Incorrect protocol version",
        2: "Invalid client identifier",
        3: "Server unavailable",
        4: "Bad username or password (JWT invalid)",
        5: "Not authorized (JWT signature/format invalid)",
        # MQTT v5 codes
        128: "Unspecified error",
        129: "Malformed packet",
        130: "Protocol error",
        131: "Implementation specific error",
        132: "Unsupported protocol version",
        133: "Client identifier not valid",
        134: "Bad username or password",
        135: "Not authorized",
        136: "Server unavailable",
        137: "Server busy",
        138: "Banned",
        140: "Bad authentication method",
        144: "Topic name invalid",
        149: "Packet too large",
        151: "Quota exceeded",
        153: "Payload format invalid",
        154: "Retain not supported",
        155: "QoS not supported",
        156: "Use another server",
        157: "Server moved",
        159: "Connection rate exceeded",
    }
    disconnect_errors = {
        0: "Normal disconnect",
        1: "Unacceptable protocol version",
        2: "Identifier rejected",
        3: "Server unavailable",
        4: "Bad username or password",
        5: "Not authorized",
        7: "Connection lost / network error",
        16: "The connection was lost.",
        17: "Client timeout",
        # MQTT v5 codes
        128: "Unspecified error",
        129: "Malformed packet",
        130: "Protocol error",
        131: "Implementation specific error",
        135: "Not authorized",
        137: "Server busy",
        139: "Server shutting down",
        141: "Keep alive timeout",
        142: "Session taken over",
        143: "Topic filter invalid",
        144: "Topic name invalid",
        147: "Receive maximum exceeded",
        148: "Topic alias invalid",
        149: "Packet too large",
        150: "Message rate too high",
        151: "Quota exceeded",
        152: "Administrative action",
        153: "Payload format invalid",
        154: "Retain not supported",
        155: "QoS not supported",
        156: "Use another server",
        157: "Server moved",
        158: "Shared subscriptions not supported",
        159: "Connection rate exceeded",
        160: "Maximum connect time",
        161: "Subscription identifiers not supported",
        162: "Wildcard subscriptions not supported",
    }
    error_dict = disconnect_errors if is_disconnect else connect_errors

    # Only v5 reason codes (>= 128) go through ReasonCode: lower values overlap
    # with MQTT v3 CONNACK codes and paho client error numbers (the 7/16/17
    # entries above), which ReasonCode would misread as v5 codes.
    if HAS_REASON_CODES and ReasonCode is not None and rc >= 128:
        try:
            # ReasonCode expects PacketTypes enum values, not the fixed-header
            # byte constants mqtt.CONNACK (0x20) / mqtt.DISCONNECT (0xE0) from
            # paho.mqtt.client, which never match and made this branch dead.
            from paho.mqtt.packettypes import PacketTypes

            packet_type = PacketTypes.DISCONNECT if is_disconnect else PacketTypes.CONNACK
            reason = ReasonCode(packet_type, identifier=rc)
            name = reason.getName() if hasattr(reason, "getName") else str(reason)
            return f"{name} (code {rc})"
        except Exception as e:
            # Only worth noting when there is no table entry to fall back on.
            if error_dict.get(rc) is None:
                logger.debug(f"Could not decode reason code {rc}: {e}")

    # The tables come before paho's error_string: error_string describes
    # MQTT_ERR_* client errors, a different numbering from CONNACK codes
    # (e.g. it would report CONNACK rc=4 as "The client is not currently connected.").
    mapped = error_dict.get(rc)
    if mapped is not None:
        if is_disconnect and rc >= 128 and "(code" not in mapped:
            return f"{mapped} (code {rc})"
        return mapped

    try:
        paho_error = mqtt.error_string(rc)
        if paho_error and paho_error != "Unknown error.":
            return paho_error
    except Exception as exc:
        logger.debug(f"Failed to map paho MQTT error string for code {rc}: {exc}")
    return f"Unknown error code {rc}"