Update mqtt_handler.py

This commit is contained in:
sh4un
2025-11-18 19:30:47 -05:00
committed by GitHub
parent 6e853872ad
commit 371b7459a4

View File

@@ -9,8 +9,12 @@ import time
import json
from queue import Queue, Empty, Full
from typing import Optional, Dict, Any
import paho.mqtt.client as paho_mqtt
# External dependencies
import paho.mqtt.client as paho_mqtt
import paho.mqtt.enums as paho_enums # For MQTTv5 properties if used
# Project dependencies
from .config_handler import BridgeConfig
class MQTTHandler:
@@ -29,7 +33,7 @@ class MQTTHandler:
self._mqtt_connected = threading.Event()
self._lock = threading.Lock()
if not all([config.mqtt_broker, config.mqtt_port is not None, config.mqtt_topic_in, config.mqtt_topic_out, config.mqtt_client_id, config.mqtt_qos is not None]):
if not all([config.mqtt_broker, config.mqtt_port is not None, config.mqtt_topic_in, config.mqtt_topic_out, config.mqtt_client_id, config.mqtt_qos is not None, config.mqtt_retain_out is not None]):
raise ValueError("MQTT transport selected, but required MQTT configuration options seem missing.")
self.logger.info("MQTT Handler Initialized.")
@@ -54,10 +58,13 @@ class MQTTHandler:
self.client = paho_mqtt.Client(client_id=self.config.mqtt_client_id,
protocol=paho_mqtt.MQTTv311,
clean_session=True)
self.logger.info(f"Attempting connection to MQTT broker {self.config.mqtt_broker}:{self.config.mqtt_port}...")
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
self.client.on_message = self._on_message
self.client.on_log = self._on_log
if self.config.mqtt_username:
self.client.username_pw_set(self.config.mqtt_username, self.config.mqtt_password)
@@ -67,7 +74,7 @@ class MQTTHandler:
return True
except Exception as e:
self.logger.error(f"Error initiating MQTT connection: {e}", exc_info=True)
self.logger.error(f"Error initiating MQTT connection or starting loop: {e}", exc_info=True)
if self.client:
try: self.client.loop_stop(force=True)
except: pass
@@ -97,16 +104,19 @@ class MQTTHandler:
self._mqtt_connected.clear()
self.logger.info("MQTT handler stopped.")
# --- MQTT Callbacks (Executed by Paho's Network Thread) ---
def _on_connect(self, client, userdata, flags, rc, properties=None):
connack_str = paho_mqtt.connack_string(rc)
if rc == 0:
self.logger.info(f"Connected to MQTT broker: {self.config.mqtt_broker}")
self.logger.info(f"Successfully connected to MQTT broker: {self.config.mqtt_broker} ({connack_str})")
self._mqtt_connected.set()
try:
client.subscribe(self.config.mqtt_topic_in, qos=self.config.mqtt_qos)
except Exception as e:
self.logger.error(f"Error during MQTT subscription: {e}")
else:
self.logger.error(f"MQTT connection failed. Result code: {rc}")
self.logger.error(f"MQTT connection failed. Result code: {rc} - {connack_str}")
self._mqtt_connected.clear()
def _on_disconnect(self, client, userdata, rc, properties=None):
@@ -171,9 +181,20 @@ class MQTTHandler:
payload_str = json.dumps(item)
topic = self.config.mqtt_topic_out
if not topic:
self.logger.error("MQTT_TOPIC_OUT is not configured. Cannot publish.")
self.to_mqtt_queue.task_done()
continue
qos = self.config.mqtt_qos
if qos not in [0, 1, 2]:
self.logger.error(f"Invalid MQTT_QOS ({qos}) for publishing. Using QoS 0.")
qos = 0
with self._lock:
if self.client and self.client.is_connected():
self.client.publish(topic, payload=payload_str, qos=self.config.mqtt_qos, retain=self.config.mqtt_retain_out)
self.client.publish(topic, payload=payload_str, qos=qos, retain=self.config.mqtt_retain_out)
else:
self._mqtt_connected.clear()