diff --git a/mqtt/client.go b/mqtt/client.go index b0e113e..a2fc27e 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -19,7 +19,7 @@ type Config struct { Password string ClientID string Topic string - + // Connection tuning parameters KeepAlive int // Keep alive interval in seconds (default: 60) ConnectTimeout time.Duration // Connection timeout (default: 30s) @@ -55,22 +55,22 @@ func (c *Client) Connect() error { if c.config.KeepAlive > 0 { keepAlive = c.config.KeepAlive } - + connectTimeout := 30 * time.Second if c.config.ConnectTimeout > 0 { connectTimeout = c.config.ConnectTimeout } - + pingTimeout := 10 * time.Second if c.config.PingTimeout > 0 { pingTimeout = c.config.PingTimeout } - + maxReconnectTime := 5 * time.Minute if c.config.MaxReconnectTime > 0 { maxReconnectTime = c.config.MaxReconnectTime } - + // Determine protocol and port protocol := "tcp" port := 1883 @@ -81,7 +81,7 @@ func (c *Client) Connect() error { port = c.config.TLSPort } } - + // Log detailed connection settings c.logger.Infow("Connecting to MQTT broker with settings", "broker", c.config.Broker, @@ -110,6 +110,7 @@ func (c *Client) Connect() error { opts.SetMaxReconnectInterval(maxReconnectTime) opts.SetAutoReconnect(true) opts.SetCleanSession(true) + opts.SetOrderMatters(false) opts.OnConnect = c.connectHandler opts.OnConnectionLost = c.connectionLostHandler opts.OnReconnecting = c.reconnectingHandler @@ -117,7 +118,7 @@ func (c *Client) Connect() error { // Create and start the client c.client = mqtt.NewClient(opts) if token := c.client.Connect(); token.Wait() && token.Error() != nil { - c.logger.Errorw("Failed to connect to MQTT broker", + c.logger.Errorw("Failed to connect to MQTT broker", "error", token.Error(), "broker", c.config.Broker, "clientID", c.config.ClientID) @@ -133,7 +134,7 @@ func (c *Client) Connect() error { "topic", c.config.Topic) return fmt.Errorf("error subscribing to topic %s: %v", c.config.Topic, token.Error()) } - + c.logger.Infof("Successfully subscribed to topic: %s", c.config.Topic) return nil @@ -201,7 +202,7 @@ func (c *Client) messageHandler(client mqtt.Client, msg mqtt.Message) { // connectHandler is called when the client connects to the broker func (c *Client) connectHandler(client mqtt.Client) { - c.logger.Infow("Connected to MQTT Broker", + c.logger.Infow("Connected to MQTT Broker", "broker", c.config.Broker, "clientID", c.config.ClientID, "topic", c.config.Topic) @@ -209,7 +210,7 @@ func (c *Client) connectHandler(client mqtt.Client) { // connectionLostHandler is called when the client loses connection func (c *Client) connectionLostHandler(client mqtt.Client, err error) { - c.logger.Errorw("Connection lost", + c.logger.Errorw("Connection lost", "error", err, "errorType", fmt.Sprintf("%T", err), "broker", c.config.Broker,