feat: enhance MQTT error handling and add packet stats broadcasting

This commit is contained in:
Lloyd
2026-01-20 20:31:45 +00:00
parent 33e353d215
commit 8b95bfb8ba
2 changed files with 91 additions and 13 deletions

View File

@@ -11,6 +11,14 @@ from typing import Callable, Optional, List, Dict
from .. import __version__
# Try to import paho-mqtt error code mappings
try:
from paho.mqtt.reasoncodes import ReasonCode
HAS_REASON_CODES = True
except ImportError:
HAS_REASON_CODES = False
# --------------------------------------------------------------------
# Helper: Base64URL without padding
# --------------------------------------------------------------------
@@ -75,6 +83,7 @@ class _BrokerConnection:
self._reconnect_attempts = 0
self._reconnect_timer = None
self._max_reconnect_delay = 300 # 5 minutes max
self._loop_running = False # Track if MQTT loop is active
# MQTT WebSocket client - unique client ID per broker
client_id = f"meshcore_{self.public_key}_{broker['host']}"
@@ -110,9 +119,18 @@ class _BrokerConnection:
signing_input = f"{header_b64}.{payload_b64}".encode()
# Sign using LocalIdentity (supports both standard and firmware keys)
signature = self.local_identity.sign(signing_input)
try:
signature = self.local_identity.sign(signing_input)
except Exception as e:
logging.error(f"JWT signing failed for {self.broker['name']}: {e}")
logging.error(f" - public_key: {self.public_key}")
logging.error(f" - signing_input length: {len(signing_input)}")
raise
signature_hex = binascii.hexlify(signature).decode()
token = f"{header_b64}.{payload_b64}.{signature_hex}"
logging.debug(f"JWT token generated for {self.broker['name']}: {token[:50]}...")
return token
@@ -125,7 +143,8 @@ class _BrokerConnection:
if self._on_connect_callback:
self._on_connect_callback(self.broker["name"])
else:
logging.error(f"Failed to connect to {self.broker['name']} (rc={rc})")
error_msg = get_mqtt_error_message(rc, is_disconnect=False)
logging.error(f"Failed to connect to {self.broker['name']}: {error_msg}")
self._schedule_reconnect()
def _on_disconnect(self, client, userdata, rc):
@@ -162,6 +181,11 @@ class _BrokerConnection:
try:
logging.info(f"Attempting reconnection to {self.broker['name']}...")
self.refresh_jwt_token() # Refresh token before reconnecting
# Check if loop is still running - restart if needed
if not hasattr(self, '_loop_running') or not self._loop_running:
logging.warning(f"MQTT loop not running for {self.broker['name']}, restarting...")
self.client.loop_start()
self._loop_running = True
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
except Exception as e:
logging.error(f"Reconnection failed for {self.broker['name']}: {e}")
@@ -169,11 +193,17 @@ class _BrokerConnection:
def refresh_jwt_token(self):
"""Refresh JWT token for MQTT authentication"""
token = self._generate_jwt()
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
self._connect_time = datetime.now(UTC)
logging.debug(f"JWT token refreshed for {self.broker['name']}")
try:
token = self._generate_jwt()
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
self._connect_time = datetime.now(UTC)
logging.debug(f"JWT token refreshed for {self.broker['name']}")
logging.debug(f"Using username: {username}")
logging.debug(f"Public key: {self.public_key[:16]}...{self.public_key[-16:]}")
except Exception as e:
logging.error(f"Failed to generate JWT token for {self.broker['name']}: {e}")
raise
def connect(self):
"""Establish connection to broker"""
@@ -198,10 +228,12 @@ class _BrokerConnection:
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
self.client.loop_start()
self._loop_running = True
def disconnect(self):
"""Disconnect from broker"""
self._running = False
self._loop_running = False
# Cancel any pending reconnection
if self._reconnect_timer:
@@ -475,3 +507,51 @@ class MeshCoreToMqttJwtPusher:
return results
# ====================================================================
# Helper Functions
# ====================================================================
def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str:
"""
Get human-readable MQTT error message.
Args:
rc: Return code from paho-mqtt
is_disconnect: True if from on_disconnect, False if from on_connect
Returns:
Human-readable error message
"""
if HAS_REASON_CODES:
try:
reason = ReasonCode(rc)
return f"{reason.name}: {reason.value}"
except (ValueError, AttributeError):
pass
# Fallback to manual mappings
connect_errors = {
0: "connection accepted",
1: "incorrect protocol version",
2: "invalid client identifier",
3: "server unavailable",
4: "bad username or password (JWT invalid)",
5: "not authorized (JWT signature/format invalid)",
6: "reserved error code",
}
disconnect_errors = {
0: "normal disconnect",
1: "unacceptable protocol version",
2: "identifier rejected",
3: "server unavailable",
4: "bad username or password",
5: "not authorized",
16: "connection lost / protocol error",
17: "client timeout",
}
error_dict = disconnect_errors if is_disconnect else connect_errors
return error_dict.get(rc, f"unknown error code {rc}")

View File

@@ -146,14 +146,12 @@ class StorageCollector:
try:
self.websocket_broadcast_packet(packet_record)
# Also broadcast lightweight stats update
# Broadcast 24-hour packet stats (same as /api/packet_stats?hours=24)
packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24)
uptime_seconds = time.time() - self.repeater_handler.start_time if self.repeater_handler else 0
self.websocket_broadcast_stats({
"packet_stats": {
"total_packets": cumulative_counts.get("rx_total", 0),
"transmitted_packets": cumulative_counts.get("tx_total", 0),
"dropped_packets": cumulative_counts.get("drop_total", 0),
},
"packet_stats": packet_stats_24h,
"system_stats": {
"uptime_seconds": uptime_seconds,
}