diff --git a/grafana/provisioning/dashboards/json/meshcore.json b/grafana/provisioning/dashboards/json/meshcore.json index 4837b21..a55b5a8 100644 --- a/grafana/provisioning/dashboards/json/meshcore.json +++ b/grafana/provisioning/dashboards/json/meshcore.json @@ -446,7 +446,7 @@ }, "pluginVersion": "4.10.2", "queryType": "timeseries", - "rawSql": "WITH all_nodes AS (\n SELECT \n $__timeInterval(ingest_timestamp) AS time_bucket, \n public_key, \n latitude, \n longitude, \n is_repeater, \n is_room_server\n FROM meshcore_adverts\n WHERE ingest_timestamp >= ($__fromTime - INTERVAL 7 DAY) \n AND ingest_timestamp <= $__toTime AND ('$region' = '__ALL__' OR region = '$region') AND ('$region_group' = '__ALL__' OR region IN (SELECT region_code FROM region_groups WHERE group_code = '$region_group'))\n),\nfiltered_time_buckets AS (\n SELECT DISTINCT time_bucket \n FROM all_nodes\n WHERE time_bucket >= $__timeInterval($__fromTime)\n AND time_bucket <= $__timeInterval($__toTime)\n ORDER BY time_bucket ASC\n),\nrolling_window AS (\n SELECT \n tb.time_bucket as time,\n n.public_key,\n n.latitude,\n n.longitude,\n n.is_repeater,\n n.is_room_server\n FROM filtered_time_buckets tb\n INNER JOIN all_nodes n ON n.time_bucket BETWEEN (tb.time_bucket - INTERVAL 7 DAY) AND tb.time_bucket\n)\nSELECT \n time,\n count(DISTINCT public_key) AS cumulative_unique_nodes,\n count(DISTINCT CASE WHEN latitude IS NOT NULL AND longitude IS NOT NULL AND latitude != 0 AND longitude != 0 THEN public_key END) AS nodes_with_location,\n count(DISTINCT CASE WHEN latitude IS NULL OR longitude IS NULL OR latitude = 0 OR longitude = 0 THEN public_key END) AS nodes_without_location,\n count(DISTINCT CASE WHEN is_repeater = 1 THEN public_key END) AS repeaters,\n count(DISTINCT CASE WHEN is_room_server = 1 THEN public_key END) AS room_servers\nFROM rolling_window\nGROUP BY time\nORDER BY time ASC", + "rawSql": "-- Self-join-free rolling 7-day window. The original used a variable\n-- $__timeInterval bucket with an INTERVAL 7 DAY self-join; the inversion\n-- needs a FIXED bucket to fan out over, so we pin the bucket to daily\n-- (toStartOfDay) and fan out range(0, 7) days to match the 7-day window.\nWITH node_days AS (\n SELECT \n toStartOfDay(ingest_timestamp) AS day, \n public_key,\n max(latitude IS NOT NULL AND longitude IS NOT NULL AND latitude != 0 AND longitude != 0) AS has_location,\n max(latitude IS NULL OR longitude IS NULL OR latitude = 0 OR longitude = 0) AS no_location,\n max(is_repeater = 1) AS is_repeater,\n max(is_room_server = 1) AS is_room_server\n FROM meshcore_adverts\n WHERE ingest_timestamp >= ($__fromTime - INTERVAL 7 DAY) \n AND ingest_timestamp <= $__toTime AND ('$region' = '__ALL__' OR region = '$region') AND ('$region_group' = '__ALL__' OR region IN (SELECT region_code FROM region_groups WHERE group_code = '$region_group'))\n GROUP BY day, public_key\n),\nall_days AS (\n SELECT DISTINCT day \n FROM node_days\n WHERE day >= toStartOfDay($__fromTime)\n AND day <= toStartOfDay($__toTime)\n),\ncovered AS (\n SELECT \n arrayJoin(arrayMap(i -> nd.day + toIntervalDay(i), range(0, 7))) AS time,\n nd.public_key AS public_key,\n nd.has_location,\n nd.no_location,\n nd.is_repeater,\n nd.is_room_server\n FROM node_days nd\n)\nSELECT \n time,\n uniqExact(public_key) AS cumulative_unique_nodes,\n uniqExactIf(public_key, has_location) AS nodes_with_location,\n uniqExactIf(public_key, no_location) AS nodes_without_location,\n uniqExactIf(public_key, is_repeater) AS repeaters,\n uniqExactIf(public_key, is_room_server) AS room_servers\nFROM covered\nWHERE time IN (SELECT day FROM all_days)\nGROUP BY time\nORDER BY time ASC", "refId": "A" } ], @@ -1116,7 +1116,7 @@ }, "pluginVersion": "4.10.2", "queryType": "timeseries", - "rawSql": "SELECT \n $__timeInterval(ingest_timestamp) as time,\n channel_hash,\n COUNT(DISTINCT message_id) as count\nFROM meshcore_public_channel_messages \nWHERE $__timeFilter(ingest_timestamp) AND ('$region' = '__ALL__' OR has(regions, '$region')) AND ('$region_group' = '__ALL__' OR hasAny(regions, (SELECT groupArray(region_code) FROM region_groups WHERE group_code = '$region_group')))\n AND channel_hash != ''\nGROUP BY time, channel_hash\nORDER BY time ASC", + "rawSql": "SELECT \n $__timeInterval(ingest_timestamp) as time,\n channel_hash,\n COUNT(DISTINCT message_id) as count\nFROM meshcore_public_channel_messages_raw \nWHERE $__timeFilter(ingest_timestamp) AND ('$region' = '__ALL__' OR region = '$region') AND ('$region_group' = '__ALL__' OR region IN (SELECT region_code FROM region_groups WHERE group_code = '$region_group'))\n AND channel_hash != ''\nGROUP BY time, channel_hash\nORDER BY time ASC", "refId": "A" } ], diff --git a/meshexplorer/src/lib/clickhouse/actions.ts b/meshexplorer/src/lib/clickhouse/actions.ts index 2bdbb12..2c257d5 100644 --- a/meshexplorer/src/lib/clickhouse/actions.ts +++ b/meshexplorer/src/lib/clickhouse/actions.ts @@ -1,6 +1,6 @@ "use server"; import { clickhouse } from "./clickhouse"; -import { generateRegionWhereClauseFromArray, generateRegionWhereClause } from "@/lib/regionFilters"; +import { generateRegionWhereClause } from "@/lib/regionFilters"; import { regionFromTopic, normalizeRegion, groupCodeOf } from "@/lib/regions"; import { publicChannelMessagesSubquery } from "./chatMessages"; import { isHiddenNodeName, visibleNodeSqlClause } from "@/lib/node-privacy"; @@ -65,9 +65,6 @@ export async function getLatestChatMessages({ limit = 20, before, after, channel // ingest_timestamp primary key / partition pruning applies. See // publicChannelMessagesSubquery for why this avoids a full-history scan. const innerWhere: string[] = []; - // Filters that must run after aggregation (origin_path_info only exists on - // the grouped output). - const outerWhere: string[] = []; const params: Record = { limit }; // Inner-scan columns must be table-qualified: unqualified, the analyzer binds @@ -87,14 +84,16 @@ export async function getLatestChatMessages({ limit = 20, before, after, channel params.channelId = channelId; } - // Region filtering keys off origin_path_info, which only exists post-group. - const regionFilter = generateRegionWhereClauseFromArray(region); + // Region filter pushes onto the raw table's stored `region` column (derived identically to + // regionSql) BEFORE the GROUP BY, so unmatched rows are pruned at the scan instead of via a + // post-aggregation arrayExists over origin_path_info. Table-qualify the column so it binds to + // the scan, not the aggregate output. '__ALL__'/unset/unknown -> no filter (empty clause). + const regionFilter = generateRegionWhereClause(region, "meshcore_public_channel_messages_raw"); if (regionFilter.whereClause) { - outerWhere.push(regionFilter.whereClause); + innerWhere.push(regionFilter.whereClause); } - const whereClause = outerWhere.length > 0 ? `WHERE ${outerWhere.join(' AND ')}` : ''; - const query = `SELECT ingest_timestamp, mesh_timestamp, channel_hash, mac, hex(encrypted_message) AS encrypted_message, message_count, origin_path_info, message_id FROM ${publicChannelMessagesSubquery(innerWhere)} ${whereClause} ORDER BY ingest_timestamp DESC LIMIT {limit:UInt32}`; + const query = `SELECT ingest_timestamp, mesh_timestamp, channel_hash, mac, hex(encrypted_message) AS encrypted_message, message_count, origin_path_info, message_id FROM ${publicChannelMessagesSubquery(innerWhere)} ORDER BY ingest_timestamp DESC LIMIT {limit:UInt32}`; const resultSet = await clickhouse.query({ query, query_params: params, format: 'JSONEachRow' }); const rows = await resultSet.json(); return rows as Array<{ @@ -474,10 +473,18 @@ export async function searchMeshcoreNodes(searchParams: SearchQuery | SearchQuer return []; } - // Build individual query parts - const queryParts: string[] = []; + // Build a per-query branch condition plus its limit. Instead of emitting one + // UNION ALL branch per query (each of which re-merges the entire + // meshcore_adverts_latest_state AggregatingMergeTree), we scan/merge the + // meshcore_adverts_latest view exactly ONCE and fan each surviving row out to + // every branch it matches via arrayJoin, then rank per branch with a window + // function. Per-query branch predicates are heterogeneous (name vs public_key, + // exact vs prefix/substring, plus independent region/lastSeen/is_repeater + // filters), so each branch keeps its own boolean expression. + const branchConditions: string[] = []; + const branchLimits: number[] = []; const allParams: Record = {}; - + queries.forEach((searchQuery, index) => { const { query: searchString, @@ -487,15 +494,15 @@ export async function searchMeshcoreNodes(searchParams: SearchQuery | SearchQuer exact = false, is_repeater } = searchQuery; - + // Node privacy: hidden nodes are unsearchable. const where: string[] = [visibleNodeSqlClause("node_name")]; const queryParams: Record = {}; - + // Add search conditions if (searchString && searchString.trim()) { const trimmedQuery = searchString.trim(); - + // Check if it looks like a public key (hex string) if (/^[0-9A-Fa-f]+$/.test(trimmedQuery)) { if (exact) { @@ -519,67 +526,103 @@ export async function searchMeshcoreNodes(searchParams: SearchQuery | SearchQuer } } } - + // Add lastSeen filter if provided if (lastSeen !== null && lastSeen !== undefined && lastSeen !== "") { where.push(`last_seen >= now() - INTERVAL {lastSeen_${index}:UInt32} SECOND`); queryParams[`lastSeen_${index}`] = Number(lastSeen); } - + // Add region filtering if specified const regionFilter = generateRegionWhereClause(region); if (regionFilter.whereClause) { where.push(regionFilter.whereClause); } - + // Add is_repeater filter if specified if (is_repeater !== undefined) { where.push(`is_repeater = {isRepeater_${index}:UInt8}`); queryParams[`isRepeater_${index}`] = is_repeater ? 1 : 0; } - - const whereClause = where.length > 0 ? `WHERE ${where.join(' AND ')}` : ''; - - // Reads the live, state-backed meshcore_adverts_latest view (incremental MV over - // meshcore_adverts_latest_state) instead of re-aggregating meshcore_adverts on every - // request. All filter columns (public_key, node_name, last_seen, region, is_repeater) - // exist on the view, so the WHERE pushes straight onto it. - const queryPart = ` - SELECT - public_key, - node_name, - latitude, - longitude, - has_location, - is_repeater, - is_chat_node, - is_room_server, - has_name, - first_heard, - last_seen, - broker, - topic, - ${index} as query_index - FROM meshcore_adverts_latest - ${whereClause} - ORDER BY last_seen DESC - LIMIT {limit_${index}:UInt32} - `; - - queryParts.push(queryPart); - queryParams[`limit_${index}`] = limit; - + + // A branch with no predicates matches every row (preserves the previous + // behavior where an empty WHERE returned the whole table for that query). + branchConditions.push(where.length > 0 ? `(${where.join(' AND ')})` : '1'); + branchLimits.push(limit); + // Add query params to the global params object Object.assign(allParams, queryParams); }); - - // Combine all queries with UNION ALL - const finalQuery = queryParts.join(' UNION ALL '); - - const resultSet = await clickhouse.query({ - query: finalQuery, - query_params: allParams, - format: 'JSONEachRow' + + // The single scan only needs rows matching at least one branch. + const combinedFilter = branchConditions.some((c) => c === '1') + ? '1' + : branchConditions.join(' OR '); + + // Per-branch match flags drive the arrayJoin fan-out: each row is emitted once + // per branch it satisfies (query_index = that branch; -1 for non-matching + // branches, filtered out below). We then rank within each branch by last_seen + // and keep up to the largest requested limit (exact per-branch limits are + // applied in TS below, since limits can differ per branch). + const matchFlags = branchConditions + .map((cond, index) => `if(${cond}, ${index}, -1)`) + .join(', '); + const maxLimit = branchLimits.length > 0 ? Math.max(...branchLimits) : 0; + + // Reads the live, state-backed meshcore_adverts_latest view (incremental MV over + // meshcore_adverts_latest_state) once. All filter columns (public_key, node_name, + // last_seen, region, is_repeater) exist on the view, so the WHERE pushes straight + // onto it. The state table is merged a single time regardless of branch count. + const finalQuery = ` + SELECT + public_key, + node_name, + latitude, + longitude, + has_location, + is_repeater, + is_chat_node, + is_room_server, + has_name, + first_heard, + last_seen, + broker, + topic, + query_index + FROM ( + SELECT + *, + row_number() OVER (PARTITION BY query_index ORDER BY last_seen DESC) AS rn + FROM ( + SELECT + public_key, + node_name, + latitude, + longitude, + has_location, + is_repeater, + is_chat_node, + is_room_server, + has_name, + first_heard, + last_seen, + broker, + topic, + arrayJoin([${matchFlags}]) AS query_index + FROM meshcore_adverts_latest + WHERE ${combinedFilter} + ) + WHERE query_index >= 0 + ) + WHERE rn <= {maxLimit:UInt32} + ORDER BY query_index ASC, last_seen DESC + `; + allParams['maxLimit'] = maxLimit; + + const resultSet = await clickhouse.query({ + query: finalQuery, + query_params: allParams, + format: 'JSONEachRow' }); const rows = await resultSet.json(); @@ -600,25 +643,26 @@ export async function searchMeshcoreNodes(searchParams: SearchQuery | SearchQuer query_index?: number; }; - // If single query, return results without query_index - if (!Array.isArray(searchParams)) { - return (rows as SearchResult[]).map(row => { - const { query_index, ...result } = row; - return result; - }); - } - - // For batch queries, group results by query_index + // Group results by query_index. Rows arrive ordered by (query_index, last_seen + // DESC) and pre-trimmed to at most maxLimit per branch by the window function; + // we slice to each branch's exact limit here since branch limits can differ. const groupedResults = (rows as SearchResult[]).reduce((acc, row) => { const index = row.query_index || 0; if (!acc[index]) { acc[index] = []; } const { query_index, ...result } = row; - acc[index].push(result); + if (acc[index].length < branchLimits[index]) { + acc[index].push(result); + } return acc; }, {} as Record); - + + // If single query, return its results without query_index + if (!Array.isArray(searchParams)) { + return groupedResults[0] || []; + } + // Return results in the same order as input queries return queries.map((_, index) => groupedResults[index] || []); } catch (error) { diff --git a/meshexplorer/src/lib/clickhouse/streaming.ts b/meshexplorer/src/lib/clickhouse/streaming.ts index b2d0026..a44f28b 100644 --- a/meshexplorer/src/lib/clickhouse/streaming.ts +++ b/meshexplorer/src/lib/clickhouse/streaming.ts @@ -1,5 +1,5 @@ import { clickhouse } from './clickhouse'; -import { generateRegionConditionForStreaming, generateRegionArrayConditionForStreaming } from '../regionFilters'; +import { generateRegionConditionForStreaming } from '../regionFilters'; import { publicChannelMessagesSubquery } from './chatMessages'; /** @@ -254,13 +254,14 @@ export function createChatMessagesStreamerConfig( innerConditions.push('meshcore_public_channel_messages_raw.channel_hash = {channelId:String}'); } - // Region filtering keys off origin_path_info, which only exists post-group, so it stays as an - // outer predicate spliced in by the streamer. - let additionalWhereClause = ''; + // Region filter pushes onto the raw table's stored `region` column (derived identically to + // regionSql) BEFORE the GROUP BY, so unmatched rows are pruned at the scan instead of via a + // post-aggregation arrayExists over origin_path_info. Table-qualify the column so it binds to + // the scan, not the aggregate output. Unset/unknown selector -> no filter (empty clause). if (region) { - const regionClause = generateRegionArrayConditionForStreaming(region); + const regionClause = generateRegionConditionForStreaming(region, 'meshcore_public_channel_messages_raw'); if (regionClause) { - additionalWhereClause = regionClause; + innerConditions.push(regionClause); } } @@ -282,8 +283,7 @@ export function createChatMessagesStreamerConfig( `, timeColumn: 'ingest_timestamp', pollInterval: 250, - maxRowsPerPoll: 50, - additionalWhereClause: additionalWhereClause || undefined + maxRowsPerPoll: 50 }; } diff --git a/meshexplorer/src/lib/regionFilters.ts b/meshexplorer/src/lib/regionFilters.ts index b0bcbeb..0a88d87 100644 --- a/meshexplorer/src/lib/regionFilters.ts +++ b/meshexplorer/src/lib/regionFilters.ts @@ -19,8 +19,8 @@ export function generateRegionWhereClauseFromArray(region?: string) { } /** Region condition string for streaming queries (column-based). '' when no/unknown selector. */ -export function generateRegionConditionForStreaming(region?: string): string { - return generateRegionCondition(region); +export function generateRegionConditionForStreaming(region?: string, tableAlias: string = ""): string { + return generateRegionCondition(region, tableAlias); } /** Region condition string for streaming queries over origin_path_info. '' when no/unknown selector. */