From 32e13cb40edc8b23e3ff1d28bed8444f14ae0699 Mon Sep 17 00:00:00 2001 From: Lloyd Date: Mon, 22 Dec 2025 13:08:30 +0000 Subject: [PATCH] Enhance LetsMesh broker configuration and connection management - Added detailed comments and examples for broker selection in config.yaml - Refactored letsmesh_handler.py to support multiple broker connections - Implemented connection lifecycle management for individual brokers - Improved JWT token handling and publishing across all connected brokers --- config.yaml.example | 46 +- repeater/data_acquisition/letsmesh_handler.py | 404 +++++++++++------- 2 files changed, 306 insertions(+), 144 deletions(-) diff --git a/config.yaml.example b/config.yaml.example index e714270..eb23ab1 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -199,7 +199,51 @@ storage: letsmesh: enabled: false iata_code: "Test" # e.g., "SFO", "LHR", "Test" - broker_index: 0 # Which LetsMesh broker (0=EU, 1=US West) + + # ============================================================ + # BROKER SELECTION MODE - Choose how to connect to brokers + # ============================================================ + # + # EXAMPLE 1: Single built-in broker (default, most common) + # Connect to Europe only - simple, low bandwidth + broker_index: 0 # 0 = Europe, 1 = US West + + # EXAMPLE 2: All built-in brokers for maximum redundancy + # Survives single broker failure, best uptime + # broker_index: -1 # or null - connects to both EU and US + + # EXAMPLE 3: Only custom brokers (private/self-hosted) + # Ignores built-in LetsMesh brokers completely + # broker_index: -2 + # additional_brokers: + # - name: "Private Server" + # host: "mqtt.myserver.com" + # port: 443 + # audience: "mqtt.myserver.com" + + # EXAMPLE 4: Single built-in + custom backup + # Use EU primary with your own backup + # broker_index: 0 + # additional_brokers: + # - name: "Backup Server" + # host: "mqtt-backup.mydomain.com" + # port: 8883 + # audience: "mqtt-backup.mydomain.com" + + # EXAMPLE 5: All built-in + multiple custom (maximum redundancy) + # EU + US + your own servers - best for critical deployments + # broker_index: -1 + # additional_brokers: + # - name: "Custom Primary" + # host: "mqtt-1.mydomain.com" + # port: 443 + # audience: "mqtt-1.mydomain.com" + # - name: "Custom Backup" + # host: "mqtt-2.mydomain.com" + # port: 443 + # audience: "mqtt-2.mydomain.com" + # ============================================================ + status_interval: 300 owner: "" email: "" diff --git a/repeater/data_acquisition/letsmesh_handler.py b/repeater/data_acquisition/letsmesh_handler.py index 8a42504..9b2c9b4 100644 --- a/repeater/data_acquisition/letsmesh_handler.py +++ b/repeater/data_acquisition/letsmesh_handler.py @@ -3,10 +3,11 @@ import logging import binascii import base64 import paho.mqtt.client as mqtt +import threading from datetime import datetime, timedelta, UTC from nacl.signing import SigningKey -from typing import Callable, Optional +from typing import Callable, Optional, List, Dict from .. import __version__ @@ -36,15 +37,164 @@ LETSMESH_BROKERS = [ ] +# ==================================================================== +# Single Broker Connection Manager +# ==================================================================== +class _BrokerConnection: + """ + Manages a single MQTT broker connection with independent lifecycle. + Internal class - not exposed publicly. + """ + + def __init__( + self, + broker: dict, + private_key_hex: str, + public_key: str, + iata_code: str, + jwt_expiry_minutes: int, + use_tls: bool, + email: str, + owner: str, + on_connect_callback: Optional[Callable] = None, + on_disconnect_callback: Optional[Callable] = None, + ): + self.broker = broker + self.private_key_hex = private_key_hex + self.public_key = public_key.upper() + self.iata_code = iata_code + self.jwt_expiry_minutes = jwt_expiry_minutes + self.use_tls = use_tls + self.email = email + self.owner = owner + self._on_connect_callback = on_connect_callback + self._on_disconnect_callback = on_disconnect_callback + self._connect_time = None + self._tls_verified = False + self._running = False + + # MQTT WebSocket client - unique client ID per broker + client_id = f"meshcore_{self.public_key}_{broker['host']}" + self.client = mqtt.Client(client_id=client_id, transport="websockets") + self.client.on_connect = self._on_connect + self.client.on_disconnect = self._on_disconnect + + def _generate_jwt(self) -> str: + """Generate MeshCore-style Ed25519 JWT token""" + now = datetime.now(UTC) + + header = {"alg": "Ed25519", "typ": "JWT"} + + payload = { + "publicKey": self.public_key.upper(), + "aud": self.broker["audience"], + "iat": int(now.timestamp()), + "exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp()), + } + + # Only include email/owner for verified TLS connections + if self.use_tls and self._tls_verified and (self.email or self.owner): + payload["email"] = self.email + payload["owner"] = self.owner + else: + payload["email"] = "" + payload["owner"] = "" + + # Encode header and payload (compact JSON - no spaces) + header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode()) + payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode()) + + signing_input = f"{header_b64}.{payload_b64}".encode() + seed32 = binascii.unhexlify(self.private_key_hex) + signer = SigningKey(seed32) + + # Sign the message + signature = signer.sign(signing_input).signature + signature_hex = binascii.hexlify(signature).decode() + token = f"{header_b64}.{payload_b64}.{signature_hex}" + + return token + + def _on_connect(self, client, userdata, flags, rc): + """MQTT connection callback""" + if rc == 0: + logging.info(f"Connected to {self.broker['name']}") + self._running = True + if self._on_connect_callback: + self._on_connect_callback(self.broker["name"]) + else: + logging.error(f"Failed to connect to {self.broker['name']} (rc={rc})") + + def _on_disconnect(self, client, userdata, rc): + """MQTT disconnection callback""" + logging.warning(f"Disconnected from {self.broker['name']} (rc={rc})") + self._running = False + if self._on_disconnect_callback: + self._on_disconnect_callback(self.broker["name"]) + + def refresh_jwt_token(self): + """Refresh JWT token for MQTT authentication""" + token = self._generate_jwt() + username = f"v1_{self.public_key}" + self.client.username_pw_set(username=username, password=token) + self._connect_time = datetime.now(UTC) + logging.debug(f"JWT token refreshed for {self.broker['name']}") + + def connect(self): + """Establish connection to broker""" + # Conditional TLS setup + if self.use_tls: + import ssl + + self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS_CLIENT) + self.client.tls_insecure_set(False) + self._tls_verified = True + protocol = "wss" + else: + protocol = "ws" + + # Generate and set JWT token + self.refresh_jwt_token() + + logging.info( + f"Connecting to {self.broker['name']} " + f"({protocol}://{self.broker['host']}:{self.broker['port']}) ..." + ) + + self.client.connect(self.broker["host"], self.broker["port"], keepalive=60) + self.client.loop_start() + + def disconnect(self): + """Disconnect from broker""" + self._running = False + self.client.loop_stop() + self.client.disconnect() + logging.info(f"Disconnected from {self.broker['name']}") + + def publish(self, topic: str, payload: str, retain: bool = False): + """Publish message to broker""" + if self._running: + result = self.client.publish(topic, payload, retain=retain) + return result + return None + + def is_connected(self) -> bool: + """Check if connection is active""" + return self._running + + def should_refresh_token(self) -> bool: + """Check if JWT token needs refresh (at 80% of expiry)""" + if not self._connect_time: + return False + elapsed = (datetime.now(UTC) - self._connect_time).total_seconds() + expiry_seconds = self.jwt_expiry_minutes * 60 + return elapsed >= expiry_seconds * 0.8 + + # ==================================================================== # MeshCore → MQTT Publisher with Ed25519 auth token # ==================================================================== class MeshCoreToMqttJwtPusher: - """ - Push-only MQTT publisher for Let's Mesh MQTT brokers. - Implements MeshCore-style Ed25519 token signing. - No modifications to crypto.py. - """ def __init__( self, @@ -61,17 +211,49 @@ class MeshCoreToMqttJwtPusher: node_info = get_node_info(config) iata_code = node_info["iata_code"] - broker_index = node_info["broker_index"] + broker_index = node_info.get("broker_index") self.email = node_info.get("email", "") self.owner = node_info.get("owner", "") status_interval = node_info["status_interval"] node_name = node_info["node_name"] radio_config = node_info["radio_config"] - if broker_index >= len(LETSMESH_BROKERS): - raise ValueError(f"Invalid broker_index {broker_index}") + # Get additional brokers from config (optional) + letsmesh_config = config.get("letsmesh", {}) + additional_brokers = letsmesh_config.get("additional_brokers", []) + + # Determine which brokers to connect to + if broker_index == -2: + # Custom brokers only - no built-in brokers + self.brokers = [] + logging.info("Custom broker mode: using only user-defined brokers") + elif broker_index is None or broker_index == -1: + # Connect to all built-in brokers + additional ones + self.brokers = LETSMESH_BROKERS.copy() + logging.info(f"Multi-broker mode: connecting to all {len(LETSMESH_BROKERS)} built-in brokers") + else: + # Single broker mode (backward compatibility) + if broker_index >= len(LETSMESH_BROKERS): + raise ValueError(f"Invalid broker_index {broker_index}") + self.brokers = [LETSMESH_BROKERS[broker_index]] + logging.info(f"Single broker mode: connecting to {self.brokers[0]['name']}") + + # Add additional brokers from config + if additional_brokers: + for broker_config in additional_brokers: + if all(k in broker_config for k in ["name", "host", "port", "audience"]): + self.brokers.append(broker_config) + logging.info(f"Added custom broker: {broker_config['name']}") + else: + logging.warning(f"Skipping invalid broker config: {broker_config}") + + # Validate that we have at least one broker + if not self.brokers: + raise ValueError( + "No brokers configured. Either set broker_index to a valid value " + "or provide additional_brokers in config." + ) - self.broker = LETSMESH_BROKERS[broker_index] self.private_key_hex = private_key self.public_key = public_key.upper() self.iata_code = iata_code @@ -84,148 +266,74 @@ class MeshCoreToMqttJwtPusher: self.stats_provider = stats_provider self._status_task = None self._running = False - self._connect_time = None - self._tls_verified = False + self._lock = threading.Lock() - # MQTT WebSocket client - self.client = mqtt.Client(client_id=f"meshcore_{self.public_key}", transport="websockets") - self.client.on_connect = self._on_connect - self.client.on_disconnect = self._on_disconnect - - # ---------------------------------------------------------------- - # MeshCore-style Ed25519 token generator - # ---------------------------------------------------------------- - def _generate_jwt(self) -> str: - now = datetime.now(UTC) - - header = {"alg": "Ed25519", "typ": "JWT"} - - payload = { - "publicKey": self.public_key.upper(), - "aud": self.broker["audience"], - "iat": int(now.timestamp()), - "exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp()), - } - - # Only include email/owner for verified TLS connections - if self.use_tls and self._tls_verified and (self.email or self.owner): - payload["email"] = self.email - payload["owner"] = self.owner - logging.debug("JWT includes email/owner (TLS verified)") - else: - payload["email"] = "" - payload["owner"] = "" - if not self.use_tls: - logging.debug("JWT excludes email/owner (TLS disabled)") - elif not self._tls_verified: - logging.debug("JWT excludes email/owner (TLS not verified yet)") - else: - logging.debug("JWT excludes email/owner (email/owner not configured)") - - # Encode header and payload (compact JSON - no spaces) - header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode()) - payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode()) - - signing_input = f"{header_b64}.{payload_b64}".encode() - seed32 = binascii.unhexlify(self.private_key_hex) - signer = SigningKey(seed32) - - # Verify the public key matches what we expect - derived_public = binascii.hexlify(bytes(signer.verify_key)).decode() - if derived_public.upper() != self.public_key.upper(): - raise ValueError( - f"Public key mismatch! " f"Derived: {derived_public}, Expected: {self.public_key}" + # Create broker connections + self.connections: List[_BrokerConnection] = [] + for broker in self.brokers: + conn = _BrokerConnection( + broker=broker, + private_key_hex=self.private_key_hex, + public_key=self.public_key, + iata_code=self.iata_code, + jwt_expiry_minutes=self.jwt_expiry_minutes, + use_tls=self.use_tls, + email=self.email, + owner=self.owner, + on_connect_callback=self._on_broker_connected, + on_disconnect_callback=self._on_broker_disconnected, ) + self.connections.append(conn) - # Sign the message - signature = signer.sign(signing_input).signature - signature_hex = binascii.hexlify(signature).decode() - token = f"{header_b64}.{payload_b64}.{signature_hex}" + logging.info(f"Initialized with {len(self.connections)} broker connection(s)") - logging.debug(f"Generated MeshCore token: {token[:10]}...{token[-10:]}") - return token - - # ---------------------------------------------------------------- - # MQTT setup - # ---------------------------------------------------------------- - def _on_connect(self, client, userdata, flags, rc): - if rc == 0: - logging.info(f"Connected to {self.broker['name']}") + def _on_broker_connected(self, broker_name: str): + """Callback when a broker connects""" + # Publish initial status on first connection + if not self._status_task and self.status_interval > 0: self._running = True - - # Publish initial status on connect self.publish_status( state="online", origin=self.node_name, radio_config=self.radio_config ) - - # connected start heartbeat thread - if self.status_interval > 0 and not self._status_task: - import threading - self._status_task = threading.Thread(target=self._status_heartbeat_loop, daemon=True) - self._status_task.start() - logging.info(f"Started status heartbeat (interval: {self.status_interval}s)") - else: - logging.error(f"Failed with code {rc}") + # Start heartbeat thread + self._status_task = threading.Thread(target=self._status_heartbeat_loop, daemon=True) + self._status_task.start() + logging.info(f"Started status heartbeat (interval: {self.status_interval}s)") - def _on_disconnect(self, client, userdata, rc): - logging.warning(f"Disconnected (rc={rc})") - self._running = False + def _on_broker_disconnected(self, broker_name: str): + """Callback when a broker disconnects""" + # Check if all connections are down + all_down = all(not conn.is_connected() for conn in self.connections) + if all_down: + logging.warning("All broker connections lost") + self._running = False - def _refresh_jwt_token(self): - """Refresh JWT token for MQTT authentication""" - token = self._generate_jwt() - username = f"v1_{self.public_key}" - self.client.username_pw_set(username=username, password=token) - self._connect_time = datetime.now(UTC) - logging.info("JWT token refreshed") - - # ---------------------------------------------------------------- - # Connect using WebSockets + TLS + MeshCore token auth - # ---------------------------------------------------------------- def connect(self): - # Conditional TLS setup - if self.use_tls: - import ssl - # Enable TLS with certificate verification - self.client.tls_set( - cert_reqs=ssl.CERT_REQUIRED, - tls_version=ssl.PROTOCOL_TLS_CLIENT - ) - self.client.tls_insecure_set(False) # Enforce hostname verification - # Mark as verified - if connection fails, we won't connect anyway - self._tls_verified = True - if self.email or self.owner: - logging.info("TLS enabled with certificate verification - email/owner will be included") - protocol = "wss" - else: - protocol = "ws" - - # Generate JWT token (will include email/owner if TLS verified) - token = self._generate_jwt() - username = f"v1_{self.public_key}" - self.client.username_pw_set(username=username, password=token) - - logging.info( - f"Connecting to {self.broker['name']} " - f"({protocol}://{self.broker['host']}:{self.broker['port']}) ..." - ) - - # Must use raw hostname without wss:// - self.client.connect(self.broker["host"], self.broker["port"], keepalive=60) - self.client.loop_start() - self._connect_time = datetime.now(UTC) + """Establish connections to all configured brokers""" + for conn in self.connections: + try: + conn.connect() + except Exception as e: + logging.error(f"Failed to connect to {conn.broker['name']}: {e}") def disconnect(self): + """Disconnect from all brokers""" self._running = False + # Publish offline status before disconnecting self.publish_status(state="offline", origin=self.node_name, radio_config=self.radio_config) + import time + time.sleep(0.5) # Give time for messages to be sent - time.sleep(0.5) # Give time for the message to be sent + # Disconnect all brokers + for conn in self.connections: + try: + conn.disconnect() + except Exception as e: + logging.error(f"Error disconnecting from {conn.broker['name']}: {e}") - self.client.loop_stop() - self.client.disconnect() - logging.info("Disconnected") + logging.info("Disconnected from all brokers") def _status_heartbeat_loop(self): """Background thread that publishes periodic status updates""" @@ -233,13 +341,11 @@ class MeshCoreToMqttJwtPusher: while self._running: try: - # Refresh JWT token before it expires (at 80% of expiry time) - if self._connect_time: - elapsed = (datetime.now(UTC) - self._connect_time).total_seconds() - expiry_seconds = self.jwt_expiry_minutes * 60 - if elapsed >= expiry_seconds * 0.8: - self._refresh_jwt_token() - + # Refresh JWT tokens for all connections before they expire + for conn in self.connections: + if conn.is_connected() and conn.should_refresh_token(): + conn.refresh_jwt_token() + self.publish_status( state="online", origin=self.node_name, radio_config=self.radio_config ) @@ -307,9 +413,21 @@ class MeshCoreToMqttJwtPusher: return self.publish("status", status, retain=False) def publish(self, subtopic: str, payload: dict, retain: bool = False): + """Publish message to all connected brokers""" topic = self._topic(subtopic) message = json.dumps(payload) - result = self.client.publish(topic, message, retain=retain) - logging.debug(f"Published to {topic}: {message}") - return result + + results = [] + with self._lock: + for conn in self.connections: + if conn.is_connected(): + result = conn.publish(topic, message, retain=retain) + results.append((conn.broker["name"], result)) + logging.debug(f"Published to {conn.broker['name']}/{topic}") + + # Log if no brokers were available + if not results: + logging.warning(f"No active broker connections for publishing to {topic}") + + return results