From 8b95bfb8badec4c597414b6c4d4270eea4ebe22f Mon Sep 17 00:00:00 2001 From: Lloyd Date: Tue, 20 Jan 2026 20:31:45 +0000 Subject: [PATCH] feat: enhance MQTT error handling and add packet stats broadcasting --- repeater/data_acquisition/letsmesh_handler.py | 94 +++++++++++++++++-- .../data_acquisition/storage_collector.py | 10 +- 2 files changed, 91 insertions(+), 13 deletions(-) diff --git a/repeater/data_acquisition/letsmesh_handler.py b/repeater/data_acquisition/letsmesh_handler.py index 7302da5..e3e83d2 100644 --- a/repeater/data_acquisition/letsmesh_handler.py +++ b/repeater/data_acquisition/letsmesh_handler.py @@ -11,6 +11,14 @@ from typing import Callable, Optional, List, Dict from .. import __version__ +# Try to import paho-mqtt error code mappings +try: + from paho.mqtt.reasoncodes import ReasonCode + HAS_REASON_CODES = True +except ImportError: + HAS_REASON_CODES = False + + # -------------------------------------------------------------------- # Helper: Base64URL without padding # -------------------------------------------------------------------- @@ -75,6 +83,7 @@ class _BrokerConnection: self._reconnect_attempts = 0 self._reconnect_timer = None self._max_reconnect_delay = 300 # 5 minutes max + self._loop_running = False # Track if MQTT loop is active # MQTT WebSocket client - unique client ID per broker client_id = f"meshcore_{self.public_key}_{broker['host']}" @@ -110,9 +119,18 @@ class _BrokerConnection: signing_input = f"{header_b64}.{payload_b64}".encode() # Sign using LocalIdentity (supports both standard and firmware keys) - signature = self.local_identity.sign(signing_input) + try: + signature = self.local_identity.sign(signing_input) + except Exception as e: + logging.error(f"JWT signing failed for {self.broker['name']}: {e}") + logging.error(f" - public_key: {self.public_key}") + logging.error(f" - signing_input length: {len(signing_input)}") + raise + signature_hex = binascii.hexlify(signature).decode() token = f"{header_b64}.{payload_b64}.{signature_hex}" + + logging.debug(f"JWT token generated for {self.broker['name']}: {token[:50]}...") return token @@ -125,7 +143,8 @@ class _BrokerConnection: if self._on_connect_callback: self._on_connect_callback(self.broker["name"]) else: - logging.error(f"Failed to connect to {self.broker['name']} (rc={rc})") + error_msg = get_mqtt_error_message(rc, is_disconnect=False) + logging.error(f"Failed to connect to {self.broker['name']}: {error_msg}") self._schedule_reconnect() def _on_disconnect(self, client, userdata, rc): @@ -162,6 +181,11 @@ class _BrokerConnection: try: logging.info(f"Attempting reconnection to {self.broker['name']}...") self.refresh_jwt_token() # Refresh token before reconnecting + # Check if loop is still running - restart if needed + if not hasattr(self, '_loop_running') or not self._loop_running: + logging.warning(f"MQTT loop not running for {self.broker['name']}, restarting...") + self.client.loop_start() + self._loop_running = True self.client.connect(self.broker["host"], self.broker["port"], keepalive=60) except Exception as e: logging.error(f"Reconnection failed for {self.broker['name']}: {e}") @@ -169,11 +193,17 @@ class _BrokerConnection: def refresh_jwt_token(self): """Refresh JWT token for MQTT authentication""" - token = self._generate_jwt() - username = f"v1_{self.public_key}" - self.client.username_pw_set(username=username, password=token) - self._connect_time = datetime.now(UTC) - logging.debug(f"JWT token refreshed for {self.broker['name']}") + try: + token = self._generate_jwt() + username = f"v1_{self.public_key}" + self.client.username_pw_set(username=username, password=token) + self._connect_time = datetime.now(UTC) + logging.debug(f"JWT token refreshed for {self.broker['name']}") + logging.debug(f"Using username: {username}") + logging.debug(f"Public key: {self.public_key[:16]}...{self.public_key[-16:]}") + except Exception as e: + logging.error(f"Failed to generate JWT token for {self.broker['name']}: {e}") + raise def connect(self): """Establish connection to broker""" @@ -198,10 +228,12 @@ class _BrokerConnection: self.client.connect(self.broker["host"], self.broker["port"], keepalive=60) self.client.loop_start() + self._loop_running = True def disconnect(self): """Disconnect from broker""" self._running = False + self._loop_running = False # Cancel any pending reconnection if self._reconnect_timer: @@ -475,3 +507,51 @@ class MeshCoreToMqttJwtPusher: return results + +# ==================================================================== +# Helper Functions +# ==================================================================== + +def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str: + """ + Get human-readable MQTT error message. + + Args: + rc: Return code from paho-mqtt + is_disconnect: True if from on_disconnect, False if from on_connect + + Returns: + Human-readable error message + """ + if HAS_REASON_CODES: + try: + reason = ReasonCode(rc) + return f"{reason.name}: {reason.value}" + except (ValueError, AttributeError): + pass + + # Fallback to manual mappings + connect_errors = { + 0: "connection accepted", + 1: "incorrect protocol version", + 2: "invalid client identifier", + 3: "server unavailable", + 4: "bad username or password (JWT invalid)", + 5: "not authorized (JWT signature/format invalid)", + 6: "reserved error code", + } + + disconnect_errors = { + 0: "normal disconnect", + 1: "unacceptable protocol version", + 2: "identifier rejected", + 3: "server unavailable", + 4: "bad username or password", + 5: "not authorized", + 16: "connection lost / protocol error", + 17: "client timeout", + } + + error_dict = disconnect_errors if is_disconnect else connect_errors + return error_dict.get(rc, f"unknown error code {rc}") + diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index a2ccee2..241e53b 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -146,14 +146,12 @@ class StorageCollector: try: self.websocket_broadcast_packet(packet_record) - # Also broadcast lightweight stats update + # Broadcast 24-hour packet stats (same as /api/packet_stats?hours=24) + packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24) uptime_seconds = time.time() - self.repeater_handler.start_time if self.repeater_handler else 0 + self.websocket_broadcast_stats({ - "packet_stats": { - "total_packets": cumulative_counts.get("rx_total", 0), - "transmitted_packets": cumulative_counts.get("tx_total", 0), - "dropped_packets": cumulative_counts.get("drop_total", 0), - }, + "packet_stats": packet_stats_24h, "system_stats": { "uptime_seconds": uptime_seconds, }