meshexplorer/ingest
Alex Vanderpot 7cea182c6d ingest: batch ClickHouse inserts to stop MQTT flapping & packet loss (#41)
* ingest: batch ClickHouse inserts to stop MQTT flapping & packet loss

The meshcore handler did a synchronous per-message ClickHouse insert on
paho's single inbound goroutine. At ~86ms/insert (single-row inserts +
async_insert wait + materialized views) the goroutine couldn't keep up
with the high-volume letsmesh feed, so it stalled past PingTimeout and
paho declared "pingresp not received" and reconnected — ~847 cycles in
19.5h, ~45% downtime, ~50% of letsmesh packets lost. The low-volume
davekeogh broker never saturated the goroutine and was unaffected.

Decouple receipt from insertion: the handler now enqueues decoded rows
onto a buffered channel and a single background writer flushes them to
meshcore_packets in batched native inserts (every
MESHCORE_BATCH_FLUSH_SECONDS seconds or MESHCORE_BATCH_MAX_ROWS rows). The
inbound goroutine never
blocks, so PINGRESP is always processed in time.

- New batch writer with env-configurable flush interval / max rows /
  buffer size (MESHCORE_BATCH_*), wired in docker-compose.
- Drop server-side async_insert (redundant once we batch app-side).
- Bump PingTimeout 10s -> 20s (env MQTT_PING_TIMEOUT_SECONDS) for margin
  against Cloudflare WebSocket buffering jitter.
- Enqueue is non-blocking; rows are dropped+counted only if the buffer
  fills (ClickHouse unavailable). A failed batch is dropped and retried
  by the next flush (native blocks commit atomically).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* ingest: make MQTT KeepAlive configurable (MQTT_KEEPALIVE_SECONDS)

Because the ingester is a near-silent subscriber, paho emits a PINGREQ
roughly every KeepAlive seconds. Lowering KeepAlive sends client->server
frames more often, keeping the Cloudflare-proxied WebSocket path warm in
both directions; this is one lever against the residual mid-stream
"pingresp not received" stalls on the letsmesh broker.
Default unchanged (30s); wired through docker-compose.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

* ingest: add configurable MQTT write timeout (MQTT_WRITE_TIMEOUT_SECONDS)

Bounds PINGREQ/SUBSCRIBE writes so a stalled write through the Cloudflare
WebSocket proxy can't hang the client. Default 0 (paho's existing no-timeout
behavior); wired through docker-compose. Recommended ~20s when behind a
buffering reverse proxy.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Alex Vanderpot <alex@Alexs-MacBook-Pro-2.local>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 22:42:46 -04:00

MeshCore Ingest

A Go service that ingests MeshCore MQTT messages into ClickHouse, plus the ClickHouse image and SQL migrations for the schema.

This directory is normally run as part of the full stack via the docker compose configuration at the repository root. The notes below cover running and developing it on its own.

Components

  • cmd/meshcoreingest — the ingest daemon. Subscribes to MeshCore MQTT topics and writes raw packets into the meshcore_packets table.
  • internal/ingestcommon — shared MQTT + ClickHouse connection/daemon logic.
  • internal/migrate — a goose-based migration runner (ClickHouse dialect).
  • migrations/ — the ClickHouse schema: the meshcore_packets table, the decoded meshcore_adverts / meshcore_adverts_latest / meshcore_public_channel_messages views, and the unified_latest_nodeinfo view consumed by the web app.
  • clickhouse/ — a thin ClickHouse server image plus the read-only user used by the web app.

Configuration

All configuration is via environment variables (no credentials are baked into the source):

Variable              Description
MQTT_BROKERS          JSON array of broker objects, each with url, username, password
                      and topics fields. topics defaults to ["meshcore/#"]. Required;
                      the daemon exits if unset.
MQTT_CLIENT_ID        MQTT client id prefix (default meshcore-ingest).
CLICKHOUSE_HOST       ClickHouse host, native protocol (default 127.0.0.1).
CLICKHOUSE_PORT       ClickHouse native-protocol port (default 9000).
CLICKHOUSE_DB         ClickHouse database.
CLICKHOUSE_USER       ClickHouse user with read/write access.
CLICKHOUSE_PASSWORD   Password for CLICKHOUSE_USER.

Building

go build ./...
go test ./...

Running migrations

go run ./internal/migrate \
  -host localhost -port 9000 \
  -username default -password "$CLICKHOUSE_PASSWORD" \
  -path migrations -action up

Actions: up, down, reset, status, version.

Running the ingest daemon

export MQTT_BROKERS='[{"url":"tcp://mqtt.example.com:1883","username":"u","password":"p","topics":["meshcore/#"]}]'
export CLICKHOUSE_HOST=localhost CLICKHOUSE_PORT=9000 CLICKHOUSE_PASSWORD=...
go run ./cmd/meshcoreingest