MQTT SetOrderMatters(false)

This commit is contained in:
Daniel Pupius
2025-05-04 16:12:32 -07:00
parent df7df6dbd6
commit 84f8cdfe5c

View File

@@ -19,7 +19,7 @@ type Config struct {
Password string Password string
ClientID string ClientID string
Topic string Topic string
// Connection tuning parameters // Connection tuning parameters
KeepAlive int // Keep alive interval in seconds (default: 60) KeepAlive int // Keep alive interval in seconds (default: 60)
ConnectTimeout time.Duration // Connection timeout (default: 30s) ConnectTimeout time.Duration // Connection timeout (default: 30s)
@@ -55,22 +55,22 @@ func (c *Client) Connect() error {
if c.config.KeepAlive > 0 { if c.config.KeepAlive > 0 {
keepAlive = c.config.KeepAlive keepAlive = c.config.KeepAlive
} }
connectTimeout := 30 * time.Second connectTimeout := 30 * time.Second
if c.config.ConnectTimeout > 0 { if c.config.ConnectTimeout > 0 {
connectTimeout = c.config.ConnectTimeout connectTimeout = c.config.ConnectTimeout
} }
pingTimeout := 10 * time.Second pingTimeout := 10 * time.Second
if c.config.PingTimeout > 0 { if c.config.PingTimeout > 0 {
pingTimeout = c.config.PingTimeout pingTimeout = c.config.PingTimeout
} }
maxReconnectTime := 5 * time.Minute maxReconnectTime := 5 * time.Minute
if c.config.MaxReconnectTime > 0 { if c.config.MaxReconnectTime > 0 {
maxReconnectTime = c.config.MaxReconnectTime maxReconnectTime = c.config.MaxReconnectTime
} }
// Determine protocol and port // Determine protocol and port
protocol := "tcp" protocol := "tcp"
port := 1883 port := 1883
@@ -81,7 +81,7 @@ func (c *Client) Connect() error {
port = c.config.TLSPort port = c.config.TLSPort
} }
} }
// Log detailed connection settings // Log detailed connection settings
c.logger.Infow("Connecting to MQTT broker with settings", c.logger.Infow("Connecting to MQTT broker with settings",
"broker", c.config.Broker, "broker", c.config.Broker,
@@ -110,6 +110,7 @@ func (c *Client) Connect() error {
opts.SetMaxReconnectInterval(maxReconnectTime) opts.SetMaxReconnectInterval(maxReconnectTime)
opts.SetAutoReconnect(true) opts.SetAutoReconnect(true)
opts.SetCleanSession(true) opts.SetCleanSession(true)
opts.SetOrderMatters(false)
opts.OnConnect = c.connectHandler opts.OnConnect = c.connectHandler
opts.OnConnectionLost = c.connectionLostHandler opts.OnConnectionLost = c.connectionLostHandler
opts.OnReconnecting = c.reconnectingHandler opts.OnReconnecting = c.reconnectingHandler
@@ -117,7 +118,7 @@ func (c *Client) Connect() error {
// Create and start the client // Create and start the client
c.client = mqtt.NewClient(opts) c.client = mqtt.NewClient(opts)
if token := c.client.Connect(); token.Wait() && token.Error() != nil { if token := c.client.Connect(); token.Wait() && token.Error() != nil {
c.logger.Errorw("Failed to connect to MQTT broker", c.logger.Errorw("Failed to connect to MQTT broker",
"error", token.Error(), "error", token.Error(),
"broker", c.config.Broker, "broker", c.config.Broker,
"clientID", c.config.ClientID) "clientID", c.config.ClientID)
@@ -133,7 +134,7 @@ func (c *Client) Connect() error {
"topic", c.config.Topic) "topic", c.config.Topic)
return fmt.Errorf("error subscribing to topic %s: %v", c.config.Topic, token.Error()) return fmt.Errorf("error subscribing to topic %s: %v", c.config.Topic, token.Error())
} }
c.logger.Infof("Successfully subscribed to topic: %s", c.config.Topic) c.logger.Infof("Successfully subscribed to topic: %s", c.config.Topic)
return nil return nil
@@ -201,7 +202,7 @@ func (c *Client) messageHandler(client mqtt.Client, msg mqtt.Message) {
// connectHandler is called when the client connects to the broker // connectHandler is called when the client connects to the broker
func (c *Client) connectHandler(client mqtt.Client) { func (c *Client) connectHandler(client mqtt.Client) {
c.logger.Infow("Connected to MQTT Broker", c.logger.Infow("Connected to MQTT Broker",
"broker", c.config.Broker, "broker", c.config.Broker,
"clientID", c.config.ClientID, "clientID", c.config.ClientID,
"topic", c.config.Topic) "topic", c.config.Topic)
@@ -209,7 +210,7 @@ func (c *Client) connectHandler(client mqtt.Client) {
// connectionLostHandler is called when the client loses connection // connectionLostHandler is called when the client loses connection
func (c *Client) connectionLostHandler(client mqtt.Client, err error) { func (c *Client) connectionLostHandler(client mqtt.Client, err error) {
c.logger.Errorw("Connection lost", c.logger.Errorw("Connection lost",
"error", err, "error", err,
"errorType", fmt.Sprintf("%T", err), "errorType", fmt.Sprintf("%T", err),
"broker", c.config.Broker, "broker", c.config.Broker,