diff --git a/.env.example b/.env.example index a78e708..525edf4 100644 --- a/.env.example +++ b/.env.example @@ -22,6 +22,10 @@ CLICKHOUSE_READONLY_PASSWORD=readonly # with an error if this is empty, so configure at least one broker. MQTT_BROKERS=[{"url":"tcp://mqtt.example.com:1883","username":"CHANGE_ME","password":"CHANGE_ME","topics":["meshcore/#"]}] MQTT_CLIENT_ID=meshcore-ingest +# Staleness watchdog: if a broker reports connected but delivers no messages for +# this many seconds, the daemon forces a fresh reconnect + resubscribe. Guards +# against "zombie" connections that survive an upstream broker swap. Default 300. +MQTT_STALE_AFTER_SECONDS=300 # ─── Web app ───────────────────────────────────────────────────────────────── # Base URL for client-side API calls. Leave empty to use relative URLs. diff --git a/ingest/internal/ingestcommon/ingest.go b/ingest/internal/ingestcommon/ingest.go index 560db22..6395c81 100644 --- a/ingest/internal/ingestcommon/ingest.go +++ b/ingest/internal/ingestcommon/ingest.go @@ -7,6 +7,7 @@ import ( "math" "os" "strconv" + "sync" "time" "github.com/ClickHouse/clickhouse-go/v2" @@ -75,6 +76,12 @@ type Daemon struct { Ctx context.Context Cancel context.CancelFunc MessageHandler MessageHandler + + // mu guards BrokerStatus and lastActivity, which are read/written from both + // the connection-monitor goroutine and paho's callback goroutines. + mu sync.Mutex + lastActivity map[string]time.Time + staleAfter time.Duration } func NewDaemon(config *Config, handler MessageHandler) *Daemon { @@ -85,18 +92,54 @@ func NewDaemon(config *Config, handler MessageHandler) *Daemon { Ctx: ctx, Cancel: cancel, MessageHandler: handler, + lastActivity: make(map[string]time.Time), + staleAfter: time.Duration(GetEnvIntOrDefault("MQTT_STALE_AFTER_SECONDS", 300)) * time.Second, } } -func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, maxRetries int) (mqtt.Client, error) { +// recordActivity resets a broker's liveness clock. 
Called on every received +// message and on every (re)connect. +func (d *Daemon) recordActivity(brokerURL string) { + d.mu.Lock() + d.lastActivity[brokerURL] = time.Now() + d.mu.Unlock() +} + +// isStale reports whether a broker has produced no traffic within the staleness +// window. A connected-but-stale client is a zombie (no longer delivering messages), +// the failure the watchdog in checkAndReconnectBrokers exists to recover from. It is +// false for brokers with no recorded activity and when staleAfter <= 0 (disabled). +func (d *Daemon) isStale(brokerURL string) bool { + d.mu.Lock() + defer d.mu.Unlock() + last, ok := d.lastActivity[brokerURL] + if !ok { + return false + } + return d.staleAfter > 0 && time.Since(last) > d.staleAfter +} + +func (d *Daemon) setBrokerStatus(brokerURL string, up bool) { + d.mu.Lock() + d.BrokerStatus[brokerURL] = up + d.mu.Unlock() +} + +func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, idx int, maxRetries int) (mqtt.Client, error) { baseDelay := time.Second maxDelay := 30 * time.Second for attempt := 0; attempt <= maxRetries; attempt++ { opts := mqtt.NewClientOptions() opts.AddBroker(broker.URL) - opts.SetClientID(fmt.Sprintf("%s-%d", d.Config.MQTTClientID, attempt)) + // Stable, per-broker client ID so reconnects and forced rebuilds reuse the + // same MQTT session identity rather than spawning a "-0" client that races + // the one paho is already auto-reconnecting. + opts.SetClientID(fmt.Sprintf("%s-%d", d.Config.MQTTClientID, idx)) opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) { + // Any received message proves the subscription is live; record it so the + // staleness watchdog can distinguish a healthy connection from a zombie. + d.recordActivity(broker.URL) d.MessageHandler(client, msg, d) }) opts.SetAutoReconnect(true) @@ -105,15 +148,36 @@ func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, maxRetries int) (mqtt.
opts.SetMaxReconnectInterval(30 * time.Second) opts.SetKeepAlive(30 * time.Second) opts.SetPingTimeout(10 * time.Second) - opts.SetCleanSession(false) - opts.SetResumeSubs(true) + // Start a fresh session on every (re)connect and resubscribe explicitly in + // the OnConnect handler below. This avoids depending on broker-side session + // persistence — which is exactly what left the client connected-but- + // unsubscribed (a zombie, ingesting nothing) after the upstream broker was + // replaced. + opts.SetCleanSession(true) + opts.SetResumeSubs(false) opts.SetOnConnectHandler(func(client mqtt.Client) { - zap.L().Debug("Connected to MQTT broker", zap.String("broker", broker.URL)) + zap.L().Info("Connected to MQTT broker", zap.String("broker", broker.URL)) + // Reset the staleness clock and (re)subscribe on every connect, including + // paho's automatic reconnects, so a reconnect always restores delivery. + d.recordActivity(broker.URL) + d.setBrokerStatus(broker.URL, true) + for _, topic := range broker.Topics { + if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil { + zap.L().Warn("Failed to subscribe to topic", + zap.String("topic", topic), + zap.String("broker", broker.URL), + zap.Error(token.Error())) + } else { + zap.L().Info("Subscribed to topic", + zap.String("topic", topic), + zap.String("broker", broker.URL)) + } + } }) opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { zap.L().Warn("Connection lost to MQTT broker", zap.String("broker", broker.URL), zap.Error(err)) - d.BrokerStatus[broker.URL] = false + d.setBrokerStatus(broker.URL, false) }) if broker.Username != "" { @@ -143,33 +207,11 @@ func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, maxRetries int) (mqtt. 
return nil, fmt.Errorf("failed to connect to MQTT broker %s after %d attempts: %w", broker.URL, maxRetries+1, token.Error()) } - for _, topic := range broker.Topics { - if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil { - zap.L().Warn("Failed to subscribe to topic", - zap.String("topic", topic), - zap.String("broker", broker.URL), - zap.Int("attempt", attempt+1), - zap.Int("maxRetries", maxRetries+1), - zap.Error(token.Error())) - - if attempt < maxRetries { - client.Disconnect(250) - delay := time.Duration(float64(baseDelay) * math.Pow(2, float64(attempt))) - if delay > maxDelay { - delay = maxDelay - } - zap.L().Debug("Retrying subscription", zap.String("broker", broker.URL), zap.Duration("delay", delay)) - time.Sleep(delay) - continue - } - client.Disconnect(250) - return nil, fmt.Errorf("failed to subscribe to topic %s on broker %s after %d attempts: %w", topic, broker.URL, maxRetries+1, token.Error()) - } - } - - zap.L().Debug("Successfully connected to MQTT broker and subscribed to topics", - zap.Strings("topics", broker.Topics), - zap.String("broker", broker.URL)) + // Subscriptions are established in the OnConnect handler so they are + // re-established on every reconnect, not just on this initial connect. + // Seed the liveness clock so a brand-new connection gets a full staleness + // window before the watchdog can judge it. + d.recordActivity(broker.URL) return client, nil } return nil, fmt.Errorf("unexpected error in connectToBroker for %s", broker.URL) @@ -182,15 +224,21 @@ func (d *Daemon) ConnectMQTT() error { zap.L().Debug("Attempting to connect to MQTT brokers", zap.Int("totalBrokers", totalBrokers)) - for _, broker := range d.Config.MQTTBrokers { - client, err := d.connectToBroker(broker, maxRetries) + // Pre-size MQTTClients so each slot stays aligned with its broker index even + // when some brokers fail to connect (a failed broker leaves a nil slot that the + // monitor retries). 
The old append-only approach misaligned indices whenever an + // earlier broker failed. + d.MQTTClients = make([]mqtt.Client, totalBrokers) + + for i, broker := range d.Config.MQTTBrokers { + client, err := d.connectToBroker(broker, i, maxRetries) if err != nil { zap.L().Warn("Failed to connect to broker", zap.String("broker", broker.URL), zap.Error(err)) - d.BrokerStatus[broker.URL] = false + d.setBrokerStatus(broker.URL, false) continue } - d.MQTTClients = append(d.MQTTClients, client) - d.BrokerStatus[broker.URL] = true + d.MQTTClients[i] = client + d.setBrokerStatus(broker.URL, true) successfulConnections++ } @@ -198,9 +246,10 @@ func (d *Daemon) ConnectMQTT() error { return fmt.Errorf("failed to connect to any MQTT brokers") } - zap.L().Debug("Successfully connected to MQTT brokers", + zap.L().Info("Connected to MQTT brokers", zap.Int("successfulConnections", successfulConnections), - zap.Int("totalBrokers", totalBrokers)) + zap.Int("totalBrokers", totalBrokers), + zap.Duration("staleAfter", d.staleAfter)) return nil } @@ -248,45 +297,58 @@ func (d *Daemon) MonitorConnections() { } } +// checkAndReconnectBrokers is a watchdog, not the primary reconnection path. +// paho's auto-reconnect handles transient drops and the OnConnect handler +// resubscribes, so this only intervenes to (a) (re)establish brokers that have no +// live client and (b) tear down zombie connections — clients that report connected +// but have gone silent past the staleness window (e.g. after an upstream broker +// swap dropped the subscription). Forcing a full rebuild gets a fresh session and +// resubscribe, which is what actually restores delivery.
func (d *Daemon) checkAndReconnectBrokers() { + // Keep one client slot per configured broker: ConnectMQTT pre-sizes the slice, + // but if this runs while the slice is shorter than the broker list, the + // indexing below would panic; padded slots are nil and get connected below. + if missing := len(d.Config.MQTTBrokers) - len(d.MQTTClients); missing > 0 { + d.MQTTClients = append(d.MQTTClients, make([]mqtt.Client, missing)...) + } for i, broker := range d.Config.MQTTBrokers { - if i >= len(d.MQTTClients) { - client, err := d.connectToBroker(broker, 3) - if err != nil { - zap.L().Warn("Background reconnection failed for broker", zap.String("broker", broker.URL), zap.Error(err)) - continue - } - d.MQTTClients = append(d.MQTTClients, client) - d.BrokerStatus[broker.URL] = true - zap.L().Debug("Successfully reconnected to broker", zap.String("broker", broker.URL)) - continue - } client := d.MQTTClients[i] + if client == nil { - newClient, err := d.connectToBroker(broker, 3) + newClient, err := d.connectToBroker(broker, i, 3) if err != nil { zap.L().Warn("Background reconnection failed for broker", zap.String("broker", broker.URL), zap.Error(err)) - d.BrokerStatus[broker.URL] = false + d.setBrokerStatus(broker.URL, false) continue } d.MQTTClients[i] = newClient - d.BrokerStatus[broker.URL] = true - zap.L().Debug("Successfully reconnected to broker", zap.String("broker", broker.URL)) + d.setBrokerStatus(broker.URL, true) + zap.L().Info("Established connection to broker", zap.String("broker", broker.URL)) continue } - if !client.IsConnected() { - d.BrokerStatus[broker.URL] = false - newClient, err := d.connectToBroker(broker, 3) + + if d.isStale(broker.URL) { + zap.L().Warn("No messages received within staleness window; forcing reconnect", + zap.String("broker", broker.URL), + zap.Bool("reportedConnected", client.IsConnected()), + zap.Duration("staleAfter", d.staleAfter)) + // Explicitly disconnect so the old client stops auto-reconnecting and + // does not race the replacement under the same client ID.
+ client.Disconnect(250) + d.setBrokerStatus(broker.URL, false) + newClient, err := d.connectToBroker(broker, i, 3) if err != nil { - zap.L().Warn("Background reconnection failed for broker", zap.String("broker", broker.URL), zap.Error(err)) + zap.L().Warn("Forced reconnection failed for broker", zap.String("broker", broker.URL), zap.Error(err)) + d.MQTTClients[i] = nil continue } d.MQTTClients[i] = newClient - d.BrokerStatus[broker.URL] = true - zap.L().Debug("Successfully reconnected to broker", zap.String("broker", broker.URL)) - } else { - d.BrokerStatus[broker.URL] = true + d.recordActivity(broker.URL) + d.setBrokerStatus(broker.URL, true) + continue } + + d.setBrokerStatus(broker.URL, client.IsConnected()) } } diff --git a/ingest/internal/ingestcommon/ingest_test.go b/ingest/internal/ingestcommon/ingest_test.go new file mode 100644 index 0000000..5ec14ef --- /dev/null +++ b/ingest/internal/ingestcommon/ingest_test.go @@ -0,0 +1,65 @@ +package ingestcommon + +import ( + "testing" + "time" +) + +func TestIsStale(t *testing.T) { + d := NewDaemon(&Config{}, nil) + d.staleAfter = 5 * time.Minute + const url = "wss://broker.example:443" + + // A broker we have never heard from is not considered stale: there is no + // activity baseline yet, and connectToBroker seeds one on a successful connect. + if d.isStale(url) { + t.Fatalf("broker with no recorded activity should not be stale") + } + + // Fresh activity -> healthy. + d.recordActivity(url) + if d.isStale(url) { + t.Fatalf("broker with recent activity should not be stale") + } + + // Activity older than the staleness window -> zombie. + d.mu.Lock() + d.lastActivity[url] = time.Now().Add(-10 * time.Minute) + d.mu.Unlock() + if !d.isStale(url) { + t.Fatalf("broker silent for longer than staleAfter should be stale") + } + + // recordActivity clears staleness again. 
+ d.recordActivity(url) + if d.isStale(url) { + t.Fatalf("recordActivity should reset the staleness clock") + } +} + +func TestNewDaemonStaleAfterDefault(t *testing.T) { + d := NewDaemon(&Config{}, nil) + if d.staleAfter != 300*time.Second { + t.Fatalf("expected default staleAfter of 300s, got %s", d.staleAfter) + } + if d.lastActivity == nil { + t.Fatalf("lastActivity map should be initialized") + } +} + +func TestNewDaemonStaleAfterFromEnv(t *testing.T) { + t.Setenv("MQTT_STALE_AFTER_SECONDS", "42") + d := NewDaemon(&Config{}, nil) + if d.staleAfter != 42*time.Second { + t.Fatalf("expected staleAfter of 42s from env, got %s", d.staleAfter) + } +} + +func TestSetBrokerStatus(t *testing.T) { + d := NewDaemon(&Config{}, nil) + d.setBrokerStatus("a", true) + d.setBrokerStatus("b", false) + if !d.BrokerStatus["a"] || d.BrokerStatus["b"] { + t.Fatalf("setBrokerStatus did not record expected values: %#v", d.BrokerStatus) + } +}