feat: enhance MQTT logging and error handling with payload summaries and improved disconnect messages

This commit is contained in:
Lloyd
2026-05-06 09:53:44 +01:00
parent dfacfeade8
commit 5b20f5580a
2 changed files with 115 additions and 7 deletions
+69 -7
View File
@@ -5,7 +5,7 @@ import logging
import string
import threading
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional
import paho.mqtt.client as mqtt
from nacl.signing import SigningKey
@@ -26,6 +26,7 @@ try:
HAS_REASON_CODES = True
except ImportError:
ReasonCode = None
HAS_REASON_CODES = False
logger = logging.getLogger("MQTTHandler")
@@ -38,6 +39,55 @@ def b64url(x: bytes) -> str:
return base64.urlsafe_b64encode(x).rstrip(b"=").decode()
def _summarize_payload_for_log(payload: Any, message: Optional[str] = None) -> str:
"""Return a compact single-line payload summary for debug logging."""
if message is None:
try:
message = json.dumps(payload)
except Exception:
message = str(payload)
if isinstance(payload, list):
item_bits = []
if payload and isinstance(payload[0], dict):
item_bits.append(_summarize_payload_for_log(payload[0]))
return f"items={len(payload)}, bytes={len(message.encode('utf-8'))}" + (
f", first={{ {item_bits[0]} }}" if item_bits else ""
)
if not isinstance(payload, dict):
return f"type={type(payload).__name__}, bytes={len(message.encode('utf-8'))}"
summary_fields = []
for key in ("type", "packet_type", "route", "origin", "status", "hash", "RSSI", "SNR"):
value = payload.get(key)
if value not in (None, ""):
summary_fields.append(f"{key}={value}")
if "len" in payload:
summary_fields.append(f"len={payload['len']}")
if "payload_len" in payload:
summary_fields.append(f"payload_len={payload['payload_len']}")
if "raw" in payload:
summary_fields.append(f"raw_bytes={len(str(payload['raw'])) // 2}")
if not summary_fields:
keys = ",".join(sorted(payload.keys())[:6])
if len(payload) > 6:
keys += ",..."
summary_fields.append(f"keys={keys}")
summary_fields.append(f"bytes={len(message.encode('utf-8'))}")
return ", ".join(summary_fields)
def _truncate_middle(text: str, prefix: int = 24, suffix: int = 16) -> str:
"""Truncate long strings by keeping the beginning and end visible."""
if len(text) <= prefix + suffix + 5:
return text
return f"{text[:prefix]} ... {text[-suffix:]}"
# --------------------------------------------------------------------
# Format family
# --------------------------------------------------------------------
@@ -405,9 +455,13 @@ class _BrokerConnection:
retain = self.retain_status
qos = 1 if self.retain_status else 0
logger.debug(f"Publishing to topic '{self.base_topic}/{subtopic}' with payload: [{payload}]. Running={self._running}. Retain={retain}, QoS={qos}")
full_topic = f"{self.base_topic}/{subtopic}"
logger.debug(
f"Publishing topic='{_truncate_middle(full_topic)}', bytes={len(payload.encode('utf-8'))}, "
f"running={self._running}, retain={retain}, qos={qos}"
)
if self._running:
result = self.client.publish(f"{self.base_topic}/{subtopic}", payload, retain=retain, qos=qos)
result = self.client.publish(full_topic, payload, retain=retain, qos=qos)
return result
else:
logger.warning(f"Cannot publish to {self.broker['name']} - not connected")
@@ -820,7 +874,7 @@ class MeshCoreToMqttPusher:
message = json.dumps(payload)
# _BrokerConnection now handles topic prefixing, so we only log the subtopic here
logger.debug(f"Publishing to topic '{subtopic}' with payload: {message}")
logger.debug(f"Publishing topic='{subtopic}', {_summarize_payload_for_log(payload, message)}")
packet_type = payload.get("type")
@@ -854,7 +908,7 @@ class MeshCoreToMqttPusher:
message = json.dumps(payload)
# _BrokerConnection now handles topic prefixing, so we only log the subtopic here
logger.debug(f"Publishing to topic '{subtopic}' with payload: {message}")
logger.debug(f"Publishing topic='{subtopic}', {_summarize_payload_for_log(payload, message)}")
results = []
with self._lock:
@@ -933,7 +987,7 @@ def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str:
4: "Bad username or password",
5: "Not authorized",
7: "Connection lost / network error",
16: "Connection lost / protocol error",
16: "The connection was lost.",
17: "Client timeout",
# MQTT v5 codes
4: "Disconnect with Will message",
@@ -966,7 +1020,7 @@ def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str:
162: "Wildcard subscriptions not supported",
}
if HAS_REASON_CODES:
if HAS_REASON_CODES and ReasonCode is not None:
try:
reason = ReasonCode(mqtt.CONNACK if not is_disconnect else mqtt.DISCONNECT, identifier=rc)
@@ -978,5 +1032,13 @@ def get_mqtt_error_message(rc: int, is_disconnect: bool = False) -> str:
if _fallback is None:
logger.debug(f"Could not decode reason code {rc}: {e}")
if is_disconnect:
try:
paho_error = mqtt.error_string(rc)
if paho_error and paho_error != "Unknown error.":
return paho_error
except Exception:
pass
error_dict = disconnect_errors if is_disconnect else connect_errors
return error_dict.get(rc, f"Unknown error code {rc}")
+46
View File
@@ -14,6 +14,9 @@ from repeater.data_acquisition.mqtt_handler import (
_BrokerConnection,
_expand_preset_entries,
_merge_overrides_by_name,
_summarize_payload_for_log,
_truncate_middle,
get_mqtt_error_message,
)
from repeater.presets import get_preset, list_presets
@@ -166,3 +169,46 @@ def test_legacy_letsmesh_block_migrates_to_preset_for_each_broker_index(
assert broker["enabled"] is False, f"{name} should be disabled for index {broker_index}"
else:
assert broker["enabled"] is True, f"{name} should be enabled for index {broker_index}"
def test_disconnect_error_message_uses_paho_legacy_connection_lost_string():
"""Legacy paho disconnect rc=16 should not be mislabeled as a protocol error."""
assert get_mqtt_error_message(16, is_disconnect=True) == "The connection was lost."
def test_disconnect_error_message_preserves_mqtt_v5_reason_codes():
"""Real MQTT v5 disconnect reason codes should still decode to their reason names."""
assert get_mqtt_error_message(130, is_disconnect=True) == "Protocol error (code 130)"
def test_payload_summary_omits_full_raw_dump_for_packet_logs():
"""MQTT debug logging should summarize packet payloads instead of dumping JSON blobs."""
payload = {
"type": "PACKET",
"packet_type": "4",
"route": "F",
"origin": "NWTBASE02",
"len": "120",
"payload_len": "115",
"raw": "aa" * 120,
"hash": "DD63C8077B5912FC",
}
summary = _summarize_payload_for_log(payload)
assert "type=PACKET" in summary
assert "route=F" in summary
assert "raw_bytes=120" in summary
assert '"raw"' not in summary
assert "aa" * 20 not in summary
def test_truncate_middle_preserves_topic_prefix_and_suffix():
"""Long MQTT topics should keep both routing context and the final path segment visible."""
topic = "meshcore/BOH/BEEF2F7F8632ADE3461D42D1653A0229310E424C37324A6768071A629DFDAA32/packets"
truncated = _truncate_middle(topic)
assert truncated.startswith("meshcore/BOH/BEEF2F7F863")
assert truncated.endswith("9DFDAA32/packets")
assert " ... " in truncated