From 7cea182c6df346d2d69e64f98d7acfbab2fc1d76 Mon Sep 17 00:00:00 2001 From: Alex Vanderpot <553597+ajvpot@users.noreply.github.com> Date: Mon, 15 Jun 2026 22:42:46 -0400 Subject: [PATCH] ingest: batch ClickHouse inserts to stop MQTT flapping & packet loss (#41) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ingest: batch ClickHouse inserts to stop MQTT flapping & packet loss The meshcore handler did a synchronous per-message ClickHouse insert on paho's single inbound goroutine. At ~86ms/insert (single-row inserts + async_insert wait + materialized views) the goroutine couldn't keep up with the high-volume letsmesh feed, so it stalled past PingTimeout and paho declared "pingresp not received" and reconnected — ~847 cycles in 19.5h, ~45% downtime, ~50% of letsmesh packets lost. The low-volume davekeogh broker never saturated the goroutine and was unaffected. Decouple receipt from insertion: the handler now enqueues decoded rows onto a buffered channel and a single background writer flushes them to meshcore_packets in batched native inserts (every MESHCORE_BATCH_FLUSH_ SECONDS or MESHCORE_BATCH_MAX_ROWS rows). The inbound goroutine never blocks, so PINGRESP is always processed in time. - New batch writer with env-configurable flush interval / max rows / buffer size (MESHCORE_BATCH_* ), wired in docker-compose. - Drop server-side async_insert (redundant once we batch app-side). - Bump PingTimeout 10s -> 20s (env MQTT_PING_TIMEOUT_SECONDS) for margin against Cloudflare WebSocket buffering jitter. - Enqueue is non-blocking; rows are dropped+counted only if the buffer fills (ClickHouse unavailable). A failed batch is dropped and retried by the next flush (native blocks commit atomically). Co-Authored-By: Claude Opus 4.8 (1M context) * ingest: make MQTT KeepAlive configurable (MQTT_KEEPALIVE_SECONDS) As a near-silent subscriber, paho emits a PINGREQ roughly every KeepAlive seconds; lowering it sends client->server frames more often to keep the Cloudflare-proxied WebSocket path warm in both directions, a lever for the residual mid-stream "pingresp not received" stalls on the letsmesh broker. Default unchanged (30s); wired through docker-compose. Co-Authored-By: Claude Opus 4.8 (1M context) * ingest: add configurable MQTT write timeout (MQTT_WRITE_TIMEOUT_SECONDS) Bounds PINGREQ/SUBSCRIBE writes so a stalled write through the Cloudflare WebSocket proxy can't hang the client. Default 0 (paho's existing no-timeout behavior); wired through docker-compose. Recommended ~20s when behind a buffering reverse proxy. Co-Authored-By: Claude Opus 4.8 (1M context) --------- Co-authored-by: Alex Vanderpot Co-authored-by: Claude Opus 4.8 (1M context) --- docker-compose.yml | 7 + ingest/cmd/meshcoreingest/main.go | 206 ++++++++++++++++++++----- ingest/internal/ingestcommon/ingest.go | 25 ++- 3 files changed, 195 insertions(+), 43 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 213f54c..c58dada 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -61,6 +61,13 @@ services: - CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD:-} - MQTT_BROKERS=${MQTT_BROKERS} - MQTT_CLIENT_ID=${MQTT_CLIENT_ID:-meshcore-ingest} + - MQTT_STALE_AFTER_SECONDS=${MQTT_STALE_AFTER_SECONDS:-300} + - MQTT_KEEPALIVE_SECONDS=${MQTT_KEEPALIVE_SECONDS:-30} + - MQTT_PING_TIMEOUT_SECONDS=${MQTT_PING_TIMEOUT_SECONDS:-20} + - MQTT_WRITE_TIMEOUT_SECONDS=${MQTT_WRITE_TIMEOUT_SECONDS:-0} + - MESHCORE_BATCH_FLUSH_SECONDS=${MESHCORE_BATCH_FLUSH_SECONDS:-10} + - MESHCORE_BATCH_MAX_ROWS=${MESHCORE_BATCH_MAX_ROWS:-5000} + - MESHCORE_BATCH_BUFFER=${MESHCORE_BATCH_BUFFER:-50000} depends_on: clickhouse: condition: service_healthy diff --git a/ingest/cmd/meshcoreingest/main.go b/ingest/cmd/meshcoreingest/main.go index 3a2109a..f68ba46 100644 --- a/ingest/cmd/meshcoreingest/main.go +++ b/ingest/cmd/meshcoreingest/main.go @@ -7,16 +7,135 @@ import ( "os" "os/signal" "strings" + "sync/atomic" "syscall" "time" - "github.com/ClickHouse/ch-go/proto" "github.com/ajvpot/meshexplorer/ingest/internal/ingestcommon" mqtt "github.com/eclipse/paho.mqtt.golang" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +// meshcorePacketRow is one buffered row destined for the meshcore_packets +// table. The MQTT handler builds these and enqueues them; a single background +// goroutine batches and inserts them. Keeping the per-message work to an +// in-memory enqueue is what stops paho's inbound goroutine from blocking on +// ClickHouse (a slow synchronous insert there starves PINGRESP handling and +// makes the connection flap). +type meshcorePacketRow struct { + ingestTime time.Time + origin string + originPubkey string + broker string + topic string + meshTimestamp time.Time + packet string +} + +// Defaults for the batch writer, overridable via the env vars read in main: +// - MESHCORE_BATCH_FLUSH_SECONDS: flush at least this often (small batches). +// - MESHCORE_BATCH_MAX_ROWS: flush early once a batch reaches this many rows +// so bursts don't sit in memory for the full interval. +// - MESHCORE_BATCH_BUFFER: producer/consumer channel buffer; sized for many +// seconds of headroom at peak so enqueues effectively never drop unless +// ClickHouse is unavailable for an extended period. +const ( + defaultBatchFlushSeconds = 10 + defaultBatchMaxRows = 5000 + defaultBatchBuffer = 50000 +) + +// packetRows carries decoded rows from the MQTT handler to the batch writer. It +// is created in main once the configured buffer size is known. +var packetRows chan meshcorePacketRow + +// droppedRows counts rows dropped because the buffer was full (ClickHouse stuck +// or unreachable). The writer logs and resets it on each flush. +var droppedRows uint64 + +// enqueuePacket hands a row to the batch writer without ever blocking the +// caller. Blocking here would defeat the whole purpose — it runs on paho's +// inbound goroutine. If the buffer is full we drop and count instead. +func enqueuePacket(row meshcorePacketRow) { + select { + case packetRows <- row: + default: + atomic.AddUint64(&droppedRows, 1) + } +} + +// runBatchWriter drains packetRows, accumulating rows and flushing them to +// ClickHouse in a single batched insert whenever the batch reaches maxRows or +// flushInterval elapses. It exits after draining and flushing any remaining +// rows once the daemon context is cancelled. +func runBatchWriter(d *ingestcommon.Daemon, flushInterval time.Duration, maxRows int) { + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + + batch := make([]meshcorePacketRow, 0, maxRows) + flush := func() { + if dropped := atomic.SwapUint64(&droppedRows, 0); dropped > 0 { + zap.L().Warn("Dropped MeshCore packets: insert buffer full", zap.Uint64("dropped", dropped)) + } + if len(batch) == 0 { + return + } + if err := insertPacketBatch(d, batch); err != nil { + zap.L().Warn("Failed to insert MeshCore packet batch into ClickHouse", zap.Int("rows", len(batch)), zap.Error(err)) + } else { + zap.L().Info("Inserted MeshCore packet batch", zap.Int("rows", len(batch))) + } + batch = batch[:0] + } + + for { + select { + case <-d.Ctx.Done(): + // Drain whatever is already buffered before exiting so a clean + // shutdown doesn't lose the last partial batch. + for { + select { + case row := <-packetRows: + batch = append(batch, row) + if len(batch) >= maxRows { + flush() + } + default: + flush() + return + } + } + case row := <-packetRows: + batch = append(batch, row) + if len(batch) >= maxRows { + flush() + } + case <-ticker.C: + flush() + } + } +} + +// insertPacketBatch writes a batch of rows to meshcore_packets in one native +// insert. ClickHouse commits a native block atomically (no per-row partial +// success), so on failure the whole batch is dropped and the next flush simply +// carries on. time.Time is appended directly for the DateTime64 columns; the +// driver scales to each column's precision. +func insertPacketBatch(d *ingestcommon.Daemon, rows []meshcorePacketRow) error { + batch, err := d.CHConn.PrepareBatch(d.Ctx, "INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet)") + if err != nil { + return err + } + for _, r := range rows { + if err := batch.Append(r.ingestTime, r.origin, r.originPubkey, r.broker, r.topic, r.meshTimestamp, r.packet); err != nil { + _ = batch.Abort() + return err + } + } + return batch.Send() +} + func parseMeshCoreRawMessage(payload []byte) (origin string, originPubkey []byte, meshTimestamp time.Time, packet []byte, err error) { type RawPacket struct { Origin string `json:"origin"` @@ -196,8 +315,6 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon // This ensures we store only the base topic in the database, not the full topic path baseTopic := extractBaseTopic(msg.Topic()) - startTime := time.Now() - // Handle meshcore/raw and meshcore/*/raw topics (including multi-level paths like meshcore/salish/a/raw) if len(topicParts) >= 2 && topicParts[len(topicParts)-1] == "raw" { origin, originPubkey, meshTimestamp, decoded, err := parseMeshCoreRawMessage(msg.Payload()) @@ -205,14 +322,16 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon zap.L().Warn("Failed to parse meshcore/*/raw message", zap.Error(err)) return } - query := `INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet) VALUES (?, ?, ?, ?, ?, ?, ?)` - ingestTime := time.Now() - err = d.CHConn.Exec(d.Ctx, query, proto.ToDateTime64(ingestTime, proto.PrecisionMilli), origin, string(originPubkey), broker, baseTopic, meshTimestamp, string(decoded)) - if err != nil { - zap.L().Warn("Failed to insert MeshCore packet into ClickHouse", zap.Error(err)) - return - } - zap.L().Info("Successfully ingested MeshCore RAW packet", zap.String("broker", broker), zap.String("topic", baseTopic), zap.Duration("duration", time.Since(startTime))) + enqueuePacket(meshcorePacketRow{ + ingestTime: time.Now(), + origin: origin, + originPubkey: string(originPubkey), + broker: broker, + topic: baseTopic, + meshTimestamp: meshTimestamp, + packet: string(decoded), + }) + zap.L().Debug("Enqueued MeshCore RAW packet", zap.String("broker", broker), zap.String("topic", baseTopic)) return } @@ -226,14 +345,16 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon zap.L().Warn("Failed to parse meshcore/*/[gatewayId]/packets message", zap.Error(err)) return } - query := `INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet) VALUES (?, ?, ?, ?, ?, ?, ?)` - ingestTime := time.Now() - err = d.CHConn.Exec(d.Ctx, query, proto.ToDateTime64(ingestTime, proto.PrecisionMilli), origin, string(originPubkey), broker, baseTopic, meshTimestamp, string(decoded)) - if err != nil { - zap.L().Warn("Failed to insert MeshCore packet into ClickHouse", zap.Error(err)) - return - } - zap.L().Info("Successfully ingested MeshCore PACKETS packet (with gateway ID)", zap.String("broker", broker), zap.String("topic", baseTopic), zap.String("gatewayID", gatewayID), zap.Duration("duration", time.Since(startTime))) + enqueuePacket(meshcorePacketRow{ + ingestTime: time.Now(), + origin: origin, + originPubkey: string(originPubkey), + broker: broker, + topic: baseTopic, + meshTimestamp: meshTimestamp, + packet: string(decoded), + }) + zap.L().Debug("Enqueued MeshCore PACKETS packet (with gateway ID)", zap.String("broker", broker), zap.String("topic", baseTopic), zap.String("gatewayID", gatewayID)) return } } @@ -245,14 +366,16 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon zap.L().Warn("Failed to parse meshcore/*/packets message", zap.Error(err)) return } - query := `INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet) VALUES (?, ?, ?, ?, ?, ?, ?)` - ingestTime := time.Now() - err = d.CHConn.Exec(d.Ctx, query, proto.ToDateTime64(ingestTime, proto.PrecisionMilli), origin, string(originPubkey), broker, baseTopic, meshTimestamp, string(decoded)) - if err != nil { - zap.L().Warn("Failed to insert MeshCore packet into ClickHouse", zap.Error(err)) - return - } - zap.L().Info("Successfully ingested MeshCore PACKETS packet", zap.String("broker", broker), zap.String("topic", baseTopic), zap.Duration("duration", time.Since(startTime))) + enqueuePacket(meshcorePacketRow{ + ingestTime: time.Now(), + origin: origin, + originPubkey: string(originPubkey), + broker: broker, + topic: baseTopic, + meshTimestamp: meshTimestamp, + packet: string(decoded), + }) + zap.L().Debug("Enqueued MeshCore PACKETS packet", zap.String("broker", broker), zap.String("topic", baseTopic)) return } @@ -266,14 +389,16 @@ func handleMeshCoreMessage(client mqtt.Client, msg mqtt.Message, d *ingestcommon originPubkeyBytes = []byte{} } - query := `INSERT INTO meshcore_packets (ingest_timestamp, origin, origin_pubkey, broker, topic, mesh_timestamp, packet) VALUES (?, ?, ?, ?, ?, ?, ?)` - ingestTime := time.Now() - err := d.CHConn.Exec(d.Ctx, query, proto.ToDateTime64(ingestTime, proto.PrecisionMilli), gatewayID, string(originPubkeyBytes), broker, baseTopic, time.Time{}, string(msg.Payload())) - if err != nil { - zap.L().Warn("Failed to insert MeshCore binary packet into ClickHouse", zap.Error(err)) - return - } - zap.L().Info("Successfully ingested MeshCore BINARY packet", zap.String("broker", broker), zap.String("topic", baseTopic), zap.String("gatewayID", gatewayID), zap.Duration("duration", time.Since(startTime))) + enqueuePacket(meshcorePacketRow{ + ingestTime: time.Now(), + origin: gatewayID, + originPubkey: string(originPubkeyBytes), + broker: broker, + topic: baseTopic, + meshTimestamp: time.Time{}, + packet: string(msg.Payload()), + }) + zap.L().Debug("Enqueued MeshCore BINARY packet", zap.String("broker", broker), zap.String("topic", baseTopic), zap.String("gatewayID", gatewayID)) return } } @@ -314,6 +439,17 @@ func main() { log.Fatalf("Failed to connect to ClickHouse: %v", err) } defer daemon.CHConn.Close() + // Start the batch writer before connecting to MQTT so the consumer is + // draining the buffer the moment the first messages arrive. + flushInterval := time.Duration(ingestcommon.GetEnvIntOrDefault("MESHCORE_BATCH_FLUSH_SECONDS", defaultBatchFlushSeconds)) * time.Second + maxRows := ingestcommon.GetEnvIntOrDefault("MESHCORE_BATCH_MAX_ROWS", defaultBatchMaxRows) + bufSize := ingestcommon.GetEnvIntOrDefault("MESHCORE_BATCH_BUFFER", defaultBatchBuffer) + packetRows = make(chan meshcorePacketRow, bufSize) + zap.L().Info("Starting MeshCore batch writer", + zap.Duration("flushInterval", flushInterval), + zap.Int("maxRows", maxRows), + zap.Int("buffer", bufSize)) + go runBatchWriter(daemon, flushInterval, maxRows) if err := daemon.ConnectMQTT(); err != nil { log.Fatalf("Failed to connect to MQTT brokers: %v", err) } diff --git a/ingest/internal/ingestcommon/ingest.go b/ingest/internal/ingestcommon/ingest.go index 6395c81..5c1e40b 100644 --- a/ingest/internal/ingestcommon/ingest.go +++ b/ingest/internal/ingestcommon/ingest.go @@ -146,8 +146,18 @@ func (d *Daemon) connectToBroker(broker MQTTBrokerConfig, idx int, maxRetries in opts.SetConnectRetry(true) opts.SetConnectTimeout(10 * time.Second) opts.SetMaxReconnectInterval(30 * time.Second) - opts.SetKeepAlive(30 * time.Second) - opts.SetPingTimeout(10 * time.Second) + // As a near-silent subscriber we send a PINGREQ roughly every KeepAlive + // seconds; lowering it keeps the Cloudflare WebSocket path warm in the + // client->server direction (a lever for the residual mid-stream stalls). + opts.SetKeepAlive(time.Duration(GetEnvIntOrDefault("MQTT_KEEPALIVE_SECONDS", 30)) * time.Second) + // Allow extra margin for PINGRESP to survive the Cloudflare WebSocket + // proxy's buffering/jitter; 10s was tight and produced false + // "pingresp not received" disconnects. Configurable for tuning. + opts.SetPingTimeout(time.Duration(GetEnvIntOrDefault("MQTT_PING_TIMEOUT_SECONDS", 20)) * time.Second) + // Bound writes (PINGREQ/SUBSCRIBE) so a stalled write through the + // Cloudflare WebSocket proxy can't hang the client. 0 (paho default) + // disables it; set MQTT_WRITE_TIMEOUT_SECONDS to enable. + opts.SetWriteTimeout(time.Duration(GetEnvIntOrDefault("MQTT_WRITE_TIMEOUT_SECONDS", 0)) * time.Second) // Start a fresh session on every (re)connect and resubscribe explicitly in // the OnConnect handler below. This avoids depending on broker-side session // persistence — which is exactly what left the client connected-but- @@ -254,13 +264,12 @@ func (d *Daemon) ConnectMQTT() error { } func (d *Daemon) ConnectClickHouse() error { + // The ingest path now batches rows application-side (see meshcoreingest's + // batch writer), so each insert is already a large native block. Server-side + // async_insert buffering on top of that adds latency without benefit, so it + // is left off (default 0). settings := clickhouse.Settings{ - "max_execution_time": 60, - "async_insert": 1, - "wait_for_async_insert": 1, - "async_insert_busy_timeout_ms": 2500, // 2.5 seconds - flush if buffer is busy for this long - "async_insert_max_data_size": 1048576, // 1MB - flush when buffer reaches this size - "async_insert_max_query_number": 5000, // flush after this many insert queries accumulate + "max_execution_time": 60, } conn, err := clickhouse.Open(&clickhouse.Options{