ingest: batch ClickHouse inserts to stop MQTT flapping & packet loss (#41)

* ingest: batch ClickHouse inserts to stop MQTT flapping & packet loss

The meshcore handler did a synchronous per-message ClickHouse insert on
paho's single inbound goroutine. At ~86ms/insert (single-row inserts +
async_insert wait + materialized views) the goroutine couldn't keep up
with the high-volume letsmesh feed, so it stalled past PingTimeout and
paho declared "pingresp not received" and reconnected — ~847 cycles in
19.5h, ~45% downtime, ~50% of letsmesh packets lost. The low-volume
davekeogh broker never saturated the goroutine and was unaffected.

Decouple receipt from insertion: the handler now enqueues decoded rows
onto a buffered channel and a single background writer flushes them to
meshcore_packets in batched native inserts (every MESHCORE_BATCH_FLUSH_
SECONDS or MESHCORE_BATCH_MAX_ROWS rows). The inbound goroutine never
blocks, so PINGRESP is always processed in time.

- New batch writer with env-configurable flush interval / max rows /
  buffer size (MESHCORE_BATCH_* ), wired in docker-compose.
- Drop server-side async_insert (redundant once we batch app-side).
- Bump PingTimeout 10s -> 20s (env MQTT_PING_TIMEOUT_SECONDS) for margin
  against Cloudflare WebSocket buffering jitter.
- Enqueue is non-blocking; rows are dropped+counted only if the buffer
  fills (ClickHouse unavailable). A failed batch is dropped and retried
  by the next flush (native blocks commit atomically).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* ingest: make MQTT KeepAlive configurable (MQTT_KEEPALIVE_SECONDS)

As a near-silent subscriber, paho emits a PINGREQ roughly every KeepAlive
seconds; lowering it sends client->server frames more often to keep the
Cloudflare-proxied WebSocket path warm in both directions, a lever for the
residual mid-stream "pingresp not received" stalls on the letsmesh broker.
Default unchanged (30s); wired through docker-compose.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* ingest: add configurable MQTT write timeout (MQTT_WRITE_TIMEOUT_SECONDS)

Bounds PINGREQ/SUBSCRIBE writes so a stalled write through the Cloudflare
WebSocket proxy can't hang the client. Default 0 (paho's existing no-timeout
behavior); wired through docker-compose. Recommended ~20s when behind a
buffering reverse proxy.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Alex Vanderpot <alex@Alexs-MacBook-Pro-2.local>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Vanderpot
2026-06-15 22:42:46 -04:00
committed by GitHub
parent 27c94e1aee
commit 7cea182c6d
3 changed files with 195 additions and 43 deletions
+7
View File
@@ -61,6 +61,13 @@ services:
- CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD:-}
- MQTT_BROKERS=${MQTT_BROKERS}
- MQTT_CLIENT_ID=${MQTT_CLIENT_ID:-meshcore-ingest}
- MQTT_STALE_AFTER_SECONDS=${MQTT_STALE_AFTER_SECONDS:-300}
- MQTT_KEEPALIVE_SECONDS=${MQTT_KEEPALIVE_SECONDS:-30}
- MQTT_PING_TIMEOUT_SECONDS=${MQTT_PING_TIMEOUT_SECONDS:-20}
- MQTT_WRITE_TIMEOUT_SECONDS=${MQTT_WRITE_TIMEOUT_SECONDS:-0}
- MESHCORE_BATCH_FLUSH_SECONDS=${MESHCORE_BATCH_FLUSH_SECONDS:-10}
- MESHCORE_BATCH_MAX_ROWS=${MESHCORE_BATCH_MAX_ROWS:-5000}
- MESHCORE_BATCH_BUFFER=${MESHCORE_BATCH_BUFFER:-50000}
depends_on:
clickhouse:
condition: service_healthy
+171 -35
View File
@@ -7,16 +7,135 @@ import (
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
"github.com/ClickHouse/ch-go/proto"
"github.com/ajvpot/meshexplorer/ingest/internal/ingestcommon"
mqtt "github.com/eclipse/paho.mqtt.golang"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// meshcorePacketRow is one buffered row destined for the meshcore_packets
// table. The MQTT handler builds these and enqueues them; a single background
// goroutine batches and inserts them. Keeping the per-message work to an
// in-memory enqueue is what stops paho's inbound goroutine from blocking on
// ClickHouse (a slow synchronous insert there starves PINGRESP handling and
// makes the connection flap).
type meshcorePacketRow struct {
ingestTime time.Time
origin string
originPubkey string
broker string
topic string
meshTimestamp time.Time
packet string
}
// Defaults for the batch writer, overridable via the env vars read in main:
// - MESHCORE_BATCH_FLUSH_SECONDS: flush at least this often (small batches).
// - MESHCORE_BATCH_MAX_ROWS: flush early once a batch reaches this many rows
// so bursts don't sit in memory for the full interval.
// - MESHCORE_BATCH_BUFFER: producer/consumer channel buffer; sized for many
// seconds of headroom at peak so enqueues effectively never drop unless
// ClickHouse is unavailable for an extended period.
const (
defaultBatchFlushSeconds = 10
defaultBatchMaxRows = 5000
defaultBatchBuffer = 50000
)
// packetRows carries decoded rows from the MQTT handler to the batch writer. It
// is created in main once the configured buffer size is known.
var packetRows chan meshcorePacketRow
// droppedRows counts rows dropped because the buffer was full (ClickHouse stuck
// or unreachable). The writer logs and resets it on each flush.
var droppedRows uint64
// enqueuePacket hands a row to the batch writer without ever blocking the
// caller. Blocking here would defeat the whole purpose — it runs on paho's
// inbound goroutine. If the buffer is full we drop and count instead.
func enqueuePacket(row meshcorePacketRow) {
select {
case packetRows <- row:
default:
atomic.AddUint64(&droppedRows, 1)
}
}
// runBatchWriter drains packetRows, accumulating rows and flushing them to
// ClickHouse in a single batched insert whenever the batch reaches maxRows or
// flushInterval elapses. It exits after draining and flushing any remaining
// rows once the daemon context is cancelled.
func runBatchWriter(d *ingestcommon.Daemon, flushInterval time.Duration, maxRows int) {
ticker := time.NewTicker(flushInterval)
defer ticker.Stop()
batch := make([]meshcorePacketRow, 0, maxRows)
flush := func() {
if dropped := atomic.SwapUint64(&droppedRows, 0); dropped > 0 {
zap.L().Warn("Dropped MeshCore packets: insert buffer full", zap.Uint64("dropped", dropped))
}
if len(batch) == 0 {
return
}
if err := insertPacketBatch(d, batch); err != nil {
zap.L().Warn("Failed to insert MeshCore packet batch into ClickHouse", zap.Int("rows", len(batch)), zap.Error(err))
} else {
zap.L().Info("Inserted MeshCore packet batch", zap.Int("rows", len(batch)))
}
batch = batch[:0]
}
for {
select {
case <-d.Ctx.Done():
// Drain whatever is already buffered before exiting so a clean
// shutdown doesn't lose the last partial batch.
for {
select {
case row := <-packetRows:
batch = append(batch, row)
if len(batch) >= maxRows {
flush()
}
default:
flush()
return
}
}
case row := <-packetRows:
batch = append(batch, row)
if len(batch) >= maxRows {
flush()
}
case <-ticker.C:
flush()
}
}
}
// insertPacketBatch writes a batch of rows to meshcore_packets in one native
// insert. ClickHouse commits a native block atomically (no per-row partial
// success), so on failure the whole batch is dropped and the next flush simply
// carries on. time.Time is appended directly for the DateTime64 columns; the
// driver scales to each column's precision.
func insertPacketBatch(d *ingestcommon.Daemon, rows []meshcorePacketRow) error {
batch, err := d.CHConn.PrepareBatch(d.Ctx, "INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet)")
if err != nil {
return err
}
for _, r := range rows {
if err := batch.Append(r.ingestTime, r.origin, r.originPubkey, r.broker, r.topic, r.meshTimestamp, r.packet); err != nil {
_ = batch.Abort()
return err
}
}
return batch.Send()
}
func parseMeshCoreRawMessage(payload []byte) (origin string, originPubkey []byte, meshTimestamp time.Time, packet []byte, err error) {
type RawPacket struct {
Origin string `json:"origin"`
@@ -196,8 +315,6 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon
// This ensures we store only the base topic in the database, not the full topic path
baseTopic := extractBaseTopic(msg.Topic())
startTime := time.Now()
// Handle meshcore/raw and meshcore/*/raw topics (including multi-level paths like meshcore/salish/a/raw)
if len(topicParts) >= 2 && topicParts[len(topicParts)-1] == "raw" {
origin, originPubkey, meshTimestamp, decoded, err := parseMeshCoreRawMessage(msg.Payload())
@@ -205,14 +322,16 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon
zap.L().Warn("Failed to parse meshcore/*/raw message", zap.Error(err))
return
}
query := `INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet) VALUES (?, ?, ?, ?, ?, ?, ?)`
ingestTime := time.Now()
err = d.CHConn.Exec(d.Ctx, query, proto.ToDateTime64(ingestTime, proto.PrecisionMilli), origin, string(originPubkey), broker, baseTopic, meshTimestamp, string(decoded))
if err != nil {
zap.L().Warn("Failed to insert MeshCore packet into ClickHouse", zap.Error(err))
return
}
zap.L().Info("Successfully ingested MeshCore RAW packet", zap.String("broker", broker), zap.String("topic", baseTopic), zap.Duration("duration", time.Since(startTime)))
enqueuePacket(meshcorePacketRow{
ingestTime: time.Now(),
origin: origin,
originPubkey: string(originPubkey),
broker: broker,
topic: baseTopic,
meshTimestamp: meshTimestamp,
packet: string(decoded),
})
zap.L().Debug("Enqueued MeshCore RAW packet", zap.String("broker", broker), zap.String("topic", baseTopic))
return
}
@@ -226,14 +345,16 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon
zap.L().Warn("Failed to parse meshcore/*/[gatewayId]/packets message", zap.Error(err))
return
}
query := `INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet) VALUES (?, ?, ?, ?, ?, ?, ?)`
ingestTime := time.Now()
err = d.CHConn.Exec(d.Ctx, query, proto.ToDateTime64(ingestTime, proto.PrecisionMilli), origin, string(originPubkey), broker, baseTopic, meshTimestamp, string(decoded))
if err != nil {
zap.L().Warn("Failed to insert MeshCore packet into ClickHouse", zap.Error(err))
return
}
zap.L().Info("Successfully ingested MeshCore PACKETS packet (with gateway ID)", zap.String("broker", broker), zap.String("topic", baseTopic), zap.String("gatewayID", gatewayID), zap.Duration("duration", time.Since(startTime)))
enqueuePacket(meshcorePacketRow{
ingestTime: time.Now(),
origin: origin,
originPubkey: string(originPubkey),
broker: broker,
topic: baseTopic,
meshTimestamp: meshTimestamp,
packet: string(decoded),
})
zap.L().Debug("Enqueued MeshCore PACKETS packet (with gateway ID)", zap.String("broker", broker), zap.String("topic", baseTopic), zap.String("gatewayID", gatewayID))
return
}
}
@@ -245,14 +366,16 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon
zap.L().Warn("Failed to parse meshcore/*/packets message", zap.Error(err))
return
}
query := `INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet) VALUES (?, ?, ?, ?, ?, ?, ?)`
ingestTime := time.Now()
err = d.CHConn.Exec(d.Ctx, query, proto.ToDateTime64(ingestTime, proto.PrecisionMilli), origin, string(originPubkey), broker, baseTopic, meshTimestamp, string(decoded))
if err != nil {
zap.L().Warn("Failed to insert MeshCore packet into ClickHouse", zap.Error(err))
return
}
zap.L().Info("Successfully ingested MeshCore PACKETS packet", zap.String("broker", broker), zap.String("topic", baseTopic), zap.Duration("duration", time.Since(startTime)))
enqueuePacket(meshcorePacketRow{
ingestTime: time.Now(),
origin: origin,
originPubkey: string(originPubkey),
broker: broker,
topic: baseTopic,
meshTimestamp: meshTimestamp,
packet: string(decoded),
})
zap.L().Debug("Enqueued MeshCore PACKETS packet", zap.String("broker", broker), zap.String("topic", baseTopic))
return
}
@@ -266,14 +389,16 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon
originPubkeyBytes = []byte{}
}
query := `INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet) VALUES (?, ?, ?, ?, ?, ?, ?)`
ingestTime := time.Now()
err := d.CHConn.Exec(d.Ctx, query, proto.ToDateTime64(ingestTime, proto.PrecisionMilli), gatewayID, string(originPubkeyBytes), broker, baseTopic, time.Time{}, string(msg.Payload()))
if err != nil {
zap.L().Warn("Failed to insert MeshCore binary packet into ClickHouse", zap.Error(err))
return
}
zap.L().Info("Successfully ingested MeshCore BINARY packet", zap.String("broker", broker), zap.String("topic", baseTopic), zap.String("gatewayID", gatewayID), zap.Duration("duration", time.Since(startTime)))
enqueuePacket(meshcorePacketRow{
ingestTime: time.Now(),
origin: gatewayID,
originPubkey: string(originPubkeyBytes),
broker: broker,
topic: baseTopic,
meshTimestamp: time.Time{},
packet: string(msg.Payload()),
})
zap.L().Debug("Enqueued MeshCore BINARY packet", zap.String("broker", broker), zap.String("topic", baseTopic), zap.String("gatewayID", gatewayID))
return
}
}
@@ -314,6 +439,17 @@ func main() {
log.Fatalf("Failed to connect to ClickHouse: %v", err)
}
defer daemon.CHConn.Close()
// Start the batch writer before connecting to MQTT so the consumer is
// draining the buffer the moment the first messages arrive.
flushInterval := time.Duration(ingestcommon.GetEnvIntOrDefault("MESHCORE_BATCH_FLUSH_SECONDS", defaultBatchFlushSeconds)) * time.Second
maxRows := ingestcommon.GetEnvIntOrDefault("MESHCORE_BATCH_MAX_ROWS", defaultBatchMaxRows)
bufSize := ingestcommon.GetEnvIntOrDefault("MESHCORE_BATCH_BUFFER", defaultBatchBuffer)
packetRows = make(chan meshcorePacketRow, bufSize)
zap.L().Info("Starting MeshCore batch writer",
zap.Duration("flushInterval", flushInterval),
zap.Int("maxRows", maxRows),
zap.Int("buffer", bufSize))
go runBatchWriter(daemon, flushInterval, maxRows)
if err := daemon.ConnectMQTT(); err != nil {
log.Fatalf("Failed to connect to MQTT brokers: %v", err)
}
+17 -8
View File
@@ -146,8 +146,18 @@ func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, idx int, maxRetries in
opts.SetConnectRetry(true)
opts.SetConnectTimeout(10 * time.Second)
opts.SetMaxReconnectInterval(30 * time.Second)
opts.SetKeepAlive(30 * time.Second)
opts.SetPingTimeout(10 * time.Second)
// As a near-silent subscriber we send a PINGREQ roughly every KeepAlive
// seconds; lowering it keeps the Cloudflare WebSocket path warm in the
// client->server direction (a lever for the residual mid-stream stalls).
opts.SetKeepAlive(time.Duration(GetEnvIntOrDefault("MQTT_KEEPALIVE_SECONDS", 30)) * time.Second)
// Allow extra margin for PINGRESP to survive the Cloudflare WebSocket
// proxy's buffering/jitter; 10s was tight and produced false
// "pingresp not received" disconnects. Configurable for tuning.
opts.SetPingTimeout(time.Duration(GetEnvIntOrDefault("MQTT_PING_TIMEOUT_SECONDS", 20)) * time.Second)
// Bound writes (PINGREQ/SUBSCRIBE) so a stalled write through the
// Cloudflare WebSocket proxy can't hang the client. 0 (paho default)
// disables it; set MQTT_WRITE_TIMEOUT_SECONDS to enable.
opts.SetWriteTimeout(time.Duration(GetEnvIntOrDefault("MQTT_WRITE_TIMEOUT_SECONDS", 0)) * time.Second)
// Start a fresh session on every (re)connect and resubscribe explicitly in
// the OnConnect handler below. This avoids depending on broker-side session
// persistence — which is exactly what left the client connected-but-
@@ -254,13 +264,12 @@ func (d *Daemon) ConnectMQTT() error {
}
func (d *Daemon) ConnectClickHouse() error {
// The ingest path now batches rows application-side (see meshcoreingest's
// batch writer), so each insert is already a large native block. Server-side
// async_insert buffering on top of that adds latency without benefit, so it
// is left off (default 0).
settings := clickhouse.Settings{
"max_execution_time": 60,
"async_insert": 1,
"wait_for_async_insert": 1,
"async_insert_busy_timeout_ms": 2500, // 2.5 seconds - flush if buffer is busy for this long
"async_insert_max_data_size": 1048576, // 1MB - flush when buffer reaches this size
"async_insert_max_query_number": 5000, // flush after this many insert queries accumulate
"max_execution_time": 60,
}
conn, err := clickhouse.Open(&clickhouse.Options{