Files
pyMC_Repeater/repeater/data_acquisition/letsmesh_handler.py
T
2026-01-23 18:43:49 +10:00

564 lines
21 KiB
Python

import json
import logging
import binascii
import base64
import paho.mqtt.client as mqtt
import threading
from datetime import datetime, timedelta
from nacl.signing import SigningKey
from typing import Callable, Optional, List, Dict
from .. import __version__
# Try to import datetime.UTC (Python 3.11+) otherwise fallback to timezone.utc
try:
from datetime import UTC
except Exception:
from datetime import timezone
UTC = timezone.utc
# Try to import paho-mqtt error code mappings
try:
from paho.mqtt.reasoncodes import ReasonCode
HAS_REASON_CODES = True
except ImportError:
HAS_REASON_CODES = False
# --------------------------------------------------------------------
# Helper: Base64URL without padding
# --------------------------------------------------------------------
def b64url(x: bytes) -> str:
    """Return *x* as URL-safe Base64 text with the trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(x).decode("ascii")
    return encoded.rstrip("=")
# --------------------------------------------------------------------
# Let's Mesh MQTT Broker List (WebSocket Secure)
# --------------------------------------------------------------------
# Built-in brokers. List position matters: a non-negative ``broker_index`` in
# the node config selects a single entry from this list by index.
# Per-entry keys, as used by the connection code in this module:
#   name     - label used in log messages and passed to the connect/disconnect callbacks
#   host     - host name handed to the MQTT client's connect()
#   port     - port handed to the MQTT client's connect()
#   audience - value placed in the JWT "aud" claim for this broker
LETSMESH_BROKERS = [
    {
        "name": "Europe (LetsMesh v1)",
        "host": "mqtt-eu-v1.letsmesh.net",
        "port": 443,
        "audience": "mqtt-eu-v1.letsmesh.net",
    },
    {
        "name": "US West (LetsMesh v1)",
        "host": "mqtt-us-v1.letsmesh.net",
        "port": 443,
        "audience": "mqtt-us-v1.letsmesh.net",
    },
]
# ====================================================================
# Single Broker Connection Manager
# ====================================================================
class _BrokerConnection:
"""
Manages a single MQTT broker connection with independent lifecycle.
Internal class - not exposed publicly.
"""
def __init__(
self,
broker: dict,
local_identity,
public_key: str,
iata_code: str,
jwt_expiry_minutes: int,
use_tls: bool,
email: str,
owner: str,
on_connect_callback: Optional[Callable] = None,
on_disconnect_callback: Optional[Callable] = None,
):
self.broker = broker
self.local_identity = local_identity
self.public_key = public_key.upper()
self.iata_code = iata_code
self.jwt_expiry_minutes = jwt_expiry_minutes
self.use_tls = use_tls
self.email = email
self.owner = owner
self._on_connect_callback = on_connect_callback
self._on_disconnect_callback = on_disconnect_callback
self._connect_time = None
self._tls_verified = False
self._running = False
self._reconnect_attempts = 0
self._reconnect_timer = None
self._max_reconnect_delay = 300 # 5 minutes max
self._loop_running = False # Track if MQTT loop is active
# MQTT WebSocket client - unique client ID per broker
client_id = f"meshcore_{self.public_key}_{broker['host']}"
self.client = mqtt.Client(client_id=client_id, transport="websockets")
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
def _generate_jwt(self) -> str:
"""Generate MeshCore-style Ed25519 JWT token"""
now = datetime.now(UTC)
header = {"alg": "Ed25519", "typ": "JWT"}
payload = {
"publicKey": self.public_key.upper(),
"aud": self.broker["audience"],
"iat": int(now.timestamp()),
"exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp()),
}
# Only include email/owner for verified TLS connections
if self.use_tls and self._tls_verified and (self.email or self.owner):
payload["email"] = self.email
payload["owner"] = self.owner
else:
payload["email"] = ""
payload["owner"] = ""
# Encode header and payload (compact JSON - no spaces)
header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode())
payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode())
signing_input = f"{header_b64}.{payload_b64}".encode()
# Sign using LocalIdentity (supports both standard and firmware keys)
try:
signature = self.local_identity.sign(signing_input)
except Exception as e:
logging.error(f"JWT signing failed for {self.broker['name']}: {e}")
logging.error(f" - public_key: {self.public_key}")
logging.error(f" - signing_input length: {len(signing_input)}")
raise
signature_hex = binascii.hexlify(signature).decode()
token = f"{header_b64}.{payload_b64}.{signature_hex}"
logging.debug(f"JWT token generated for {self.broker['name']}: {token[:50]}...")
return token
def _on_connect(self, client, userdata, flags, rc):
"""MQTT connection callback"""
if rc == 0:
logging.info(f"Connected to {self.broker['name']}")
self._running = True
self._reconnect_attempts = 0 # Reset counter on success
if self._on_connect_callback:
self._on_connect_callback(self.broker["name"])
else:
error_msg = get_mqtt_error_message(rc, is_disconnect=False)
logging.error(f"Failed to connect to {self.broker['name']}: {error_msg}")
self._schedule_reconnect()
def _on_disconnect(self, client, userdata, rc):
"""MQTT disconnection callback"""
was_running = self._running
self._running = False
if rc != 0: # Unexpected disconnect
logging.warning(f"Disconnected from {self.broker['name']} (rc={rc})")
if was_running: # Only reconnect if we were intentionally connected
self._schedule_reconnect()
else:
logging.info(f"Clean disconnect from {self.broker['name']}")
if self._on_disconnect_callback:
self._on_disconnect_callback(self.broker["name"])
def _schedule_reconnect(self):
"""Schedule reconnection with exponential backoff"""
if self._reconnect_timer:
self._reconnect_timer.cancel()
# Exponential backoff: 5s, 10s, 20s, 40s, 80s, up to max
delay = min(5 * (2 ** self._reconnect_attempts), self._max_reconnect_delay)
self._reconnect_attempts += 1
logging.info(f"Scheduling reconnect to {self.broker['name']} in {delay}s (attempt {self._reconnect_attempts})")
self._reconnect_timer = threading.Timer(delay, self._attempt_reconnect)
self._reconnect_timer.daemon = True
self._reconnect_timer.start()
def _attempt_reconnect(self):
"""Attempt to reconnect to broker"""
try:
logging.info(f"Attempting reconnection to {self.broker['name']}...")
self.refresh_jwt_token() # Refresh token before reconnecting
# Check if loop is still running - restart if needed
if not hasattr(self, '_loop_running') or not self._loop_running:
logging.warning(f"MQTT loop not running for {self.broker['name']}, restarting...")
self.client.loop_start()
self._loop_running = True
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
except Exception as e:
logging.error(f"Reconnection failed for {self.broker['name']}: {e}")
self._schedule_reconnect() # Try again later
def refresh_jwt_token(self):
"""Refresh JWT token for MQTT authentication"""
try:
token = self._generate_jwt()
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
self._connect_time = datetime.now(UTC)
logging.debug(f"JWT token refreshed for {self.broker['name']}")
logging.debug(f"Using username: {username}")
logging.debug(f"Public key: {self.public_key[:16]}...{self.public_key[-16:]}")
except Exception as e:
logging.error(f"Failed to generate JWT token for {self.broker['name']}: {e}")
raise
def connect(self):
"""Establish connection to broker"""
# Conditional TLS setup
if self.use_tls:
import ssl
self.client.tls_set(cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS_CLIENT)
self.client.tls_insecure_set(False)
self._tls_verified = True
protocol = "wss"
else:
protocol = "ws"
# Generate and set JWT token
self.refresh_jwt_token()
logging.info(
f"Connecting to {self.broker['name']} "
f"({protocol}://{self.broker['host']}:{self.broker['port']}) ..."
)
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
self.client.loop_start()
self._loop_running = True
def disconnect(self):
"""Disconnect from broker"""
self._running = False
self._loop_running = False
# Cancel any pending reconnection
if self._reconnect_timer:
self._reconnect_timer.cancel()
self._reconnect_timer = None
self.client.loop_stop()
self.client.disconnect()
logging.info(f"Disconnected from {self.broker['name']}")
def publish(self, topic: str, payload: str, retain: bool = False):
"""Publish message to broker"""
if self._running:
result = self.client.publish(topic, payload, retain=retain)
return result
return None
def is_connected(self) -> bool:
"""Check if connection is active"""
return self._running
def should_refresh_token(self) -> bool:
"""Check if JWT token needs refresh (at 80% of expiry)"""
if not self._connect_time:
return False
elapsed = (datetime.now(UTC) - self._connect_time).total_seconds()
expiry_seconds = self.jwt_expiry_minutes * 60
return elapsed >= expiry_seconds * 0.8
# ====================================================================
# MeshCore → MQTT Publisher with Ed25519 auth token
# ====================================================================
class MeshCoreToMqttJwtPusher:
def __init__(
self,
local_identity,
config: dict,
jwt_expiry_minutes: int = 10,
use_tls: bool = True,
stats_provider: Optional[Callable[[], dict]] = None,
):
# Store local identity and get public key
self.local_identity = local_identity
public_key = local_identity.get_public_key().hex().upper()
# Extract values from config
from ..config import get_node_info
node_info = get_node_info(config)
iata_code = node_info["iata_code"]
broker_index = node_info.get("broker_index")
self.email = node_info.get("email", "")
self.owner = node_info.get("owner", "")
status_interval = node_info["status_interval"]
node_name = node_info["node_name"]
radio_config = node_info["radio_config"]
# Get additional brokers from config (optional)
letsmesh_config = config.get("letsmesh", {})
additional_brokers = letsmesh_config.get("additional_brokers", [])
# Determine which brokers to connect to
if broker_index == -2:
# Custom brokers only - no built-in brokers
self.brokers = []
logging.info("Custom broker mode: using only user-defined brokers")
elif broker_index is None or broker_index == -1:
# Connect to all built-in brokers + additional ones
self.brokers = LETSMESH_BROKERS.copy()
logging.info(f"Multi-broker mode: connecting to all {len(LETSMESH_BROKERS)} built-in brokers")
else:
# Single broker mode (backward compatibility)
if broker_index >= len(LETSMESH_BROKERS):
raise ValueError(f"Invalid broker_index {broker_index}")
self.brokers = [LETSMESH_BROKERS[broker_index]]
logging.info(f"Single broker mode: connecting to {self.brokers[0]['name']}")
# Add additional brokers from config
if additional_brokers:
for broker_config in additional_brokers:
if all(k in broker_config for k in ["name", "host", "port", "audience"]):
self.brokers.append(broker_config)
logging.info(f"Added custom broker: {broker_config['name']}")
else:
logging.warning(f"Skipping invalid broker config: {broker_config}")
# Validate that we have at least one broker
if not self.brokers:
raise ValueError(
"No brokers configured. Either set broker_index to a valid value "
"or provide additional_brokers in config."
)
self.local_identity = local_identity
self.public_key = public_key
self.iata_code = iata_code
self.jwt_expiry_minutes = jwt_expiry_minutes
self.use_tls = use_tls
self.status_interval = status_interval
self.app_version = __version__
self.node_name = node_name
self.radio_config = radio_config
self.stats_provider = stats_provider
self._status_task = None
self._running = False
self._lock = threading.Lock()
# Create broker connections
self.connections: List[_BrokerConnection] = []
for broker in self.brokers:
conn = _BrokerConnection(
broker=broker,
local_identity=self.local_identity,
public_key=self.public_key,
iata_code=self.iata_code,
jwt_expiry_minutes=self.jwt_expiry_minutes,
use_tls=self.use_tls,
email=self.email,
owner=self.owner,
on_connect_callback=self._on_broker_connected,
on_disconnect_callback=self._on_broker_disconnected,
)
self.connections.append(conn)
logging.info(f"Initialized with {len(self.connections)} broker connection(s)")
def _on_broker_connected(self, broker_name: str):
"""Callback when a broker connects"""
# Publish initial status on first connection
if not self._status_task and self.status_interval > 0:
self._running = True
self.publish_status(
state="online", origin=self.node_name, radio_config=self.radio_config
)
# Start heartbeat thread
self._status_task = threading.Thread(target=self._status_heartbeat_loop, daemon=True)
self._status_task.start()
logging.info(f"Started status heartbeat (interval: {self.status_interval}s)")
def _on_broker_disconnected(self, broker_name: str):
"""Callback when a broker disconnects"""
# Check if all connections are down
all_down = all(not conn.is_connected() for conn in self.connections)
if all_down:
logging.warning("All broker connections lost")
self._running = False
def connect(self):
"""Establish connections to all configured brokers"""
for conn in self.connections:
try:
conn.connect()
except Exception as e:
logging.error(f"Failed to connect to {conn.broker['name']}: {e}")
def disconnect(self):
"""Disconnect from all brokers"""
self._running = False
# Publish offline status before disconnecting
self.publish_status(state="offline", origin=self.node_name, radio_config=self.radio_config)
import time
time.sleep(0.5) # Give time for messages to be sent
# Disconnect all brokers
for conn in self.connections:
try:
conn.disconnect()
except Exception as e:
logging.error(f"Error disconnecting from {conn.broker['name']}: {e}")
logging.info("Disconnected from all brokers")
def _status_heartbeat_loop(self):
"""Background thread that publishes periodic status updates"""
import time
while self._running:
try:
# Refresh JWT tokens for all connections before they expire
for conn in self.connections:
if conn.is_connected() and conn.should_refresh_token():
conn.refresh_jwt_token()
self.publish_status(
state="online", origin=self.node_name, radio_config=self.radio_config
)
logging.debug(f"Status heartbeat sent (next in {self.status_interval}s)")
time.sleep(self.status_interval)
except Exception as e:
logging.error(f"Status heartbeat error: {e}")
time.sleep(self.status_interval)
# ----------------------------------------------------------------
# Packet helpers
# ----------------------------------------------------------------
def _process_packet(self, pkt: dict) -> dict:
return {"timestamp": datetime.now(UTC).isoformat(), "origin_id": self.public_key, **pkt}
def _topic(self, subtopic: str) -> str:
return f"meshcore/{self.iata_code}/{self.public_key}/{subtopic}"
def publish_packet(self, pkt: dict, subtopic="packets", retain=False):
return self.publish(subtopic, self._process_packet(pkt), retain)
def publish_raw_data(self, raw_hex: str, subtopic="raw", retain=False):
pkt = {"type": "raw", "data": raw_hex, "bytes": len(raw_hex) // 2}
return self.publish_packet(pkt, subtopic, retain)
def publish_status(
self,
state: str = "online",
location: Optional[dict] = None,
extra_stats: Optional[dict] = None,
origin: Optional[str] = None,
radio_config: Optional[str] = None,
):
"""
Publish device status/heartbeat message
Args:
state: Device state (online/offline)
location: Optional dict with latitude/longitude
extra_stats: Optional additional statistics to include
origin: Node name/description
radio_config: Radio configuration string (freq,bw,sf,cr)
"""
# Get live stats from provider if available
if self.stats_provider:
live_stats = self.stats_provider()
else:
live_stats = {"uptime_secs": 0, "packets_sent": 0, "packets_received": 0}
status = {
"status": state,
"timestamp": datetime.now(UTC).isoformat(),
"origin": origin or self.node_name,
"origin_id": self.public_key,
"model": "PyMC-Repeater",
"firmware_version": self.app_version,
"radio": radio_config or self.radio_config,
"client_version": f"pyMC_repeater/{self.app_version}",
"stats": {**live_stats, "errors": 0, "queue_len": 0, **(extra_stats or {})},
}
if location:
status["location"] = location
return self.publish("status", status, retain=False)
def publish(self, subtopic: str, payload: dict, retain: bool = False):
"""Publish message to all connected brokers"""
topic = self._topic(subtopic)
message = json.dumps(payload)
results = []
with self._lock:
for conn in self.connections:
if conn.is_connected():
result = conn.publish(topic, message, retain=retain)
results.append((conn.broker["name"], result))
logging.debug(f"Published to {conn.broker['name']}/{topic}")
# Log if no brokers were available
if not results:
logging.warning(f"No active broker connections for publishing to {topic}")
return results
# ====================================================================
# Helper Functions
# ====================================================================
def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str:
    """
    Get human-readable MQTT error message.

    Args:
        rc: Return code from paho-mqtt. Normally an int; a reason-code object
            exposing ``getName()`` and ``value`` (as paho passes to MQTT v5
            callbacks) is also accepted.
        is_disconnect: True if from on_disconnect, False if from on_connect

    Returns:
        Human-readable error message
    """
    code = rc

    # Reason-code objects describe themselves. Plain ints are deliberately NOT
    # wrapped in paho's ReasonCode: that constructor's first positional
    # parameter is the packet type, not the code, so doing so either raises or
    # yields a misleading "Success" entry.
    name_getter = getattr(rc, "getName", None)
    if callable(name_getter):
        code = getattr(rc, "value", rc)
        try:
            return f"{name_getter()}: {code}"
        except (AssertionError, AttributeError, KeyError, ValueError):
            pass  # Fall back to the numeric tables below

    # Manual mappings for integer return codes
    connect_errors = {
        0: "connection accepted",
        1: "incorrect protocol version",
        2: "invalid client identifier",
        3: "server unavailable",
        4: "bad username or password (JWT invalid)",
        5: "not authorized (JWT signature/format invalid)",
        6: "reserved error code",
    }
    disconnect_errors = {
        0: "normal disconnect",
        1: "unacceptable protocol version",
        2: "identifier rejected",
        3: "server unavailable",
        4: "bad username or password",
        5: "not authorized",
        16: "connection lost / protocol error",
        17: "client timeout",
    }
    error_dict = disconnect_errors if is_disconnect else connect_errors
    try:
        return error_dict[code]
    except (KeyError, TypeError):
        # TypeError: an unhashable code object cannot be looked up at all
        return f"unknown error code {rc}"