Merge pull request #34 from ajvpot/neighbor-refreshable-mvs

Speed up neighbors with hourly refreshable materialized views
This commit is contained in:
Alex Vanderpot
2026-05-29 03:03:10 -04:00
committed by GitHub
3 changed files with 219 additions and 255 deletions
@@ -0,0 +1,162 @@
-- +goose Up
-- Hourly refreshable materialized views that precompute the neighbor graph so the
-- web app reads small tables instead of re-aggregating meshcore_packets per request.
-- Both views fully recompute (atomic replace) every hour.
-- ============================================================================
-- MV 1: meshcore_all_neighbor_edges (powers "show all neighbors")
-- The global edge graph per region (direct path_len=0 adverts + repeater-prefix
-- path edges), with denormalized endpoint details. The app filters by region +
-- bounding box + lastSeen + has_location at request time.
-- ============================================================================
-- +goose StatementBegin
-- Refreshable materialized view: the SELECT below is re-run every hour and its
-- result is stored in a MergeTree table sorted by (region, source_node, target_node).
-- Each output row is one edge tagged with its region, a connection_type of
-- 'path' or 'direct', a packet_count, and the name / coordinates / has_location /
-- last_seen of both endpoints.
CREATE MATERIALIZED VIEW IF NOT EXISTS meshcore_all_neighbor_edges
REFRESH EVERY 1 HOUR
ENGINE = MergeTree
ORDER BY (region, source_node, target_node)
AS
WITH
-- Hard-coded region -> (broker, topic) mapping used to tag rows with a region.
-- arrayJoin over the one-element array yields a single row for 'seattle'; listing
-- more topics in that array would yield one row per topic.
-- NOTE(review): presumably this must be kept in sync with the app's region
-- configuration -- confirm when regions change.
regions_topics AS (
SELECT 'seattle' AS region, 'wss://mqtt-us-v1.letsmesh.net:443' AS broker, arrayJoin(['meshcore/SEA']) AS topic
UNION ALL SELECT 'portland', 'tcp://mqtt.davekeogh.com:1883', 'meshcore/pdx'
UNION ALL SELECT 'boston', 'tcp://mqtt.davekeogh.com:1883', 'meshcore/bos'
),
-- Endpoint attributes copied onto every edge in the final SELECT.
-- NOTE(review): assumes meshcore_adverts_latest holds one row per public_key;
-- if it can hold several, the final joins would emit duplicate edges -- confirm.
node_details AS (
SELECT public_key, node_name, latitude, longitude, has_location, last_seen
FROM meshcore_adverts_latest
),
-- Repeater prefixes per region (exactly one repeater per 2-char prefix), last 2 days
-- HAVING node_count = 1 discards prefixes matched by more than one row, so the
-- any() below always picks the only candidate key for the prefix.
repeater_prefixes AS (
SELECT
rt.region AS region,
substring(a.public_key, 1, 2) AS prefix,
count() AS node_count,
any(a.public_key) AS representative_key
FROM meshcore_adverts_latest AS a
INNER JOIN regions_topics AS rt ON a.broker = rt.broker AND a.topic = rt.topic
WHERE a.is_repeater = 1 AND a.last_seen >= now() - INTERVAL 2 DAY
GROUP BY rt.region, prefix
HAVING node_count = 1
),
-- Distinct multi-hop path packets (last 1 day), tagged by region
-- path_len >= 2 because a neighbor pair needs at least two hops in the path.
path_src AS (
SELECT DISTINCT payload, path, path_len, broker, topic
FROM meshcore_packets
WHERE path_len >= 2 AND ingest_timestamp >= now() - INTERVAL 1 DAY
),
-- One row per (region, payload, consecutive hop pair). The DISTINCT collapses
-- repeated sightings of the same payload over the same hop pair.
path_pairs AS (
SELECT DISTINCT
rt.region AS region,
payload,
-- path is a hex string of 1-byte hop prefixes; take 2 hex chars per hop.
-- (The original query used hex(substring(path,i,1)), which re-hexed a single
-- hex char and never matched the 2-char repeater prefixes -> no path edges.)
-- Hop i occupies hex characters 2i-1..2i; hop i+1 starts at character 2i+1.
upper(substring(path, 2 * i - 1, 2)) AS source_prefix,
upper(substring(path, 2 * i + 1, 2)) AS target_prefix
FROM path_src AS p
INNER JOIN regions_topics AS rt ON p.broker = rt.broker AND p.topic = rt.topic
-- range(1, path_len) yields i = 1 .. path_len - 1, i.e. every hop that has a
-- successor; the WHERE below restates that same upper bound.
ARRAY JOIN range(1, path_len) AS i
WHERE i < path_len
),
-- Number of distinct payloads seen per directed prefix pair within a region
-- (distinct because path_pairs is already de-duplicated on payload).
-- Pairs whose two prefixes are equal are dropped.
path_neighbors AS (
SELECT region, source_prefix, target_prefix, count() AS packet_count
FROM path_pairs
WHERE source_prefix != target_prefix
GROUP BY region, source_prefix, target_prefix
),
-- Resolve both prefixes to full public keys. The INNER JOINs drop any pair where
-- either prefix has no unambiguous repeater in that region.
path_connections AS (
SELECT
pn.region AS region,
sm.representative_key AS source_node,
tm.representative_key AS target_node,
pn.packet_count AS packet_count
FROM path_neighbors AS pn
INNER JOIN repeater_prefixes AS sm ON sm.region = pn.region AND sm.prefix = pn.source_prefix
INNER JOIN repeater_prefixes AS tm ON tm.region = pn.region AND tm.prefix = pn.target_prefix
),
-- Global direct connections (path_len = 0 adverts), last 7 days
-- source_node is hex(origin_pubkey), target_node is the advert's public_key;
-- rows where the two are equal are excluded. No region is attached here.
direct_connections AS (
SELECT DISTINCT hex(origin_pubkey) AS source_node, public_key AS target_node
FROM meshcore_adverts
WHERE path_len = 0
AND hex(origin_pubkey) != public_key
AND ingest_timestamp >= now() - INTERVAL 7 DAY
),
-- Per-region edges: path edges + direct edges not already covered by a path edge
-- Direct edges carry no region, so the CROSS JOIN replicates each one into every
-- region; a copy is then dropped when that region already has a path edge for the
-- same node pair in either orientation. Direct edges get a fixed packet_count of 1,
-- cast to UInt64 so both UNION ALL branches share the count() column type.
edges AS (
SELECT region, source_node, target_node, 'path' AS connection_type, packet_count
FROM path_connections
UNION ALL
SELECT r.region, d.source_node, d.target_node, 'direct' AS connection_type, CAST(1 AS UInt64) AS packet_count
FROM direct_connections AS d
CROSS JOIN (SELECT DISTINCT region FROM regions_topics) AS r
WHERE (r.region, d.source_node, d.target_node) NOT IN (SELECT region, source_node, target_node FROM path_connections)
AND (r.region, d.target_node, d.source_node) NOT IN (SELECT region, source_node, target_node FROM path_connections)
)
-- Attach endpoint details. The INNER JOINs drop edges where either endpoint has
-- no row in meshcore_adverts_latest.
SELECT
e.region AS region,
e.source_node AS source_node,
e.target_node AS target_node,
e.connection_type AS connection_type,
e.packet_count AS packet_count,
sd.node_name AS source_name,
sd.latitude AS source_latitude,
sd.longitude AS source_longitude,
sd.has_location AS source_has_location,
sd.last_seen AS source_last_seen,
td.node_name AS target_name,
td.latitude AS target_latitude,
td.longitude AS target_longitude,
td.has_location AS target_has_location,
td.last_seen AS target_last_seen
FROM edges AS e
INNER JOIN node_details AS sd ON e.source_node = sd.public_key
INNER JOIN node_details AS td ON e.target_node = td.public_key;
-- +goose StatementEnd
-- ============================================================================
-- MV 2: meshcore_node_direct_neighbors (powers node hover + node page)
-- Direct adjacency (path_len = 0) for every node, both directions, with the
-- neighbor's latest attributes. The app filters by node_public_key at request time.
-- ============================================================================
-- +goose StatementBegin
-- Refreshable materialized view holding per-node direct adjacency.
-- Re-run every hour; results are stored in a MergeTree table sorted by
-- node_public_key. Each row is (node, neighbor, direction) plus the neighbor's
-- attributes as read from meshcore_adverts_latest.
CREATE MATERIALIZED VIEW IF NOT EXISTS meshcore_node_direct_neighbors
REFRESH EVERY 1 HOUR
ENGINE = MergeTree
ORDER BY (node_public_key)
AS
WITH
    -- Distinct (origin, advertised key) pairs from zero-hop adverts in the last
    -- 7 days, excluding rows where both keys are the same node.
    direct_pairs AS (
        SELECT DISTINCT
            hex(origin_pubkey) AS origin_key,
            public_key         AS advert_key
        FROM meshcore_adverts
        WHERE path_len = 0
          AND hex(origin_pubkey) != public_key
          AND ingest_timestamp >= now() - INTERVAL 7 DAY
    ),
    -- Every pair is emitted twice, once from each endpoint's point of view:
    -- 'incoming' keyed by the origin, 'outgoing' keyed by the advertised key.
    adjacency AS (
        SELECT
            origin_key AS node_public_key,
            advert_key AS neighbor_public_key,
            'incoming' AS direction
        FROM direct_pairs
        UNION ALL
        SELECT
            advert_key AS node_public_key,
            origin_key AS neighbor_public_key,
            'outgoing' AS direction
        FROM direct_pairs
    ),
    -- Attributes denormalized onto each adjacency row for the neighbor side.
    neighbor_attrs AS (
        SELECT
            public_key,
            node_name,
            latitude,
            longitude,
            has_location,
            is_repeater,
            is_chat_node,
            is_room_server,
            has_name,
            last_seen
        FROM meshcore_adverts_latest
    )
-- The INNER JOIN drops adjacency rows whose neighbor has no row in
-- meshcore_adverts_latest.
SELECT
    adj.node_public_key      AS node_public_key,
    adj.neighbor_public_key  AS neighbor_public_key,
    adj.direction            AS direction,
    attrs.node_name          AS neighbor_name,
    attrs.latitude           AS neighbor_latitude,
    attrs.longitude          AS neighbor_longitude,
    attrs.has_location       AS neighbor_has_location,
    attrs.is_repeater        AS neighbor_is_repeater,
    attrs.is_chat_node       AS neighbor_is_chat_node,
    attrs.is_room_server     AS neighbor_is_room_server,
    attrs.has_name           AS neighbor_has_name,
    attrs.last_seen          AS neighbor_last_seen
FROM adjacency AS adj
INNER JOIN neighbor_attrs AS attrs
    ON adj.neighbor_public_key = attrs.public_key;
-- +goose StatementEnd
-- +goose Down
-- Roll back by removing both refreshable materialized views created in Up.
-- IF EXISTS keeps the rollback safe to re-run; neither view's query reads from
-- the other, so the drop order carries no dependency.
DROP VIEW IF EXISTS meshcore_node_direct_neighbors;
DROP VIEW IF EXISTS meshcore_all_neighbor_edges;
+55 -253
View File
@@ -282,211 +282,59 @@ export async function getMeshcoreNodeInfo(publicKey: string, limit: number = 50)
export async function getAllNodeNeighbors(lastSeen: string | null = null, minLat?: string | null, maxLat?: string | null, minLng?: string | null, maxLng?: string | null, nodeTypes?: string[], region?: string) {
try {
// Build where conditions for visible nodes
let visibleNodeWhereConditions = [
"latitude IS NOT NULL",
"longitude IS NOT NULL"
// Reads the precomputed (hourly-refreshed) neighbor edge graph and filters it
// by region + bounding box + lastSeen. The heavy graph computation lives in the
// refreshable materialized view meshcore_all_neighbor_edges.
const params: Record<string, any> = { region: region || 'seattle' };
const whereConditions = [
"region = {region:String}",
"source_has_location = 1",
"target_has_location = 1",
"source_latitude IS NOT NULL",
"source_longitude IS NOT NULL",
"target_latitude IS NOT NULL",
"target_longitude IS NOT NULL",
];
const params: Record<string, any> = {};
// Add location bounds for visible nodes
// Bounding box: both endpoints must be within view (matches the old visible_nodes behavior)
if (minLat !== null && minLat !== undefined && minLat !== "") {
visibleNodeWhereConditions.push("latitude >= {minLat:Float64}");
whereConditions.push("source_latitude >= {minLat:Float64} AND target_latitude >= {minLat:Float64}");
params.minLat = Number(minLat);
}
if (maxLat !== null && maxLat !== undefined && maxLat !== "") {
visibleNodeWhereConditions.push("latitude <= {maxLat:Float64}");
whereConditions.push("source_latitude <= {maxLat:Float64} AND target_latitude <= {maxLat:Float64}");
params.maxLat = Number(maxLat);
}
if (minLng !== null && minLng !== undefined && minLng !== "") {
visibleNodeWhereConditions.push("longitude >= {minLng:Float64}");
whereConditions.push("source_longitude >= {minLng:Float64} AND target_longitude >= {minLng:Float64}");
params.minLng = Number(minLng);
}
if (maxLng !== null && maxLng !== undefined && maxLng !== "") {
visibleNodeWhereConditions.push("longitude <= {maxLng:Float64}");
whereConditions.push("source_longitude <= {maxLng:Float64} AND target_longitude <= {maxLng:Float64}");
params.maxLng = Number(maxLng);
}
if (nodeTypes && nodeTypes.length > 0) {
visibleNodeWhereConditions.push("type IN {nodeTypes:Array(String)}");
params.nodeTypes = nodeTypes;
}
if (lastSeen !== null && lastSeen !== undefined && lastSeen !== "") {
visibleNodeWhereConditions.push("last_seen >= now() - INTERVAL {lastSeen:UInt32} SECOND");
whereConditions.push("source_last_seen >= now() - INTERVAL {lastSeen:UInt32} SECOND AND target_last_seen >= now() - INTERVAL {lastSeen:UInt32} SECOND");
params.lastSeen = Number(lastSeen);
}
// Build where conditions for meshcore adverts
let meshcoreWhereConditions = [];
if (lastSeen !== null && lastSeen !== undefined && lastSeen !== "") {
meshcoreWhereConditions.push("ingest_timestamp >= now() - INTERVAL {lastSeen:UInt32} SECOND");
}
const meshcoreWhere = meshcoreWhereConditions.length > 0 ? `AND ${meshcoreWhereConditions.join(" AND ")}` : '';
// Build region filtering for meshcore_packets
const regionFilter = generateRegionWhereClause(region);
const packetsRegionWhere = regionFilter.whereClause ? `AND ${regionFilter.whereClause}` : '';
const allNeighborsQuery = `
WITH visible_nodes AS (
-- Get only nodes visible on the current map view
SELECT
node_id,
name,
short_name,
latitude,
longitude,
last_seen,
first_seen,
type
FROM unified_latest_nodeinfo
WHERE ${visibleNodeWhereConditions.join(" AND ")}
),
visible_node_details AS (
-- Get latest attributes for visible nodes from meshcore_adverts
SELECT
public_key,
argMax(node_name, ingest_timestamp) as node_name,
argMax(latitude, ingest_timestamp) as latitude,
argMax(longitude, ingest_timestamp) as longitude,
argMax(has_location, ingest_timestamp) as has_location,
argMax(is_repeater, ingest_timestamp) as is_repeater,
argMax(is_chat_node, ingest_timestamp) as is_chat_node,
argMax(is_room_server, ingest_timestamp) as is_room_server,
argMax(has_name, ingest_timestamp) as has_name
FROM meshcore_adverts
WHERE public_key IN (SELECT node_id FROM visible_nodes)
${meshcoreWhere}
GROUP BY public_key
),
repeater_prefixes AS (
-- Get repeater prefixes info, excluding collisions (multiple repeaters per prefix)
-- Only include repeaters from the selected region
SELECT
substring(public_key, 1, 2) as prefix,
count() as node_count,
any(public_key) as representative_key,
any(node_name) as representative_name
FROM meshcore_adverts_latest
WHERE is_repeater = 1
AND last_seen >= now() - INTERVAL 2 DAY
${regionFilter.whereClause ? `AND ${regionFilter.whereClause}` : ''}
GROUP BY prefix
HAVING node_count = 1 -- Only include prefixes with exactly one repeater
),
direct_connections AS (
-- Get all direct connections (path_len = 0) but only between visible nodes
SELECT DISTINCT
hex(origin_pubkey) as source_node,
public_key as target_node,
'direct' as connection_type,
1 as packet_count -- Direct connections don't have packet counts, use 1 as default
FROM meshcore_adverts
WHERE path_len = 0
AND hex(origin_pubkey) != public_key
-- Only include connections where both nodes are visible
AND hex(origin_pubkey) IN (SELECT node_id FROM visible_nodes)
AND public_key IN (SELECT node_id FROM visible_nodes)
${meshcoreWhere}
),
path_neighbors AS (
-- Extract neighbors from routing paths with unique payload counts
-- Group by payload first to avoid double counting same message propagation
SELECT
source_prefix,
target_prefix,
'path' as connection_type,
count() as packet_count
FROM (
SELECT DISTINCT
payload,
upper(hex(substring(path, i, 1))) as source_prefix,
upper(hex(substring(path, i + 1, 1))) as target_prefix
FROM (
SELECT DISTINCT
payload,
path,
path_len
FROM meshcore_packets
WHERE path_len >= 2
AND ingest_timestamp >= now() - INTERVAL 1 DAY
${packetsRegionWhere}
) p
ARRAY JOIN range(1, path_len) as i
WHERE i < path_len
) path_pairs
WHERE source_prefix IN (SELECT prefix FROM repeater_prefixes)
AND target_prefix IN (SELECT prefix FROM repeater_prefixes)
AND source_prefix != target_prefix
GROUP BY source_prefix, target_prefix
),
prefix_to_key_map AS (
-- Map prefixes back to full public keys for visible nodes
SELECT
rp.prefix,
rp.representative_key as public_key,
rp.representative_name as node_name
FROM repeater_prefixes rp
WHERE rp.representative_key IN (SELECT node_id FROM visible_nodes)
),
path_connections AS (
-- Convert prefix-based path neighbors to public key connections
-- Include all path connections (no exclusion of direct connections)
SELECT
source_map.public_key as source_node,
target_map.public_key as target_node,
'path' as connection_type,
pn.packet_count
FROM path_neighbors pn
JOIN prefix_to_key_map source_map ON pn.source_prefix = source_map.prefix
JOIN prefix_to_key_map target_map ON pn.target_prefix = target_map.prefix
),
direct_connections_filtered AS (
-- Get direct connections but exclude pairs that already have path connections
SELECT
source_node,
target_node,
connection_type,
packet_count
FROM direct_connections
WHERE (source_node, target_node) NOT IN (
SELECT source_node, target_node FROM path_connections
)
AND (target_node, source_node) NOT IN (
SELECT source_node, target_node FROM path_connections
)
),
neighbor_connections AS (
-- Combine path connections and filtered direct connections (path connections take precedence)
SELECT source_node, target_node, connection_type, packet_count FROM path_connections
UNION ALL
SELECT source_node, target_node, connection_type, packet_count FROM direct_connections_filtered
)
SELECT
connections.source_node,
connections.target_node,
connections.connection_type,
connections.packet_count,
source_details.node_name as source_name,
source_details.latitude as source_latitude,
source_details.longitude as source_longitude,
source_details.has_location as source_has_location,
target_details.node_name as target_name,
target_details.latitude as target_latitude,
target_details.longitude as target_longitude,
target_details.has_location as target_has_location
FROM neighbor_connections AS connections
LEFT JOIN visible_node_details AS source_details ON connections.source_node = source_details.public_key
LEFT JOIN visible_node_details AS target_details ON connections.target_node = target_details.public_key
WHERE source_details.public_key IS NOT NULL
AND target_details.public_key IS NOT NULL
AND source_details.has_location = 1
AND target_details.has_location = 1
AND source_details.latitude IS NOT NULL
AND source_details.longitude IS NOT NULL
AND target_details.latitude IS NOT NULL
AND target_details.longitude IS NOT NULL
ORDER BY connections.connection_type, connections.source_node, connections.target_node
SELECT
source_node,
target_node,
connection_type,
packet_count,
source_name,
source_latitude,
source_longitude,
source_has_location,
target_name,
target_latitude,
target_longitude,
target_has_location
FROM meshcore_all_neighbor_edges
WHERE ${whereConditions.join(" AND ")}
ORDER BY connection_type, source_node, target_node
`;
const neighborsResult = await clickhouse.query({
@@ -518,77 +366,31 @@ export async function getAllNodeNeighbors(lastSeen: string | null = null, minLat
export async function getMeshcoreNodeNeighbors(publicKey: string, lastSeen: string | null = null) {
try {
// Build base where conditions for both directions
let baseWhereConditions = [];
// Reads the precomputed (hourly-refreshed) per-node direct adjacency from the
// refreshable materialized view meshcore_node_direct_neighbors.
const params: Record<string, any> = { publicKey };
// Add lastSeen filter if provided
const whereConditions = ["node_public_key = {publicKey:String}"];
if (lastSeen !== null) {
baseWhereConditions.push("ingest_timestamp >= now() - INTERVAL {lastSeen:UInt32} SECOND");
whereConditions.push("neighbor_last_seen >= now() - INTERVAL {lastSeen:UInt32} SECOND");
params.lastSeen = Number(lastSeen);
}
const baseWhere = baseWhereConditions.length > 0 ? `AND ${baseWhereConditions.join(" AND ")}` : '';
const neighborsQuery = `
WITH neighbor_details AS (
-- Get latest attributes for all nodes based on ingest_timestamp
SELECT
public_key,
argMax(node_name, ingest_timestamp) as node_name,
argMax(latitude, ingest_timestamp) as latitude,
argMax(longitude, ingest_timestamp) as longitude,
argMax(has_location, ingest_timestamp) as has_location,
argMax(is_repeater, ingest_timestamp) as is_repeater,
argMax(is_chat_node, ingest_timestamp) as is_chat_node,
argMax(is_room_server, ingest_timestamp) as is_room_server,
argMax(has_name, ingest_timestamp) as has_name
FROM meshcore_adverts
GROUP BY public_key
),
neighbor_directions AS (
-- Direction 1: Adverts heard directly by the queried node
-- (hex(origin_pubkey) is the queried node, public_key is the neighbor)
SELECT DISTINCT
public_key as neighbor_public_key,
'incoming' as direction
FROM meshcore_adverts
WHERE hex(origin_pubkey) = {publicKey:String}
AND path_len = 0
AND public_key != {publicKey:String}
${baseWhere}
UNION ALL
-- Direction 2: Adverts from the queried node heard by other nodes
-- (public_key is the queried node, origin_pubkey is the neighbor)
SELECT DISTINCT
hex(origin_pubkey) as neighbor_public_key,
'outgoing' as direction
FROM meshcore_adverts
WHERE public_key = {publicKey:String}
AND path_len = 0
AND hex(origin_pubkey) != {publicKey:String}
${baseWhere}
)
SELECT
neighbors.neighbor_public_key as public_key,
details.node_name,
details.latitude,
details.longitude,
details.has_location,
details.is_repeater,
details.is_chat_node,
details.is_room_server,
details.has_name,
groupUniqArray(neighbors.direction) as directions
FROM neighbor_directions AS neighbors
LEFT JOIN neighbor_details AS details ON neighbors.neighbor_public_key = details.public_key
WHERE details.public_key IS NOT NULL
GROUP BY neighbors.neighbor_public_key, details.node_name, details.latitude, details.longitude,
details.has_location, details.is_repeater, details.is_chat_node,
details.is_room_server, details.has_name
ORDER BY neighbors.neighbor_public_key
SELECT
neighbor_public_key AS public_key,
any(neighbor_name) AS node_name,
any(neighbor_latitude) AS latitude,
any(neighbor_longitude) AS longitude,
any(neighbor_has_location) AS has_location,
any(neighbor_is_repeater) AS is_repeater,
any(neighbor_is_chat_node) AS is_chat_node,
any(neighbor_is_room_server) AS is_room_server,
any(neighbor_has_name) AS has_name,
groupUniqArray(direction) AS directions
FROM meshcore_node_direct_neighbors
WHERE ${whereConditions.join(" AND ")}
GROUP BY neighbor_public_key
ORDER BY neighbor_public_key
`;
const neighborsResult = await clickhouse.query({
+2 -2
View File
@@ -9,8 +9,8 @@ export const REGIONS: RegionConfig[] = [
{
name: "seattle",
friendlyName: "Seattle (PugetMesh, SalishMesh)",
broker: "tcp://mqtt.davekeogh.com:1883",
topics: ["meshcore", "meshcore/salish"]
broker: "wss://mqtt-us-v1.letsmesh.net:443",
topics: ["meshcore/SEA"]
},
{
name: "portland",