diff --git a/mqtt/client.go b/mqtt/client.go index 68a7e35..9931d8e 100644 --- a/mqtt/client.go +++ b/mqtt/client.go @@ -112,8 +112,8 @@ func (c *Client) Connect() error { opts.SetKeepAlive(time.Duration(keepAlive) * time.Second) opts.SetConnectTimeout(connectTimeout) opts.SetPingTimeout(pingTimeout) - opts.SetConnectRetryInterval(1 * time.Second) // Start with 1 second retry - opts.SetMaxReconnectInterval(1 * time.Minute) // Cap at 1 minute instead of 5 + opts.SetConnectRetryInterval(1 * time.Second) // Start with 1 second retry + opts.SetMaxReconnectInterval(1 * time.Minute) // Cap at 1 minute instead of 5 opts.SetAutoReconnect(true) opts.SetCleanSession(true) opts.SetOrderMatters(false) @@ -135,11 +135,11 @@ func (c *Client) Connect() error { c.connectionMutex.Lock() c.isConnected = true c.connectionMutex.Unlock() - + // Start health check c.healthCheckStop = make(chan struct{}) go c.monitorConnectionHealth(30 * time.Second) // Check every 30 seconds - + // Note: We don't need to subscribe to the topic here anymore // as it's now handled in the connectHandler which is called // after each successful connection and reconnection @@ -153,7 +153,7 @@ func (c *Client) Disconnect() { if c.healthCheckStop != nil { close(c.healthCheckStop) } - + close(c.done) token := c.client.Unsubscribe(c.config.Topic) token.Wait() @@ -178,10 +178,10 @@ func (c *Client) IsConnected() bool { func (c *Client) monitorConnectionHealth(interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() - + consecutiveFailures := 0 maxConsecutiveFailures := 3 - + for { select { case <-ticker.C: @@ -191,35 +191,35 @@ func (c *Client) monitorConnectionHealth(interval time.Duration) { "broker", c.config.Broker, "clientID", c.config.ClientID, "consecutiveFailures", consecutiveFailures) - + // Update our internal connection state c.connectionMutex.Lock() c.isConnected = false c.connectionMutex.Unlock() - + // If we've had too many consecutive failures, try to force reconnection if consecutiveFailures >= maxConsecutiveFailures { c.logger.Warnw("Too many consecutive connection failures, forcing reconnection attempt", "consecutiveFailures", consecutiveFailures, "maxFailures", maxConsecutiveFailures) - + // Since we can't force reconnection directly, we can try to disconnect // and let the auto-reconnect logic handle it if c.client.IsConnectionOpen() { c.client.Disconnect(100) } - + consecutiveFailures = 0 } } else { // Reset failure counter when connection is good if consecutiveFailures > 0 { - c.logger.Infow("MQTT connection restored", + c.logger.Infow("MQTT connection restored", "broker", c.config.Broker, "clientID", c.config.ClientID) consecutiveFailures = 0 } - + // Update our internal connection state c.connectionMutex.Lock() c.isConnected = true @@ -283,7 +283,7 @@ func (c *Client) connectHandler(client mqtt.Client) { "broker", c.config.Broker, "clientID", c.config.ClientID, "topic", c.config.Topic) - + // Subscribe to the configured topic after each reconnection token := client.Subscribe(c.config.Topic, 0, nil) if token.Wait() && token.Error() != nil { @@ -302,7 +302,7 @@ func (c *Client) connectionLostHandler(client mqtt.Client, err error) { "errorType", fmt.Sprintf("%T", err), "broker", c.config.Broker, "clientID", c.config.ClientID) - + // Update connection status c.connectionMutex.Lock() c.isConnected = false