Enhance LetsMesh broker configuration and connection management

- Added detailed comments and examples for broker selection in config.yaml
- Refactored letsmesh_handler.py to support multiple broker connections
- Implemented connection lifecycle management for individual brokers
- Improved JWT token handling and publishing across all connected brokers
This commit is contained in:
Lloyd
2025-12-22 13:08:30 +00:00
parent 6d82936987
commit 32e13cb40e
2 changed files with 306 additions and 144 deletions
+45 -1
View File
@@ -199,7 +199,51 @@ storage:
letsmesh:
enabled: false
iata_code: "Test" # e.g., "SFO", "LHR", "Test"
broker_index: 0 # Which LetsMesh broker (0=EU, 1=US West)
# ============================================================
# BROKER SELECTION MODE - Choose how to connect to brokers
# ============================================================
#
# EXAMPLE 1: Single built-in broker (default, most common)
# Connect to Europe only - simple, low bandwidth
broker_index: 0 # 0 = Europe, 1 = US West
# EXAMPLE 2: All built-in brokers for maximum redundancy
# Survives single broker failure, best uptime
# broker_index: -1 # or null - connects to both EU and US
# EXAMPLE 3: Only custom brokers (private/self-hosted)
# Ignores built-in LetsMesh brokers completely
# broker_index: -2
# additional_brokers:
# - name: "Private Server"
# host: "mqtt.myserver.com"
# port: 443
# audience: "mqtt.myserver.com"
# EXAMPLE 4: Single built-in + custom backup
# Use EU primary with your own backup
# broker_index: 0
# additional_brokers:
# - name: "Backup Server"
# host: "mqtt-backup.mydomain.com"
# port: 8883
# audience: "mqtt-backup.mydomain.com"
# EXAMPLE 5: All built-in + multiple custom (maximum redundancy)
# EU + US + your own servers - best for critical deployments
# broker_index: -1
# additional_brokers:
# - name: "Custom Primary"
# host: "mqtt-1.mydomain.com"
# port: 443
# audience: "mqtt-1.mydomain.com"
# - name: "Custom Backup"
# host: "mqtt-2.mydomain.com"
# port: 443
# audience: "mqtt-2.mydomain.com"
# ============================================================
status_interval: 300
owner: ""
email: ""
+261 -143
View File
@@ -3,10 +3,11 @@ import logging
import binascii
import base64
import paho.mqtt.client as mqtt
import threading
from datetime import datetime, timedelta, UTC
from nacl.signing import SigningKey
from typing import Callable, Optional
from typing import Callable, Optional, List, Dict
from .. import __version__
@@ -36,15 +37,164 @@ LETSMESH_BROKERS = [
]
# ====================================================================
# Single Broker Connection Manager
# ====================================================================
class _BrokerConnection:
"""
Manages a single MQTT broker connection with independent lifecycle.
Internal class - not exposed publicly.
"""
def __init__(
self,
broker: dict,
private_key_hex: str,
public_key: str,
iata_code: str,
jwt_expiry_minutes: int,
use_tls: bool,
email: str,
owner: str,
on_connect_callback: Optional[Callable] = None,
on_disconnect_callback: Optional[Callable] = None,
):
self.broker = broker
self.private_key_hex = private_key_hex
self.public_key = public_key.upper()
self.iata_code = iata_code
self.jwt_expiry_minutes = jwt_expiry_minutes
self.use_tls = use_tls
self.email = email
self.owner = owner
self._on_connect_callback = on_connect_callback
self._on_disconnect_callback = on_disconnect_callback
self._connect_time = None
self._tls_verified = False
self._running = False
# MQTT WebSocket client - unique client ID per broker
client_id = f"meshcore_{self.public_key}_{broker['host']}"
self.client = mqtt.Client(client_id=client_id, transport="websockets")
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
def _generate_jwt(self) -> str:
"""Generate MeshCore-style Ed25519 JWT token"""
now = datetime.now(UTC)
header = {"alg": "Ed25519", "typ": "JWT"}
payload = {
"publicKey": self.public_key.upper(),
"aud": self.broker["audience"],
"iat": int(now.timestamp()),
"exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp()),
}
# Only include email/owner for verified TLS connections
if self.use_tls and self._tls_verified and (self.email or self.owner):
payload["email"] = self.email
payload["owner"] = self.owner
else:
payload["email"] = ""
payload["owner"] = ""
# Encode header and payload (compact JSON - no spaces)
header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode())
payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode())
signing_input = f"{header_b64}.{payload_b64}".encode()
seed32 = binascii.unhexlify(self.private_key_hex)
signer = SigningKey(seed32)
# Sign the message
signature = signer.sign(signing_input).signature
signature_hex = binascii.hexlify(signature).decode()
token = f"{header_b64}.{payload_b64}.{signature_hex}"
return token
def _on_connect(self, client, userdata, flags, rc):
"""MQTT connection callback"""
if rc == 0:
logging.info(f"Connected to {self.broker['name']}")
self._running = True
if self._on_connect_callback:
self._on_connect_callback(self.broker["name"])
else:
logging.error(f"Failed to connect to {self.broker['name']} (rc={rc})")
def _on_disconnect(self, client, userdata, rc):
"""MQTT disconnection callback"""
logging.warning(f"Disconnected from {self.broker['name']} (rc={rc})")
self._running = False
if self._on_disconnect_callback:
self._on_disconnect_callback(self.broker["name"])
def refresh_jwt_token(self):
"""Refresh JWT token for MQTT authentication"""
token = self._generate_jwt()
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
self._connect_time = datetime.now(UTC)
logging.debug(f"JWT token refreshed for {self.broker['name']}")
def connect(self):
"""Establish connection to broker"""
# Conditional TLS setup
if self.use_tls:
import ssl
self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS_CLIENT)
self.client.tls_insecure_set(False)
self._tls_verified = True
protocol = "wss"
else:
protocol = "ws"
# Generate and set JWT token
self.refresh_jwt_token()
logging.info(
f"Connecting to {self.broker['name']} "
f"({protocol}://{self.broker['host']}:{self.broker['port']}) ..."
)
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
self.client.loop_start()
def disconnect(self):
"""Disconnect from broker"""
self._running = False
self.client.loop_stop()
self.client.disconnect()
logging.info(f"Disconnected from {self.broker['name']}")
def publish(self, topic: str, payload: str, retain: bool = False):
"""Publish message to broker"""
if self._running:
result = self.client.publish(topic, payload, retain=retain)
return result
return None
def is_connected(self) -> bool:
"""Check if connection is active"""
return self._running
def should_refresh_token(self) -> bool:
"""Check if JWT token needs refresh (at 80% of expiry)"""
if not self._connect_time:
return False
elapsed = (datetime.now(UTC) - self._connect_time).total_seconds()
expiry_seconds = self.jwt_expiry_minutes * 60
return elapsed >= expiry_seconds * 0.8
# ====================================================================
# MeshCore → MQTT Publisher with Ed25519 auth token
# ====================================================================
class MeshCoreToMqttJwtPusher:
"""
Push-only MQTT publisher for Let's Mesh MQTT brokers.
Implements MeshCore-style Ed25519 token signing.
No modifications to crypto.py.
"""
def __init__(
self,
@@ -61,17 +211,49 @@ class MeshCoreToMqttJwtPusher:
node_info = get_node_info(config)
iata_code = node_info["iata_code"]
broker_index = node_info["broker_index"]
broker_index = node_info.get("broker_index")
self.email = node_info.get("email", "")
self.owner = node_info.get("owner", "")
status_interval = node_info["status_interval"]
node_name = node_info["node_name"]
radio_config = node_info["radio_config"]
if broker_index >= len(LETSMESH_BROKERS):
raise ValueError(f"Invalid broker_index {broker_index}")
# Get additional brokers from config (optional)
letsmesh_config = config.get("letsmesh", {})
additional_brokers = letsmesh_config.get("additional_brokers", [])
# Determine which brokers to connect to
if broker_index == -2:
# Custom brokers only - no built-in brokers
self.brokers = []
logging.info("Custom broker mode: using only user-defined brokers")
elif broker_index is None or broker_index == -1:
# Connect to all built-in brokers + additional ones
self.brokers = LETSMESH_BROKERS.copy()
logging.info(f"Multi-broker mode: connecting to all {len(LETSMESH_BROKERS)} built-in brokers")
else:
# Single broker mode (backward compatibility)
if broker_index >= len(LETSMESH_BROKERS):
raise ValueError(f"Invalid broker_index {broker_index}")
self.brokers = [LETSMESH_BROKERS[broker_index]]
logging.info(f"Single broker mode: connecting to {self.brokers[0]['name']}")
# Add additional brokers from config
if additional_brokers:
for broker_config in additional_brokers:
if all(k in broker_config for k in ["name", "host", "port", "audience"]):
self.brokers.append(broker_config)
logging.info(f"Added custom broker: {broker_config['name']}")
else:
logging.warning(f"Skipping invalid broker config: {broker_config}")
# Validate that we have at least one broker
if not self.brokers:
raise ValueError(
"No brokers configured. Either set broker_index to a valid value "
"or provide additional_brokers in config."
)
self.broker = LETSMESH_BROKERS[broker_index]
self.private_key_hex = private_key
self.public_key = public_key.upper()
self.iata_code = iata_code
@@ -84,148 +266,74 @@ class MeshCoreToMqttJwtPusher:
self.stats_provider = stats_provider
self._status_task = None
self._running = False
self._connect_time = None
self._tls_verified = False
self._lock = threading.Lock()
# MQTT WebSocket client
self.client = mqtt.Client(client_id=f"meshcore_{self.public_key}", transport="websockets")
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
# ----------------------------------------------------------------
# MeshCore-style Ed25519 token generator
# ----------------------------------------------------------------
def _generate_jwt(self) -> str:
now = datetime.now(UTC)
header = {"alg": "Ed25519", "typ": "JWT"}
payload = {
"publicKey": self.public_key.upper(),
"aud": self.broker["audience"],
"iat": int(now.timestamp()),
"exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp()),
}
# Only include email/owner for verified TLS connections
if self.use_tls and self._tls_verified and (self.email or self.owner):
payload["email"] = self.email
payload["owner"] = self.owner
logging.debug("JWT includes email/owner (TLS verified)")
else:
payload["email"] = ""
payload["owner"] = ""
if not self.use_tls:
logging.debug("JWT excludes email/owner (TLS disabled)")
elif not self._tls_verified:
logging.debug("JWT excludes email/owner (TLS not verified yet)")
else:
logging.debug("JWT excludes email/owner (email/owner not configured)")
# Encode header and payload (compact JSON - no spaces)
header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode())
payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode())
signing_input = f"{header_b64}.{payload_b64}".encode()
seed32 = binascii.unhexlify(self.private_key_hex)
signer = SigningKey(seed32)
# Verify the public key matches what we expect
derived_public = binascii.hexlify(bytes(signer.verify_key)).decode()
if derived_public.upper() != self.public_key.upper():
raise ValueError(
f"Public key mismatch! " f"Derived: {derived_public}, Expected: {self.public_key}"
# Create broker connections
self.connections: List[_BrokerConnection] = []
for broker in self.brokers:
conn = _BrokerConnection(
broker=broker,
private_key_hex=self.private_key_hex,
public_key=self.public_key,
iata_code=self.iata_code,
jwt_expiry_minutes=self.jwt_expiry_minutes,
use_tls=self.use_tls,
email=self.email,
owner=self.owner,
on_connect_callback=self._on_broker_connected,
on_disconnect_callback=self._on_broker_disconnected,
)
self.connections.append(conn)
# Sign the message
signature = signer.sign(signing_input).signature
signature_hex = binascii.hexlify(signature).decode()
token = f"{header_b64}.{payload_b64}.{signature_hex}"
logging.info(f"Initialized with {len(self.connections)} broker connection(s)")
logging.debug(f"Generated MeshCore token: {token[:10]}...{token[-10:]}")
return token
# ----------------------------------------------------------------
# MQTT setup
# ----------------------------------------------------------------
def _on_connect(self, client, userdata, flags, rc):
if rc == 0:
logging.info(f"Connected to {self.broker['name']}")
def _on_broker_connected(self, broker_name: str):
"""Callback when a broker connects"""
# Publish initial status on first connection
if not self._status_task and self.status_interval > 0:
self._running = True
# Publish initial status on connect
self.publish_status(
state="online", origin=self.node_name, radio_config=self.radio_config
)
# connected start heartbeat thread
if self.status_interval > 0 and not self._status_task:
import threading
self._status_task = threading.Thread(target=self._status_heartbeat_loop, daemon=True)
self._status_task.start()
logging.info(f"Started status heartbeat (interval: {self.status_interval}s)")
else:
logging.error(f"Failed with code {rc}")
# Start heartbeat thread
self._status_task = threading.Thread(target=self._status_heartbeat_loop, daemon=True)
self._status_task.start()
logging.info(f"Started status heartbeat (interval: {self.status_interval}s)")
def _on_disconnect(self, client, userdata, rc):
logging.warning(f"Disconnected (rc={rc})")
self._running = False
def _on_broker_disconnected(self, broker_name: str):
"""Callback when a broker disconnects"""
# Check if all connections are down
all_down = all(not conn.is_connected() for conn in self.connections)
if all_down:
logging.warning("All broker connections lost")
self._running = False
def _refresh_jwt_token(self):
"""Refresh JWT token for MQTT authentication"""
token = self._generate_jwt()
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
self._connect_time = datetime.now(UTC)
logging.info("JWT token refreshed")
# ----------------------------------------------------------------
# Connect using WebSockets + TLS + MeshCore token auth
# ----------------------------------------------------------------
def connect(self):
# Conditional TLS setup
if self.use_tls:
import ssl
# Enable TLS with certificate verification
self.client.tls_set(
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS_CLIENT
)
self.client.tls_insecure_set(False) # Enforce hostname verification
# Mark as verified - if connection fails, we won't connect anyway
self._tls_verified = True
if self.email or self.owner:
logging.info("TLS enabled with certificate verification - email/owner will be included")
protocol = "wss"
else:
protocol = "ws"
# Generate JWT token (will include email/owner if TLS verified)
token = self._generate_jwt()
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
logging.info(
f"Connecting to {self.broker['name']} "
f"({protocol}://{self.broker['host']}:{self.broker['port']}) ..."
)
# Must use raw hostname without wss://
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
self.client.loop_start()
self._connect_time = datetime.now(UTC)
"""Establish connections to all configured brokers"""
for conn in self.connections:
try:
conn.connect()
except Exception as e:
logging.error(f"Failed to connect to {conn.broker['name']}: {e}")
def disconnect(self):
"""Disconnect from all brokers"""
self._running = False
# Publish offline status before disconnecting
self.publish_status(state="offline", origin=self.node_name, radio_config=self.radio_config)
import time
time.sleep(0.5) # Give time for messages to be sent
time.sleep(0.5) # Give time for the message to be sent
# Disconnect all brokers
for conn in self.connections:
try:
conn.disconnect()
except Exception as e:
logging.error(f"Error disconnecting from {conn.broker['name']}: {e}")
self.client.loop_stop()
self.client.disconnect()
logging.info("Disconnected")
logging.info("Disconnected from all brokers")
def _status_heartbeat_loop(self):
"""Background thread that publishes periodic status updates"""
@@ -233,13 +341,11 @@ class MeshCoreToMqttJwtPusher:
while self._running:
try:
# Refresh JWT token before it expires (at 80% of expiry time)
if self._connect_time:
elapsed = (datetime.now(UTC) - self._connect_time).total_seconds()
expiry_seconds = self.jwt_expiry_minutes * 60
if elapsed >= expiry_seconds * 0.8:
self._refresh_jwt_token()
# Refresh JWT tokens for all connections before they expire
for conn in self.connections:
if conn.is_connected() and conn.should_refresh_token():
conn.refresh_jwt_token()
self.publish_status(
state="online", origin=self.node_name, radio_config=self.radio_config
)
@@ -307,9 +413,21 @@ class MeshCoreToMqttJwtPusher:
return self.publish("status", status, retain=False)
def publish(self, subtopic: str, payload: dict, retain: bool = False):
"""Publish message to all connected brokers"""
topic = self._topic(subtopic)
message = json.dumps(payload)
result = self.client.publish(topic, message, retain=retain)
logging.debug(f"Published to {topic}: {message}")
return result
results = []
with self._lock:
for conn in self.connections:
if conn.is_connected():
result = conn.publish(topic, message, retain=retain)
results.append((conn.broker["name"], result))
logging.debug(f"Published to {conn.broker['name']}/{topic}")
# Log if no brokers were available
if not results:
logging.warning(f"No active broker connections for publishing to {topic}")
return results