clickhouse: speed up node search & chat region filter, fix stats dashboards (#45)

Node search now scans/merges meshcore_adverts_latest once and fans each row
out to matching query branches via arrayJoin + a window function, instead of
one UNION ALL branch per query, each re-merging the full AggregatingMergeTree.

Chat message queries push the region filter onto the raw table's stored
`region` column before the GROUP BY (instead of a post-aggregation arrayExists
over origin_path_info), eliminating the multi-second full-table scans.

Grafana: rewrite the nodes-over-time panel to drop the rolling self-join (the
bucket is now pinned to one day instead of $__timeInterval), and fix the
messages-by-channel-hash panel, which referenced a nonexistent `regions`
column (ClickHouse error code 47, UNKNOWN_IDENTIFIER).

Co-authored-by: Alex Vanderpot <alex@Alexs-MacBook-Pro-2.local>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Vanderpot
2026-06-19 01:23:23 -04:00
committed by GitHub
parent c7bfce1268
commit 2a0e238e06
4 changed files with 125 additions and 81 deletions
@@ -446,7 +446,7 @@
},
"pluginVersion": "4.10.2",
"queryType": "timeseries",
"rawSql": "WITH all_nodes AS (\n SELECT \n $__timeInterval(ingest_timestamp) AS time_bucket, \n public_key, \n latitude, \n longitude, \n is_repeater, \n is_room_server\n FROM meshcore_adverts\n WHERE ingest_timestamp >= ($__fromTime - INTERVAL 7 DAY) \n AND ingest_timestamp <= $__toTime AND ('$region' = '__ALL__' OR region = '$region') AND ('$region_group' = '__ALL__' OR region IN (SELECT region_code FROM region_groups WHERE group_code = '$region_group'))\n),\nfiltered_time_buckets AS (\n SELECT DISTINCT time_bucket \n FROM all_nodes\n WHERE time_bucket >= $__timeInterval($__fromTime)\n AND time_bucket <= $__timeInterval($__toTime)\n ORDER BY time_bucket ASC\n),\nrolling_window AS (\n SELECT \n tb.time_bucket as time,\n n.public_key,\n n.latitude,\n n.longitude,\n n.is_repeater,\n n.is_room_server\n FROM filtered_time_buckets tb\n INNER JOIN all_nodes n ON n.time_bucket BETWEEN (tb.time_bucket - INTERVAL 7 DAY) AND tb.time_bucket\n)\nSELECT \n time,\n count(DISTINCT public_key) AS cumulative_unique_nodes,\n count(DISTINCT CASE WHEN latitude IS NOT NULL AND longitude IS NOT NULL AND latitude != 0 AND longitude != 0 THEN public_key END) AS nodes_with_location,\n count(DISTINCT CASE WHEN latitude IS NULL OR longitude IS NULL OR latitude = 0 OR longitude = 0 THEN public_key END) AS nodes_without_location,\n count(DISTINCT CASE WHEN is_repeater = 1 THEN public_key END) AS repeaters,\n count(DISTINCT CASE WHEN is_room_server = 1 THEN public_key END) AS room_servers\nFROM rolling_window\nGROUP BY time\nORDER BY time ASC",
"rawSql": "-- Self-join-free rolling 7-day window. The original used a variable\n-- $__timeInterval bucket with an INTERVAL 7 DAY self-join; the inversion\n-- needs a FIXED bucket to fan out over, so we pin the bucket to daily\n-- (toStartOfDay) and fan out range(0, 7) days to match the 7-day window.\nWITH node_days AS (\n SELECT \n toStartOfDay(ingest_timestamp) AS day, \n public_key,\n max(latitude IS NOT NULL AND longitude IS NOT NULL AND latitude != 0 AND longitude != 0) AS has_location,\n max(latitude IS NULL OR longitude IS NULL OR latitude = 0 OR longitude = 0) AS no_location,\n max(is_repeater = 1) AS is_repeater,\n max(is_room_server = 1) AS is_room_server\n FROM meshcore_adverts\n WHERE ingest_timestamp >= ($__fromTime - INTERVAL 7 DAY) \n AND ingest_timestamp <= $__toTime AND ('$region' = '__ALL__' OR region = '$region') AND ('$region_group' = '__ALL__' OR region IN (SELECT region_code FROM region_groups WHERE group_code = '$region_group'))\n GROUP BY day, public_key\n),\nall_days AS (\n SELECT DISTINCT day \n FROM node_days\n WHERE day >= toStartOfDay($__fromTime)\n AND day <= toStartOfDay($__toTime)\n),\ncovered AS (\n SELECT \n arrayJoin(arrayMap(i -> nd.day + toIntervalDay(i), range(0, 7))) AS time,\n nd.public_key AS public_key,\n nd.has_location,\n nd.no_location,\n nd.is_repeater,\n nd.is_room_server\n FROM node_days nd\n)\nSELECT \n time,\n uniqExact(public_key) AS cumulative_unique_nodes,\n uniqExactIf(public_key, has_location) AS nodes_with_location,\n uniqExactIf(public_key, no_location) AS nodes_without_location,\n uniqExactIf(public_key, is_repeater) AS repeaters,\n uniqExactIf(public_key, is_room_server) AS room_servers\nFROM covered\nWHERE time IN (SELECT day FROM all_days)\nGROUP BY time\nORDER BY time ASC",
"refId": "A"
}
],
@@ -1116,7 +1116,7 @@
},
"pluginVersion": "4.10.2",
"queryType": "timeseries",
"rawSql": "SELECT \n $__timeInterval(ingest_timestamp) as time,\n channel_hash,\n COUNT(DISTINCT message_id) as count\nFROM meshcore_public_channel_messages \nWHERE $__timeFilter(ingest_timestamp) AND ('$region' = '__ALL__' OR has(regions, '$region')) AND ('$region_group' = '__ALL__' OR hasAny(regions, (SELECT groupArray(region_code) FROM region_groups WHERE group_code = '$region_group')))\n AND channel_hash != ''\nGROUP BY time, channel_hash\nORDER BY time ASC",
"rawSql": "SELECT \n $__timeInterval(ingest_timestamp) as time,\n channel_hash,\n COUNT(DISTINCT message_id) as count\nFROM meshcore_public_channel_messages_raw \nWHERE $__timeFilter(ingest_timestamp) AND ('$region' = '__ALL__' OR region = '$region') AND ('$region_group' = '__ALL__' OR region IN (SELECT region_code FROM region_groups WHERE group_code = '$region_group'))\n AND channel_hash != ''\nGROUP BY time, channel_hash\nORDER BY time ASC",
"refId": "A"
}
],
+113 -69
View File
@@ -1,6 +1,6 @@
"use server";
import { clickhouse } from "./clickhouse";
import { generateRegionWhereClauseFromArray, generateRegionWhereClause } from "@/lib/regionFilters";
import { generateRegionWhereClause } from "@/lib/regionFilters";
import { regionFromTopic, normalizeRegion, groupCodeOf } from "@/lib/regions";
import { publicChannelMessagesSubquery } from "./chatMessages";
import { isHiddenNodeName, visibleNodeSqlClause } from "@/lib/node-privacy";
@@ -65,9 +65,6 @@ export async function getLatestChatMessages({ limit = 20, before, after, channel
// ingest_timestamp primary key / partition pruning applies. See
// publicChannelMessagesSubquery for why this avoids a full-history scan.
const innerWhere: string[] = [];
// Filters that must run after aggregation (origin_path_info only exists on
// the grouped output).
const outerWhere: string[] = [];
const params: Record<string, any> = { limit };
// Inner-scan columns must be table-qualified: unqualified, the analyzer binds
@@ -87,14 +84,16 @@ export async function getLatestChatMessages({ limit = 20, before, after, channel
params.channelId = channelId;
}
// Region filtering keys off origin_path_info, which only exists post-group.
const regionFilter = generateRegionWhereClauseFromArray(region);
// Region filter pushes onto the raw table's stored `region` column (derived identically to
// regionSql) BEFORE the GROUP BY, so unmatched rows are pruned at the scan instead of via a
// post-aggregation arrayExists over origin_path_info. Table-qualify the column so it binds to
// the scan, not the aggregate output. '__ALL__'/unset/unknown -> no filter (empty clause).
const regionFilter = generateRegionWhereClause(region, "meshcore_public_channel_messages_raw");
if (regionFilter.whereClause) {
outerWhere.push(regionFilter.whereClause);
innerWhere.push(regionFilter.whereClause);
}
const whereClause = outerWhere.length > 0 ? `WHERE ${outerWhere.join(' AND ')}` : '';
const query = `SELECT ingest_timestamp, mesh_timestamp, channel_hash, mac, hex(encrypted_message) AS encrypted_message, message_count, origin_path_info, message_id FROM ${publicChannelMessagesSubquery(innerWhere)} ${whereClause} ORDER BY ingest_timestamp DESC LIMIT {limit:UInt32}`;
const query = `SELECT ingest_timestamp, mesh_timestamp, channel_hash, mac, hex(encrypted_message) AS encrypted_message, message_count, origin_path_info, message_id FROM ${publicChannelMessagesSubquery(innerWhere)} ORDER BY ingest_timestamp DESC LIMIT {limit:UInt32}`;
const resultSet = await clickhouse.query({ query, query_params: params, format: 'JSONEachRow' });
const rows = await resultSet.json();
return rows as Array<{
@@ -474,10 +473,18 @@ export async function searchMeshcoreNodes(searchParams: SearchQuery | SearchQuer
return [];
}
// Build individual query parts
const queryParts: string[] = [];
// Build a per-query branch condition plus its limit. Instead of emitting one
// UNION ALL branch per query (each of which re-merges the entire
// meshcore_adverts_latest_state AggregatingMergeTree), we scan/merge the
// meshcore_adverts_latest view exactly ONCE and fan each surviving row out to
// every branch it matches via arrayJoin, then rank per branch with a window
// function. Per-query branch predicates are heterogeneous (name vs public_key,
// exact vs prefix/substring, plus independent region/lastSeen/is_repeater
// filters), so each branch keeps its own boolean expression.
const branchConditions: string[] = [];
const branchLimits: number[] = [];
const allParams: Record<string, any> = {};
queries.forEach((searchQuery, index) => {
const {
query: searchString,
@@ -487,15 +494,15 @@ export async function searchMeshcoreNodes(searchParams: SearchQuery | SearchQuer
exact = false,
is_repeater
} = searchQuery;
// Node privacy: hidden nodes are unsearchable.
const where: string[] = [visibleNodeSqlClause("node_name")];
const queryParams: Record<string, any> = {};
// Add search conditions
if (searchString && searchString.trim()) {
const trimmedQuery = searchString.trim();
// Check if it looks like a public key (hex string)
if (/^[0-9A-Fa-f]+$/.test(trimmedQuery)) {
if (exact) {
@@ -519,67 +526,103 @@ export async function searchMeshcoreNodes(searchParams: SearchQuery | SearchQuer
}
}
}
// Add lastSeen filter if provided
if (lastSeen !== null && lastSeen !== undefined && lastSeen !== "") {
where.push(`last_seen >= now() - INTERVAL {lastSeen_${index}:UInt32} SECOND`);
queryParams[`lastSeen_${index}`] = Number(lastSeen);
}
// Add region filtering if specified
const regionFilter = generateRegionWhereClause(region);
if (regionFilter.whereClause) {
where.push(regionFilter.whereClause);
}
// Add is_repeater filter if specified
if (is_repeater !== undefined) {
where.push(`is_repeater = {isRepeater_${index}:UInt8}`);
queryParams[`isRepeater_${index}`] = is_repeater ? 1 : 0;
}
const whereClause = where.length > 0 ? `WHERE ${where.join(' AND ')}` : '';
// Reads the live, state-backed meshcore_adverts_latest view (incremental MV over
// meshcore_adverts_latest_state) instead of re-aggregating meshcore_adverts on every
// request. All filter columns (public_key, node_name, last_seen, region, is_repeater)
// exist on the view, so the WHERE pushes straight onto it.
const queryPart = `
SELECT
public_key,
node_name,
latitude,
longitude,
has_location,
is_repeater,
is_chat_node,
is_room_server,
has_name,
first_heard,
last_seen,
broker,
topic,
${index} as query_index
FROM meshcore_adverts_latest
${whereClause}
ORDER BY last_seen DESC
LIMIT {limit_${index}:UInt32}
`;
queryParts.push(queryPart);
queryParams[`limit_${index}`] = limit;
// A branch with no predicates matches every row (preserves the previous
// behavior where an empty WHERE returned the whole table for that query).
branchConditions.push(where.length > 0 ? `(${where.join(' AND ')})` : '1');
branchLimits.push(limit);
// Add query params to the global params object
Object.assign(allParams, queryParams);
});
// Combine all queries with UNION ALL
const finalQuery = queryParts.join(' UNION ALL ');
const resultSet = await clickhouse.query({
query: finalQuery,
query_params: allParams,
format: 'JSONEachRow'
// The single scan only needs rows matching at least one branch.
const combinedFilter = branchConditions.some((c) => c === '1')
? '1'
: branchConditions.join(' OR ');
// Per-branch match flags drive the arrayJoin fan-out: each row is emitted once
// per branch it satisfies (query_index = that branch; -1 for non-matching
// branches, filtered out below). We then rank within each branch by last_seen
// and keep up to the largest requested limit (exact per-branch limits are
// applied in TS below, since limits can differ per branch).
const matchFlags = branchConditions
.map((cond, index) => `if(${cond}, ${index}, -1)`)
.join(', ');
const maxLimit = branchLimits.length > 0 ? Math.max(...branchLimits) : 0;
// Reads the live, state-backed meshcore_adverts_latest view (incremental MV over
// meshcore_adverts_latest_state) once. All filter columns (public_key, node_name,
// last_seen, region, is_repeater) exist on the view, so the WHERE pushes straight
// onto it. The state table is merged a single time regardless of branch count.
const finalQuery = `
SELECT
public_key,
node_name,
latitude,
longitude,
has_location,
is_repeater,
is_chat_node,
is_room_server,
has_name,
first_heard,
last_seen,
broker,
topic,
query_index
FROM (
SELECT
*,
row_number() OVER (PARTITION BY query_index ORDER BY last_seen DESC) AS rn
FROM (
SELECT
public_key,
node_name,
latitude,
longitude,
has_location,
is_repeater,
is_chat_node,
is_room_server,
has_name,
first_heard,
last_seen,
broker,
topic,
arrayJoin([${matchFlags}]) AS query_index
FROM meshcore_adverts_latest
WHERE ${combinedFilter}
)
WHERE query_index >= 0
)
WHERE rn <= {maxLimit:UInt32}
ORDER BY query_index ASC, last_seen DESC
`;
allParams['maxLimit'] = maxLimit;
const resultSet = await clickhouse.query({
query: finalQuery,
query_params: allParams,
format: 'JSONEachRow'
});
const rows = await resultSet.json();
@@ -600,25 +643,26 @@ export async function searchMeshcoreNodes(searchParams: SearchQuery | SearchQuer
query_index?: number;
};
// If single query, return results without query_index
if (!Array.isArray(searchParams)) {
return (rows as SearchResult[]).map(row => {
const { query_index, ...result } = row;
return result;
});
}
// For batch queries, group results by query_index
// Group results by query_index. Rows arrive ordered by (query_index, last_seen
// DESC) and pre-trimmed to at most maxLimit per branch by the window function;
// we slice to each branch's exact limit here since branch limits can differ.
const groupedResults = (rows as SearchResult[]).reduce((acc, row) => {
const index = row.query_index || 0;
if (!acc[index]) {
acc[index] = [];
}
const { query_index, ...result } = row;
acc[index].push(result);
if (acc[index].length < branchLimits[index]) {
acc[index].push(result);
}
return acc;
}, {} as Record<number, SearchResult[]>);
// If single query, return its results without query_index
if (!Array.isArray(searchParams)) {
return groupedResults[0] || [];
}
// Return results in the same order as input queries
return queries.map((_, index) => groupedResults[index] || []);
} catch (error) {
+8 -8
View File
@@ -1,5 +1,5 @@
import { clickhouse } from './clickhouse';
import { generateRegionConditionForStreaming, generateRegionArrayConditionForStreaming } from '../regionFilters';
import { generateRegionConditionForStreaming } from '../regionFilters';
import { publicChannelMessagesSubquery } from './chatMessages';
/**
@@ -254,13 +254,14 @@ export function createChatMessagesStreamerConfig(
innerConditions.push('meshcore_public_channel_messages_raw.channel_hash = {channelId:String}');
}
// Region filtering keys off origin_path_info, which only exists post-group, so it stays as an
// outer predicate spliced in by the streamer.
let additionalWhereClause = '';
// Region filter pushes onto the raw table's stored `region` column (derived identically to
// regionSql) BEFORE the GROUP BY, so unmatched rows are pruned at the scan instead of via a
// post-aggregation arrayExists over origin_path_info. Table-qualify the column so it binds to
// the scan, not the aggregate output. Unset/unknown selector -> no filter (empty clause).
if (region) {
const regionClause = generateRegionArrayConditionForStreaming(region);
const regionClause = generateRegionConditionForStreaming(region, 'meshcore_public_channel_messages_raw');
if (regionClause) {
additionalWhereClause = regionClause;
innerConditions.push(regionClause);
}
}
@@ -282,8 +283,7 @@ export function createChatMessagesStreamerConfig(
`,
timeColumn: 'ingest_timestamp',
pollInterval: 250,
maxRowsPerPoll: 50,
additionalWhereClause: additionalWhereClause || undefined
maxRowsPerPoll: 50
};
}
+2 -2
View File
@@ -19,8 +19,8 @@ export function generateRegionWhereClauseFromArray(region?: string) {
}
/** Region condition string for streaming queries (column-based). '' when no/unknown selector. */
export function generateRegionConditionForStreaming(region?: string): string {
return generateRegionCondition(region);
export function generateRegionConditionForStreaming(region?: string, tableAlias: string = ""): string {
return generateRegionCondition(region, tableAlias);
}
/** Region condition string for streaming queries over origin_path_info. '' when no/unknown selector. */