ingest: resubscribe on reconnect + staleness watchdog for zombie MQTT conns (#38)

The letsmesh broker was migrated behind Cloudflare and changed its topic
layout on 2026-06-02, which left prod's MQTT client in a zombie state:
connected per paho's IsConnected() (so the 30s monitor never rebuilt it) but
receiving zero messages, because the subscription was established only once
after the initial connect and never re-applied on paho auto-reconnects. Result:
12 days of silently missing letsmesh ingestion while davekeogh masked the loss.

Make reconnection robust instead of relying on broker-side session persistence:

- Subscribe inside the OnConnect handler so every (re)connect — including paho
  auto-reconnects — restores delivery. Use CleanSession(true)+ResumeSubs(false)
  so we never depend on the broker remembering our session.
- Add a per-broker data-staleness watchdog: a broker that reports connected but
  delivers no messages for MQTT_STALE_AFTER_SECONDS (default 300) is treated as a
  zombie and force-rebuilt (disconnect + fresh connect/subscribe). This catches
  exactly the failure IsConnected() misses.
- Reduce the external monitor to that watchdog role; transient drops are left to
  paho auto-reconnect rather than racing it with a brand-new client.
- Use stable per-broker client IDs (by index) and a pre-sized MQTTClients slice so
  indices stay aligned when an earlier broker fails; guard BrokerStatus/lastActivity
  with a mutex; promote connect/subscribe logs to Info for visibility.

Adds unit tests for the watchdog and env parsing; documents the new env var.

Co-authored-by: Alex Vanderpot <alex@Alexs-MacBook-Pro-2.local>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Vanderpot
2026-06-14 11:55:33 -04:00
committed by GitHub
parent a0ab900da3
commit 72aa6be3d3
3 changed files with 188 additions and 63 deletions
+4
View File
@@ -22,6 +22,10 @@ CLICKHOUSE_READONLY_PASSWORD=readonly
# with an error if this is empty, so configure at least one broker.
MQTT_BROKERS=[{"url":"tcp://mqtt.example.com:1883","username":"CHANGE_ME","password":"CHANGE_ME","topics":["meshcore/#"]}]
MQTT_CLIENT_ID=meshcore-ingest
# Staleness watchdog: if a broker reports connected but delivers no messages for
# this many seconds, the daemon forces a fresh reconnect + resubscribe. Guards
# against "zombie" connections that survive an upstream broker swap. Default 300.
MQTT_STALE_AFTER_SECONDS=300
# ─── Web app ─────────────────────────────────────────────────────────────────
# Base URL for client-side API calls. Leave empty to use relative URLs.
+119 -63
View File
@@ -7,6 +7,7 @@ import (
"math"
"os"
"strconv"
"sync"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
@@ -75,6 +76,12 @@ type Daemon struct {
Ctx context.Context
Cancel context.CancelFunc
MessageHandler MessageHandler
// mu guards BrokerStatus and lastActivity, which are read/written from both
// the connection-monitor goroutine and paho's callback goroutines.
mu sync.Mutex
lastActivity map[string]time.Time
staleAfter time.Duration
}
func NewDaemon(config *Config, handler MessageHandler) *Daemon {
@@ -85,18 +92,54 @@ func NewDaemon(config *Config, handler MessageHandler) *Daemon {
Ctx: ctx,
Cancel: cancel,
MessageHandler: handler,
lastActivity: make(map[string]time.Time),
staleAfter: time.Duration(GetEnvIntOrDefault("MQTT_STALE_AFTER_SECONDS", 300)) * time.Second,
}
}
func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, maxRetries int) (mqtt.Client, error) {
// recordActivity stamps brokerURL with the current time, restarting that
// broker's staleness window. It is invoked for every inbound message and after
// every successful (re)connect. The write to lastActivity happens under d.mu.
func (d *Daemon) recordActivity(brokerURL string) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.lastActivity[brokerURL] = time.Now()
}
// isStale reports whether brokerURL has gone longer than d.staleAfter without
// any recorded activity. A client that still reports connected but is stale is
// a zombie (connected yet no longer delivering messages) — the failure the
// watchdog in checkAndReconnectBrokers exists to recover from.
//
// It returns false when:
//   - d.staleAfter is zero or negative. Such a window would make every tracked
//     broker look stale on every check and force endless rebuilds of healthy
//     connections, so a non-positive value is treated as "watchdog disabled".
//   - no activity has ever been recorded for brokerURL, because there is no
//     baseline to measure against yet.
//
// The lastActivity map is read under d.mu.
func (d *Daemon) isStale(brokerURL string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.staleAfter <= 0 {
		return false
	}

	last, ok := d.lastActivity[brokerURL]
	if !ok {
		return false
	}
	return time.Since(last) > d.staleAfter
}
// setBrokerStatus records whether brokerURL is currently considered up. The
// write to BrokerStatus is performed while holding d.mu.
func (d *Daemon) setBrokerStatus(brokerURL string, up bool) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.BrokerStatus[brokerURL] = up
}
func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, idx int, maxRetries int) (mqtt.Client, error) {
baseDelay := time.Second
maxDelay := 30 * time.Second
for attempt := 0; attempt <= maxRetries; attempt++ {
opts := mqtt.NewClientOptions()
opts.AddBroker(broker.URL)
opts.SetClientID(fmt.Sprintf("%s-%d", d.Config.MQTTClientID, attempt))
// Stable, per-broker client ID so reconnects and forced rebuilds reuse the
// same MQTT session identity rather than spawning a "-0" client that races
// the one paho is already auto-reconnecting.
opts.SetClientID(fmt.Sprintf("%s-%d", d.Config.MQTTClientID, idx))
opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
// Any received message proves the subscription is live; record it so the
// staleness watchdog can distinguish a healthy connection from a zombie.
d.recordActivity(broker.URL)
d.MessageHandler(client, msg, d)
})
opts.SetAutoReconnect(true)
@@ -105,15 +148,36 @@ func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, maxRetries int) (mqtt.
opts.SetMaxReconnectInterval(30 * time.Second)
opts.SetKeepAlive(30 * time.Second)
opts.SetPingTimeout(10 * time.Second)
opts.SetCleanSession(false)
opts.SetResumeSubs(true)
// Start a fresh session on every (re)connect and resubscribe explicitly in
// the OnConnect handler below. This avoids depending on broker-side session
// persistence — which is exactly what left the client connected-but-
// unsubscribed (a zombie, ingesting nothing) after the upstream broker was
// replaced.
opts.SetCleanSession(true)
opts.SetResumeSubs(false)
opts.SetOnConnectHandler(func(client mqtt.Client) {
zap.L().Debug("Connected to MQTT broker", zap.String("broker", broker.URL))
zap.L().Info("Connected to MQTT broker", zap.String("broker", broker.URL))
// Reset the staleness clock and (re)subscribe on every connect, including
// paho's automatic reconnects, so a reconnect always restores delivery.
d.recordActivity(broker.URL)
d.setBrokerStatus(broker.URL, true)
for _, topic := range broker.Topics {
if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
zap.L().Warn("Failed to subscribe to topic",
zap.String("topic", topic),
zap.String("broker", broker.URL),
zap.Error(token.Error()))
} else {
zap.L().Info("Subscribed to topic",
zap.String("topic", topic),
zap.String("broker", broker.URL))
}
}
})
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
zap.L().Warn("Connection lost to MQTT broker", zap.String("broker", broker.URL), zap.Error(err))
d.BrokerStatus[broker.URL] = false
d.setBrokerStatus(broker.URL, false)
})
if broker.Username != "" {
@@ -143,33 +207,11 @@ func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, maxRetries int) (mqtt.
return nil, fmt.Errorf("failed to connect to MQTT broker %s after %d attempts: %w", broker.URL, maxRetries+1, token.Error())
}
for _, topic := range broker.Topics {
if token := client.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
zap.L().Warn("Failed to subscribe to topic",
zap.String("topic", topic),
zap.String("broker", broker.URL),
zap.Int("attempt", attempt+1),
zap.Int("maxRetries", maxRetries+1),
zap.Error(token.Error()))
if attempt < maxRetries {
client.Disconnect(250)
delay := time.Duration(float64(baseDelay) * math.Pow(2, float64(attempt)))
if delay > maxDelay {
delay = maxDelay
}
zap.L().Debug("Retrying subscription", zap.String("broker", broker.URL), zap.Duration("delay", delay))
time.Sleep(delay)
continue
}
client.Disconnect(250)
return nil, fmt.Errorf("failed to subscribe to topic %s on broker %s after %d attempts: %w", topic, broker.URL, maxRetries+1, token.Error())
}
}
zap.L().Debug("Successfully connected to MQTT broker and subscribed to topics",
zap.Strings("topics", broker.Topics),
zap.String("broker", broker.URL))
// Subscriptions are established in the OnConnect handler so they are
// re-established on every reconnect, not just on this initial connect.
// Seed the liveness clock so a brand-new connection gets a full staleness
// window before the watchdog can judge it.
d.recordActivity(broker.URL)
return client, nil
}
return nil, fmt.Errorf("unexpected error in connectToBroker for %s", broker.URL)
@@ -182,15 +224,21 @@ func (d *Daemon) ConnectMQTT() error {
zap.L().Debug("Attempting to connect to MQTT brokers", zap.Int("totalBrokers", totalBrokers))
for _, broker := range d.Config.MQTTBrokers {
client, err := d.connectToBroker(broker, maxRetries)
// Pre-size MQTTClients so each slot stays aligned with its broker index even
// when some brokers fail to connect (a failed broker leaves a nil slot that the
// monitor retries). The old append-only approach misaligned indices whenever an
// earlier broker failed.
d.MQTTClients = make([]mqtt.Client, totalBrokers)
for i, broker := range d.Config.MQTTBrokers {
client, err := d.connectToBroker(broker, i, maxRetries)
if err != nil {
zap.L().Warn("Failed to connect to broker", zap.String("broker", broker.URL), zap.Error(err))
d.BrokerStatus[broker.URL] = false
d.setBrokerStatus(broker.URL, false)
continue
}
d.MQTTClients = append(d.MQTTClients, client)
d.BrokerStatus[broker.URL] = true
d.MQTTClients[i] = client
d.setBrokerStatus(broker.URL, true)
successfulConnections++
}
@@ -198,9 +246,10 @@ func (d *Daemon) ConnectMQTT() error {
return fmt.Errorf("failed to connect to any MQTT brokers")
}
zap.L().Debug("Successfully connected to MQTT brokers",
zap.L().Info("Connected to MQTT brokers",
zap.Int("successfulConnections", successfulConnections),
zap.Int("totalBrokers", totalBrokers))
zap.Int("totalBrokers", totalBrokers),
zap.Duration("staleAfter", d.staleAfter))
return nil
}
@@ -248,45 +297,52 @@ func (d *Daemon) MonitorConnections() {
}
}
// checkAndReconnectBrokers is a watchdog, not the primary reconnection path.
// paho's auto-reconnect handles transient drops and the OnConnect handler
// resubscribes, so this only intervenes to (a) (re)establish brokers that have no
// live client and (b) tear down zombie connections — clients that report connected
// but have gone silent past the staleness window (e.g. after an upstream broker
// swap dropped the subscription). Forcing a full rebuild gets a fresh session and
// resubscribe, which is what actually restores delivery.
func (d *Daemon) checkAndReconnectBrokers() {
for i, broker := range d.Config.MQTTBrokers {
if i >= len(d.MQTTClients) {
client, err := d.connectToBroker(broker, 3)
if err != nil {
zap.L().Warn("Background reconnection failed for broker", zap.String("broker", broker.URL), zap.Error(err))
continue
}
d.MQTTClients = append(d.MQTTClients, client)
d.BrokerStatus[broker.URL] = true
zap.L().Debug("Successfully reconnected to broker", zap.String("broker", broker.URL))
continue
}
client := d.MQTTClients[i]
if client == nil {
newClient, err := d.connectToBroker(broker, 3)
newClient, err := d.connectToBroker(broker, i, 3)
if err != nil {
zap.L().Warn("Background reconnection failed for broker", zap.String("broker", broker.URL), zap.Error(err))
d.BrokerStatus[broker.URL] = false
d.setBrokerStatus(broker.URL, false)
continue
}
d.MQTTClients[i] = newClient
d.BrokerStatus[broker.URL] = true
zap.L().Debug("Successfully reconnected to broker", zap.String("broker", broker.URL))
d.setBrokerStatus(broker.URL, true)
zap.L().Info("Established connection to broker", zap.String("broker", broker.URL))
continue
}
if !client.IsConnected() {
d.BrokerStatus[broker.URL] = false
newClient, err := d.connectToBroker(broker, 3)
if d.isStale(broker.URL) {
zap.L().Warn("No messages received within staleness window; forcing reconnect",
zap.String("broker", broker.URL),
zap.Bool("reportedConnected", client.IsConnected()),
zap.Duration("staleAfter", d.staleAfter))
// Explicitly disconnect so the old client stops auto-reconnecting and
// does not race the replacement under the same client ID.
client.Disconnect(250)
d.setBrokerStatus(broker.URL, false)
newClient, err := d.connectToBroker(broker, i, 3)
if err != nil {
zap.L().Warn("Background reconnection failed for broker", zap.String("broker", broker.URL), zap.Error(err))
zap.L().Warn("Forced reconnection failed for broker", zap.String("broker", broker.URL), zap.Error(err))
d.MQTTClients[i] = nil
continue
}
d.MQTTClients[i] = newClient
d.BrokerStatus[broker.URL] = true
zap.L().Debug("Successfully reconnected to broker", zap.String("broker", broker.URL))
} else {
d.BrokerStatus[broker.URL] = true
d.recordActivity(broker.URL)
d.setBrokerStatus(broker.URL, true)
continue
}
d.setBrokerStatus(broker.URL, client.IsConnected())
}
}
@@ -0,0 +1,65 @@
package ingestcommon
import (
	"os"
	"testing"
	"time"
)
// TestIsStale walks a single broker through the staleness lifecycle: never
// seen, freshly active, silent past the window, and active again.
func TestIsStale(t *testing.T) {
	const brokerURL = "wss://broker.example:443"

	daemon := NewDaemon(&Config{}, nil)
	daemon.staleAfter = 5 * time.Minute

	// Nothing has been recorded for this broker yet, so there is no baseline to
	// judge it against; connectToBroker seeds one after a successful connect.
	if daemon.isStale(brokerURL) {
		t.Fatalf("broker with no recorded activity should not be stale")
	}

	// Activity recorded just now keeps the broker healthy.
	daemon.recordActivity(brokerURL)
	if daemon.isStale(brokerURL) {
		t.Fatalf("broker with recent activity should not be stale")
	}

	// Backdate the last activity beyond the window to simulate a zombie.
	func() {
		daemon.mu.Lock()
		defer daemon.mu.Unlock()
		daemon.lastActivity[brokerURL] = time.Now().Add(-10 * time.Minute)
	}()
	if !daemon.isStale(brokerURL) {
		t.Fatalf("broker silent for longer than staleAfter should be stale")
	}

	// New activity must bring the broker back to healthy.
	daemon.recordActivity(brokerURL)
	if daemon.isStale(brokerURL) {
		t.Fatalf("recordActivity should reset the staleness clock")
	}
}
// TestNewDaemonStaleAfterDefault checks that NewDaemon falls back to a 300s
// staleness window when MQTT_STALE_AFTER_SECONDS is not set, and that the
// lastActivity map is ready for use.
func TestNewDaemonStaleAfterDefault(t *testing.T) {
	const key = "MQTT_STALE_AFTER_SECONDS"

	// Make the test hermetic: a value exported by the developer's shell or CI
	// would otherwise override the default under test. t.Setenv registers a
	// cleanup that restores the original value; Unsetenv then removes the
	// variable for the duration of this test.
	t.Setenv(key, "")
	if err := os.Unsetenv(key); err != nil {
		t.Fatalf("unsetting %s: %v", key, err)
	}

	d := NewDaemon(&Config{}, nil)
	if d.staleAfter != 300*time.Second {
		t.Fatalf("expected default staleAfter of 300s, got %s", d.staleAfter)
	}
	if d.lastActivity == nil {
		t.Fatalf("lastActivity map should be initialized")
	}
}
// TestNewDaemonStaleAfterFromEnv checks that NewDaemon derives staleAfter from
// MQTT_STALE_AFTER_SECONDS when the variable is set.
func TestNewDaemonStaleAfterFromEnv(t *testing.T) {
	t.Setenv("MQTT_STALE_AFTER_SECONDS", "42")

	want := 42 * time.Second
	if got := NewDaemon(&Config{}, nil).staleAfter; got != want {
		t.Fatalf("expected staleAfter of 42s from env, got %s", got)
	}
}
// TestSetBrokerStatus checks that setBrokerStatus stores both true and false
// values under the given broker keys.
func TestSetBrokerStatus(t *testing.T) {
	daemon := NewDaemon(&Config{}, nil)
	daemon.setBrokerStatus("a", true)
	daemon.setBrokerStatus("b", false)

	aUp, bUp := daemon.BrokerStatus["a"], daemon.BrokerStatus["b"]
	if aUp && !bUp {
		return
	}
	t.Fatalf("setBrokerStatus did not record expected values: %#v", daemon.BrokerStatus)
}