Compare commits

...

7 Commits

Author SHA1 Message Date
l5y c64a46a481 web: address review comments 2026-02-14 23:09:41 +01:00
l5y 11e5de2851 web: address review comments 2026-02-14 22:09:04 +01:00
l5y 8b3aa2533c web: address review comments 2026-02-14 22:02:02 +01:00
l5y e004cfacd7 web: address review comments 2026-02-14 21:38:56 +01:00
l5y 05d267d0c0 web: add cursor pagination to apis 2026-02-14 21:16:38 +01:00
l5y 5b0a6f5f8b web: expose node stats in distinct api (#641)
* web: expose node stats in distinct api

* web: address review comments

* web: address review comments

* web: address review comments

* web: address review comments
2026-02-14 21:14:10 +01:00
l5y 2e8b5ad856 web: do not merge channels by name (#640) 2026-02-14 15:42:14 +01:00
14 changed files with 2261 additions and 290 deletions
+49 -9
View File
@@ -739,6 +739,34 @@ module PotatoMesh
[nil, errors]
end
# Resolve the best matching active-node count from a remote /api/stats payload.
#
# @param payload [Hash, nil] decoded JSON payload from /api/stats.
# @param max_age_seconds [Integer] activity window currently expected for federation freshness.
# @return [Integer, nil] selected active-node count when available.
def remote_active_node_count_from_stats(payload, max_age_seconds:)
return nil unless payload.is_a?(Hash)
active_nodes = payload["active_nodes"]
return nil unless active_nodes.is_a?(Hash)
age = coerce_integer(max_age_seconds) || 0
key = if age <= 3600
"hour"
elsif age <= 86_400
"day"
elsif age <= PotatoMesh::Config.week_seconds
"week"
else
"month"
end
value = coerce_integer(active_nodes[key])
return nil unless value
[value, 0].max
end
# Parse a remote federation instance payload into canonical attributes.
#
# @param payload [Hash] JSON object describing a remote instance.
@@ -1049,21 +1077,33 @@ module PotatoMesh
attributes[:is_private] = false if attributes[:is_private].nil?
stats_payload, stats_metadata = fetch_instance_json(attributes[:domain], "/api/stats")
stats_count = remote_active_node_count_from_stats(
stats_payload,
max_age_seconds: PotatoMesh::Config.remote_instance_max_node_age,
)
attributes[:nodes_count] = stats_count if stats_count
nodes_since_path = "/api/nodes?since=#{recent_cutoff}&limit=1000"
nodes_since_window, nodes_since_metadata = fetch_instance_json(attributes[:domain], nodes_since_path)
if nodes_since_window.is_a?(Array)
if stats_count.nil? && attributes[:nodes_count].nil? && nodes_since_window.is_a?(Array)
attributes[:nodes_count] = nodes_since_window.length
elsif nodes_since_metadata
warn_log(
"Failed to load remote node window",
context: "federation.instances",
domain: attributes[:domain],
reason: Array(nodes_since_metadata).map(&:to_s).join("; "),
)
end
remote_nodes, node_metadata = fetch_instance_json(attributes[:domain], "/api/nodes")
remote_nodes ||= nodes_since_window if nodes_since_window.is_a?(Array)
remote_nodes = nodes_since_window if remote_nodes.nil? && nodes_since_window.is_a?(Array)
if attributes[:nodes_count].nil? && remote_nodes.is_a?(Array)
attributes[:nodes_count] = remote_nodes.length
end
if stats_count.nil? && Array(stats_metadata).any?
debug_log(
"Remote instance /api/stats unavailable; using node list fallback",
context: "federation.instances",
domain: attributes[:domain],
reason: Array(stats_metadata).map(&:to_s).join("; "),
)
end
unless remote_nodes
warn_log(
"Failed to load remote node data",
+81 -22
View File
@@ -165,37 +165,96 @@ module PotatoMesh
# malformed rows gracefully. The dataset is restricted to records updated
# within the rolling window defined by PotatoMesh::Config.week_seconds.
#
# @return [Array<Hash>] list of cleaned instance payloads.
def load_instances_for_api
# @param limit [Integer, nil] optional page size used when pagination is enabled.
# @param cursor [String, nil] optional keyset cursor for pagination.
# @param with_pagination [Boolean] when true, return items and next cursor metadata.
# @return [Array<Hash>, Hash] list of cleaned instance payloads or pagination metadata hash.
def load_instances_for_api(limit: nil, cursor: nil, with_pagination: false)
clean_duplicate_instances!
db = open_database(readonly: true)
db.results_as_hash = true
now = Time.now.to_i
min_last_update_time = now - PotatoMesh::Config.week_seconds
sql = <<~SQL
SELECT id, domain, pubkey, name, version, channel, frequency,
latitude, longitude, last_update_time, is_private, nodes_count, contact_link, signature
FROM instances
WHERE domain IS NOT NULL AND TRIM(domain) != ''
AND pubkey IS NOT NULL AND TRIM(pubkey) != ''
AND last_update_time IS NOT NULL AND last_update_time >= ?
ORDER BY LOWER(domain)
SQL
safe_limit = coerce_query_limit(limit) if with_pagination
fetch_limit = with_pagination ? safe_limit + 1 : nil
where_clauses = [
"id IS NOT NULL",
"TRIM(id) != ''",
"domain IS NOT NULL",
"TRIM(domain) != ''",
"pubkey IS NOT NULL",
"TRIM(pubkey) != ''",
"last_update_time IS NOT NULL",
"last_update_time >= ?",
]
items = []
cursor_payload = with_pagination ? decode_query_cursor(cursor) : nil
cursor_domain = cursor_payload ? sanitize_instance_domain(cursor_payload["domain"])&.downcase : nil
cursor_id = cursor_payload ? string_or_nil(cursor_payload["id"]) : nil
rows = with_busy_retry do
db.execute(sql, min_last_update_time)
loop do
page_where_clauses = where_clauses.dup
page_params = [min_last_update_time]
if with_pagination && cursor_domain && cursor_id
page_where_clauses << "(LOWER(domain) > ? OR (LOWER(domain) = ? AND id > ?))"
page_params.concat([cursor_domain, cursor_domain, cursor_id])
end
sql = <<~SQL
SELECT id, domain, pubkey, name, version, channel, frequency,
latitude, longitude, last_update_time, is_private, nodes_count, contact_link, signature
FROM instances
WHERE #{page_where_clauses.join("\n AND ")}
ORDER BY LOWER(domain), id
SQL
sql += " LIMIT ?" if with_pagination
page_params << fetch_limit if with_pagination
rows = with_busy_retry do
db.execute(sql, page_params)
end
rows.each do |row|
normalized = normalize_instance_row(row)
next unless normalized
last_update_time = normalized["lastUpdateTime"]
next unless last_update_time.is_a?(Integer) && last_update_time >= min_last_update_time
items << normalized
end
return items unless with_pagination
break if items.length > safe_limit
break if rows.length < fetch_limit
marker_row = rows.reverse.find do |row|
string_or_nil(row["domain"]) && string_or_nil(row["id"])
end
break unless marker_row
marker_domain = string_or_nil(marker_row["domain"])&.downcase
marker_id = string_or_nil(marker_row["id"])
break unless marker_domain && marker_id
cursor_domain = marker_domain
cursor_id = marker_id
end
rows.each_with_object([]) do |row, memo|
normalized = normalize_instance_row(row)
next unless normalized
last_update_time = normalized["lastUpdateTime"]
next unless last_update_time.is_a?(Integer) && last_update_time >= min_last_update_time
memo << normalized
has_more = items.length > safe_limit
paged_items = has_more ? items.first(safe_limit) : items
next_cursor = nil
if has_more && !paged_items.empty?
marker = paged_items.last
next_cursor = encode_query_cursor({
"domain" => string_or_nil(marker["domain"]),
"id" => string_or_nil(marker["id"]),
})
end
{ items: paged_items, next_cursor: next_cursor }
rescue SQLite3::Exception => e
warn_log(
"Failed to load instance records",
@@ -203,7 +262,7 @@ module PotatoMesh
error_class: e.class.name,
error_message: e.message,
)
[]
with_pagination ? { items: [], next_cursor: nil } : []
ensure
db&.close
end
+409 -50
View File
@@ -127,6 +127,162 @@ module PotatoMesh
[threshold, floor].max
end
# Normalise an optional upper-bound timestamp for keyset pagination.
#
# @param before [Object] requested upper bound expressed as unix seconds.
# @param ceiling [Integer] maximum allowable timestamp.
# @return [Integer, nil] normalized upper bound or nil when absent.
def normalize_before_threshold(before, ceiling: Time.now.to_i)
value = coerce_integer(before)
return nil if value.nil?
value = 0 if value.negative?
[value, ceiling].min
end
# Decode a keyset cursor token previously emitted by {encode_query_cursor}.
#
# @param token [String, nil] base64 cursor token.
# @return [Hash, nil] decoded cursor payload.
def decode_query_cursor(token)
value = string_or_nil(token)
return nil unless value
decoded = Base64.urlsafe_decode64(value)
parsed = JSON.parse(decoded)
parsed.is_a?(Hash) ? parsed : nil
rescue ArgumentError, JSON::ParserError
nil
end
# Encode a cursor payload for keyset pagination transport.
#
# @param payload [Hash] cursor components.
# @return [String] URL-safe base64 cursor token.
def encode_query_cursor(payload)
Base64.urlsafe_encode64(JSON.generate(payload))
end
# Parse a rowid-based time cursor payload from pagination token.
#
# @param cursor [String, nil] cursor token.
# @param time_key [String] payload key containing the timestamp component.
# @return [Array<(Integer, Integer)>, Array<(nil, nil)>] decoded time/rowid pair.
def decode_rowid_time_cursor(cursor, time_key:)
cursor_payload = decode_query_cursor(cursor)
return [nil, nil] unless cursor_payload
[coerce_integer(cursor_payload[time_key]), coerce_integer(cursor_payload["rowid"])]
end
# Build pagination metadata for rowid-keyset collections.
#
# @param items [Array<Hash>] compacted rows.
# @param limit [Integer] requested limit.
# @param time_key [String] cursor payload timestamp key.
# @param marker_time [Proc] extractor receiving marker row hash.
# @return [Hash{Symbol => Object}] items and optional next_cursor.
def build_rowid_pagination_response(items, limit, time_key:, marker_time:)
has_more = items.length > limit
paged_items = has_more ? items.first(limit) : items
next_cursor = nil
if has_more && !paged_items.empty?
marker = paged_items.last
next_cursor = encode_query_cursor({
time_key => coerce_integer(marker_time.call(marker)),
"rowid" => coerce_integer(marker["_cursor_rowid"]),
})
end
paged_items.each { |item| item.delete("_cursor_time") }
paged_items.each { |item| item.delete("_cursor_rowid") }
{ items: paged_items, next_cursor: next_cursor }
end
# Append normalized since/before predicates for time-windowed collections.
#
# @param where_clauses [Array<String>] mutable SQL predicate fragments.
# @param params [Array<Object>] mutable SQL bind parameters.
# @param since [Object] lower-bound timestamp candidate.
# @param before [Object] upper-bound timestamp candidate.
# @param since_floor [Integer] minimum accepted since threshold.
# @param ceiling [Integer] maximum accepted before threshold.
# @param time_expression [String] SQL expression used for temporal filtering.
# @return [Integer] normalized since threshold.
def append_time_window_filters!(
where_clauses:,
params:,
since:,
before:,
since_floor:,
ceiling:,
time_expression:
)
since_threshold = normalize_since_threshold(since, floor: since_floor)
before_threshold = normalize_before_threshold(before, ceiling: ceiling)
where_clauses << "#{time_expression} >= ?"
params << since_threshold
if before_threshold
where_clauses << "#{time_expression} <= ?"
params << before_threshold
end
since_threshold
end
# Append rowid/timestamp keyset predicates for descending time-ordered tables.
#
# @param where_clauses [Array<String>] mutable SQL predicate fragments.
# @param params [Array<Object>] mutable SQL bind parameters.
# @param cursor [String, nil] encoded cursor token.
# @param time_key [String] cursor payload timestamp key.
# @param time_expression [String] SQL timestamp expression used for ordering.
# @return [void]
def append_rowid_time_cursor_filter!(where_clauses:, params:, cursor:, time_key:, time_expression:)
cursor_time, cursor_rowid = decode_rowid_time_cursor(cursor, time_key: time_key)
return unless cursor_time && cursor_rowid
where_clauses << "(#{time_expression} < ? OR (#{time_expression} = ? AND rowid < ?))"
params.concat([cursor_time, cursor_time, cursor_rowid])
end
# Return exact active-node counts across common activity windows.
#
# Counts are resolved directly in SQL with COUNT(*) thresholds against
# +nodes.last_heard+ to avoid sampling bias from list endpoint limits.
#
# @param now [Integer] reference unix timestamp in seconds.
# @param db [SQLite3::Database, nil] optional open database handle to reuse.
# @return [Hash{String => Integer}] counts keyed by hour/day/week/month.
def query_active_node_stats(now: Time.now.to_i, db: nil)
handle = db || open_database(readonly: true)
handle.results_as_hash = true
reference_now = coerce_integer(now) || Time.now.to_i
hour_cutoff = reference_now - 3600
day_cutoff = reference_now - 86_400
week_cutoff = reference_now - PotatoMesh::Config.week_seconds
month_cutoff = reference_now - (30 * 24 * 60 * 60)
private_filter = private_mode? ? " AND (role IS NULL OR role <> 'CLIENT_HIDDEN')" : ""
sql = <<~SQL
SELECT
(SELECT COUNT(*) FROM nodes WHERE last_heard >= ?#{private_filter}) AS hour_count,
(SELECT COUNT(*) FROM nodes WHERE last_heard >= ?#{private_filter}) AS day_count,
(SELECT COUNT(*) FROM nodes WHERE last_heard >= ?#{private_filter}) AS week_count,
(SELECT COUNT(*) FROM nodes WHERE last_heard >= ?#{private_filter}) AS month_count
SQL
row = with_busy_retry do
handle.get_first_row(sql, [hour_cutoff, day_cutoff, week_cutoff, month_cutoff])
end || {}
{
"hour" => row["hour_count"].to_i,
"day" => row["day_count"].to_i,
"week" => row["week_count"].to_i,
"month" => row["month_count"].to_i,
}
ensure
handle&.close unless db
end
def node_reference_tokens(node_ref)
parts = canonical_node_parts(node_ref)
canonical_id, numeric_id = parts ? parts[0, 2] : [nil, nil]
@@ -215,26 +371,43 @@ module PotatoMesh
# @param node_ref [String, Integer, nil] optional node reference to narrow results.
# @param since [Integer] unix timestamp threshold applied in addition to the rolling window for collections.
# @return [Array<Hash>] compacted node rows suitable for API responses.
def query_nodes(limit, node_ref: nil, since: 0)
def query_nodes(limit, node_ref: nil, since: 0, before: nil, cursor: nil, with_pagination: false)
limit = coerce_query_limit(limit)
fetch_limit = with_pagination ? limit + 1 : limit
db = open_database(readonly: true)
db.results_as_hash = true
now = Time.now.to_i
min_last_heard = now - PotatoMesh::Config.week_seconds
since_floor = node_ref ? 0 : min_last_heard
since_threshold = normalize_since_threshold(since, floor: since_floor)
before_threshold = normalize_before_threshold(before, ceiling: now)
params = []
where_clauses = []
if node_ref
clause = node_lookup_clause(node_ref, string_columns: ["node_id"], numeric_columns: ["num"])
return [] unless clause
return with_pagination ? { items: [], next_cursor: nil } : [] unless clause
where_clauses << clause.first
params.concat(clause.last)
else
where_clauses << "last_heard >= ?"
params << since_threshold
end
if before_threshold
where_clauses << "last_heard <= ?"
params << before_threshold
end
if with_pagination
cursor_payload = decode_query_cursor(cursor)
if cursor_payload
cursor_last_heard = coerce_integer(cursor_payload["last_heard"])
cursor_node_id = string_or_nil(cursor_payload["node_id"])
if cursor_last_heard && cursor_node_id
where_clauses << "(last_heard < ? OR (last_heard = ? AND node_id < ?))"
params.concat([cursor_last_heard, cursor_last_heard, cursor_node_id])
end
end
end
if private_mode?
where_clauses << "(role IS NULL OR role <> 'CLIENT_HIDDEN')"
@@ -250,10 +423,10 @@ module PotatoMesh
SQL
sql += " WHERE #{where_clauses.join(" AND ")}\n" if where_clauses.any?
sql += <<~SQL
ORDER BY last_heard DESC
ORDER BY last_heard DESC, node_id DESC
LIMIT ?
SQL
params << limit
params << fetch_limit
rows = db.execute(sql, params)
rows = rows.select do |r|
@@ -263,7 +436,15 @@ module PotatoMesh
.max
last_candidate && last_candidate >= since_threshold
end
rows.each do |r|
has_more = with_pagination && rows.length > limit
paged_rows = has_more ? rows.first(limit) : rows
marker_row = has_more ? paged_rows.last : nil
marker_last_heard = marker_row ? coerce_integer(marker_row["last_heard"]) : nil
marker_node_id = marker_row ? string_or_nil(marker_row["node_id"]) : nil
output_rows = with_pagination ? paged_rows : rows
output_rows.each do |r|
r["role"] ||= "CLIENT"
lh = r["last_heard"]&.to_i
pt = r["position_time"]&.to_i
@@ -276,7 +457,18 @@ module PotatoMesh
pb = r["precision_bits"]
r["precision_bits"] = pb.to_i if pb
end
rows.map { |row| compact_api_row(row) }
items = output_rows.map { |row| compact_api_row(row) }
items.each { |item| item.delete("_cursor_rowid") }
return items unless with_pagination
next_cursor = nil
if has_more && marker_last_heard && marker_node_id
next_cursor = encode_query_cursor({
"last_heard" => marker_last_heard,
"node_id" => marker_node_id,
})
end
{ items: items, next_cursor: next_cursor }
ensure
db&.close
end
@@ -286,22 +478,41 @@ module PotatoMesh
# @param limit [Integer] maximum number of ingestors to return.
# @param since [Integer] unix timestamp threshold applied in addition to the rolling window for collections.
# @return [Array<Hash>] compacted ingestor rows suitable for API responses.
def query_ingestors(limit, since: 0)
def query_ingestors(limit, since: 0, before: nil, cursor: nil, with_pagination: false)
limit = coerce_query_limit(limit)
fetch_limit = with_pagination ? limit + 1 : limit
db = open_database(readonly: true)
db.results_as_hash = true
now = Time.now.to_i
cutoff = now - PotatoMesh::Config.week_seconds
since_threshold = normalize_since_threshold(since, floor: cutoff)
before_threshold = normalize_before_threshold(before, ceiling: now)
where_clauses = ["last_seen_time >= ?"]
params = [since_threshold]
if before_threshold
where_clauses << "last_seen_time <= ?"
params << before_threshold
end
if with_pagination
cursor_payload = decode_query_cursor(cursor)
if cursor_payload
cursor_last_seen = coerce_integer(cursor_payload["last_seen_time"])
cursor_node_id = string_or_nil(cursor_payload["node_id"])
if cursor_last_seen && cursor_node_id
where_clauses << "(last_seen_time < ? OR (last_seen_time = ? AND node_id < ?))"
params.concat([cursor_last_seen, cursor_last_seen, cursor_node_id])
end
end
end
sql = <<~SQL
SELECT node_id, start_time, last_seen_time, version, lora_freq, modem_preset
FROM ingestors
WHERE last_seen_time >= ?
ORDER BY last_seen_time DESC
WHERE #{where_clauses.join(" AND ")}
ORDER BY last_seen_time DESC, node_id DESC
LIMIT ?
SQL
rows = db.execute(sql, [since_threshold, limit])
rows = db.execute(sql, params + [fetch_limit])
rows.each do |row|
row.delete_if { |key, _| key.is_a?(Integer) }
start_time = coerce_integer(row["start_time"])
@@ -317,7 +528,21 @@ module PotatoMesh
row["last_seen_iso"] = Time.at(last_seen_time).utc.iso8601 if last_seen_time
end
rows.map { |row| compact_api_row(row) }
items = rows.map { |row| compact_api_row(row) }
items.each { |item| item.delete("_cursor_rowid") }
return items unless with_pagination
has_more = items.length > limit
paged_items = has_more ? items.first(limit) : items
next_cursor = nil
if has_more && !paged_items.empty?
marker = paged_items.last
next_cursor = encode_query_cursor({
"last_seen_time" => coerce_integer(marker["last_seen_time"]),
"node_id" => string_or_nil(marker["node_id"]),
})
end
{ items: paged_items, next_cursor: next_cursor }
ensure
db&.close
end
@@ -329,9 +554,11 @@ module PotatoMesh
# @param include_encrypted [Boolean] when true, include encrypted payloads in the response.
# @param since [Integer] unix timestamp threshold; messages with rx_time older than this are excluded.
# @return [Array<Hash>] compacted message rows safe for API responses.
def query_messages(limit, node_ref: nil, include_encrypted: false, since: 0)
def query_messages(limit, node_ref: nil, include_encrypted: false, since: 0, before: nil, cursor: nil, with_pagination: false)
limit = coerce_query_limit(limit)
fetch_limit = with_pagination ? limit + 1 : limit
since_threshold = normalize_since_threshold(since, floor: 0)
before_threshold = normalize_before_threshold(before)
db = open_database(readonly: true)
db.results_as_hash = true
params = []
@@ -341,10 +568,25 @@ module PotatoMesh
include_encrypted = !!include_encrypted
where_clauses << "m.rx_time >= ?"
params << since_threshold
if before_threshold
where_clauses << "m.rx_time <= ?"
params << before_threshold
end
unless include_encrypted
where_clauses << "COALESCE(TRIM(m.encrypted), '') = ''"
end
if with_pagination
cursor_payload = decode_query_cursor(cursor)
if cursor_payload
cursor_rx_time = coerce_integer(cursor_payload["rx_time"])
cursor_id = string_or_nil(cursor_payload["id"])
if cursor_rx_time && cursor_id
where_clauses << "(m.rx_time < ? OR (m.rx_time = ? AND m.id < ?))"
params.concat([cursor_rx_time, cursor_rx_time, cursor_id])
end
end
end
if node_ref
clause = node_lookup_clause(node_ref, string_columns: ["m.from_id", "m.to_id"])
@@ -362,10 +604,10 @@ module PotatoMesh
SQL
sql += " WHERE #{where_clauses.join(" AND ")}\n"
sql += <<~SQL
ORDER BY m.rx_time DESC
ORDER BY m.rx_time DESC, m.id DESC
LIMIT ?
SQL
params << limit
params << fetch_limit
rows = db.execute(sql, params)
rows.each do |r|
r.delete_if { |key, _| key.is_a?(Integer) }
@@ -407,7 +649,21 @@ module PotatoMesh
)
end
end
rows.map { |row| compact_api_row(row) }
items = rows.map { |row| compact_api_row(row) }
items.each { |item| item.delete("_cursor_rowid") }
return items unless with_pagination
has_more = items.length > limit
paged_items = has_more ? items.first(limit) : items
next_cursor = nil
if has_more && !paged_items.empty?
marker = paged_items.last
next_cursor = encode_query_cursor({
"rx_time" => coerce_integer(marker["rx_time"]),
"id" => string_or_nil(marker["id"]),
})
end
{ items: paged_items, next_cursor: next_cursor }
ensure
db&.close
end
@@ -418,18 +674,26 @@ module PotatoMesh
# @param node_ref [String, Integer, nil] optional node reference to scope results.
# @param since [Integer] unix timestamp threshold applied in addition to the rolling window.
# @return [Array<Hash>] compacted position rows suitable for API responses.
def query_positions(limit, node_ref: nil, since: 0)
def query_positions(limit, node_ref: nil, since: 0, before: nil, cursor: nil, with_pagination: false)
limit = coerce_query_limit(limit)
fetch_limit = with_pagination ? limit + 1 : limit
db = open_database(readonly: true)
db.results_as_hash = true
params = []
where_clauses = []
now = Time.now.to_i
min_rx_time = now - PotatoMesh::Config.week_seconds
time_expression = "COALESCE(rx_time, position_time, 0)"
since_floor = node_ref ? 0 : min_rx_time
since_threshold = normalize_since_threshold(since, floor: since_floor)
where_clauses << "COALESCE(rx_time, position_time, 0) >= ?"
params << since_threshold
append_time_window_filters!(
where_clauses: where_clauses,
params: params,
since: since,
before: before,
since_floor: since_floor,
ceiling: now,
time_expression: time_expression,
)
if node_ref
clause = node_lookup_clause(node_ref, string_columns: ["node_id"], numeric_columns: ["node_num"])
@@ -438,15 +702,22 @@ module PotatoMesh
params.concat(clause.last)
end
sql = <<~SQL
SELECT * FROM positions
SQL
append_rowid_time_cursor_filter!(
where_clauses: where_clauses,
params: params,
cursor: cursor,
time_key: "cursor_time",
time_expression: time_expression,
) if with_pagination
select_sql = with_pagination ? "SELECT *, rowid AS _cursor_rowid, COALESCE(rx_time, position_time, 0) AS _cursor_time FROM positions" : "SELECT * FROM positions"
sql = "#{select_sql}\n"
sql += " WHERE #{where_clauses.join(" AND ")}\n" if where_clauses.any?
sql += <<~SQL
ORDER BY rx_time DESC
ORDER BY COALESCE(rx_time, position_time, 0) DESC, rowid DESC
LIMIT ?
SQL
params << limit
params << fetch_limit
rows = db.execute(sql, params)
rows.each do |r|
rx_time = coerce_integer(r["rx_time"])
@@ -466,7 +737,16 @@ module PotatoMesh
r["pdop"] = coerce_float(r["pdop"])
r["snr"] = coerce_float(r["snr"])
end
rows.map { |row| compact_api_row(row) }
items = rows.map { |row| compact_api_row(row) }
items.each { |item| item.delete("_cursor_rowid") } unless with_pagination
return items unless with_pagination
build_rowid_pagination_response(
items,
limit,
time_key: "cursor_time",
marker_time: ->(marker) { marker["_cursor_time"] },
)
ensure
db&.close
end
@@ -477,18 +757,26 @@ module PotatoMesh
# @param node_ref [String, Integer, nil] optional node reference to scope results.
# @param since [Integer] unix timestamp threshold applied in addition to the rolling window for collections.
# @return [Array<Hash>] compacted neighbor rows suitable for API responses.
def query_neighbors(limit, node_ref: nil, since: 0)
def query_neighbors(limit, node_ref: nil, since: 0, before: nil, cursor: nil, with_pagination: false)
limit = coerce_query_limit(limit)
fetch_limit = with_pagination ? limit + 1 : limit
db = open_database(readonly: true)
db.results_as_hash = true
params = []
where_clauses = []
now = Time.now.to_i
min_rx_time = now - PotatoMesh::Config.week_seconds
time_expression = "COALESCE(rx_time, 0)"
since_floor = node_ref ? 0 : min_rx_time
since_threshold = normalize_since_threshold(since, floor: since_floor)
where_clauses << "COALESCE(rx_time, 0) >= ?"
params << since_threshold
append_time_window_filters!(
where_clauses: where_clauses,
params: params,
since: since,
before: before,
since_floor: since_floor,
ceiling: now,
time_expression: time_expression,
)
if node_ref
clause = node_lookup_clause(node_ref, string_columns: ["node_id", "neighbor_id"])
@@ -497,15 +785,22 @@ module PotatoMesh
params.concat(clause.last)
end
sql = <<~SQL
SELECT * FROM neighbors
SQL
append_rowid_time_cursor_filter!(
where_clauses: where_clauses,
params: params,
cursor: cursor,
time_key: "rx_time",
time_expression: "rx_time",
) if with_pagination
select_sql = with_pagination ? "SELECT *, rowid AS _cursor_rowid FROM neighbors" : "SELECT * FROM neighbors"
sql = "#{select_sql}\n"
sql += " WHERE #{where_clauses.join(" AND ")}\n" if where_clauses.any?
sql += <<~SQL
ORDER BY rx_time DESC
ORDER BY rx_time DESC, rowid DESC
LIMIT ?
SQL
params << limit
params << fetch_limit
rows = db.execute(sql, params)
rows.each do |r|
rx_time = coerce_integer(r["rx_time"])
@@ -514,7 +809,16 @@ module PotatoMesh
r["rx_iso"] = Time.at(rx_time).utc.iso8601 if rx_time
r["snr"] = coerce_float(r["snr"])
end
rows.map { |row| compact_api_row(row) }
items = rows.map { |row| compact_api_row(row) }
items.each { |item| item.delete("_cursor_rowid") } unless with_pagination
return items unless with_pagination
build_rowid_pagination_response(
items,
limit,
time_key: "rx_time",
marker_time: ->(marker) { marker["rx_time"] },
)
ensure
db&.close
end
@@ -525,18 +829,26 @@ module PotatoMesh
# @param node_ref [String, Integer, nil] optional node reference to scope results.
# @param since [Integer] unix timestamp threshold applied in addition to the rolling window for collections.
# @return [Array<Hash>] compacted telemetry rows suitable for API responses.
def query_telemetry(limit, node_ref: nil, since: 0)
def query_telemetry(limit, node_ref: nil, since: 0, before: nil, cursor: nil, with_pagination: false)
limit = coerce_query_limit(limit)
fetch_limit = with_pagination ? limit + 1 : limit
db = open_database(readonly: true)
db.results_as_hash = true
params = []
where_clauses = []
now = Time.now.to_i
min_rx_time = now - PotatoMesh::Config.week_seconds
time_expression = "COALESCE(rx_time, telemetry_time, 0)"
since_floor = node_ref ? 0 : min_rx_time
since_threshold = normalize_since_threshold(since, floor: since_floor)
where_clauses << "COALESCE(rx_time, telemetry_time, 0) >= ?"
params << since_threshold
append_time_window_filters!(
where_clauses: where_clauses,
params: params,
since: since,
before: before,
since_floor: since_floor,
ceiling: now,
time_expression: time_expression,
)
if node_ref
clause = node_lookup_clause(node_ref, string_columns: ["node_id"], numeric_columns: ["node_num"])
@@ -545,15 +857,22 @@ module PotatoMesh
params.concat(clause.last)
end
sql = <<~SQL
SELECT * FROM telemetry
SQL
append_rowid_time_cursor_filter!(
where_clauses: where_clauses,
params: params,
cursor: cursor,
time_key: "cursor_time",
time_expression: time_expression,
) if with_pagination
select_sql = with_pagination ? "SELECT *, rowid AS _cursor_rowid, COALESCE(rx_time, telemetry_time, 0) AS _cursor_time FROM telemetry" : "SELECT * FROM telemetry"
sql = "#{select_sql}\n"
sql += " WHERE #{where_clauses.join(" AND ")}\n" if where_clauses.any?
sql += <<~SQL
ORDER BY rx_time DESC
ORDER BY COALESCE(rx_time, telemetry_time, 0) DESC, rowid DESC
LIMIT ?
SQL
params << limit
params << fetch_limit
rows = db.execute(sql, params)
rows.each do |r|
rx_time = coerce_integer(r["rx_time"])
@@ -601,7 +920,16 @@ module PotatoMesh
r["soil_moisture"] = coerce_integer(r["soil_moisture"])
r["soil_temperature"] = coerce_float(r["soil_temperature"])
end
rows.map { |row| compact_api_row(row) }
items = rows.map { |row| compact_api_row(row) }
items.each { |item| item.delete("_cursor_rowid") } unless with_pagination
return items unless with_pagination
build_rowid_pagination_response(
items,
limit,
time_key: "cursor_time",
marker_time: ->(marker) { marker["_cursor_time"] },
)
ensure
db&.close
end
@@ -734,8 +1062,9 @@ module PotatoMesh
# @param node_ref [String, Integer, nil] optional node reference to scope results.
# @param since [Integer] unix timestamp threshold applied in addition to the rolling window.
# @return [Array<Hash>] compacted trace rows suitable for API responses.
def query_traces(limit, node_ref: nil, since: 0)
def query_traces(limit, node_ref: nil, since: 0, before: nil, cursor: nil, with_pagination: false)
limit = coerce_query_limit(limit)
fetch_limit = with_pagination ? limit + 1 : limit
db = open_database(readonly: true)
db.results_as_hash = true
params = []
@@ -743,14 +1072,19 @@ module PotatoMesh
now = Time.now.to_i
min_rx_time = now - PotatoMesh::Config.trace_neighbor_window_seconds
since_threshold = normalize_since_threshold(since, floor: min_rx_time)
before_threshold = normalize_before_threshold(before, ceiling: now)
where_clauses << "COALESCE(rx_time, 0) >= ?"
params << since_threshold
if before_threshold
where_clauses << "COALESCE(rx_time, 0) <= ?"
params << before_threshold
end
if node_ref
tokens = node_reference_tokens(node_ref)
numeric_values = tokens[:numeric_values]
if numeric_values.empty?
return []
return with_pagination ? { items: [], next_cursor: nil } : []
end
placeholders = Array.new(numeric_values.length, "?").join(", ")
candidate_clauses = []
@@ -761,16 +1095,28 @@ module PotatoMesh
3.times { params.concat(numeric_values) }
end
if with_pagination
cursor_payload = decode_query_cursor(cursor)
if cursor_payload
cursor_rx_time = coerce_integer(cursor_payload["rx_time"])
cursor_id = coerce_integer(cursor_payload["id"])
if cursor_rx_time && cursor_id
where_clauses << "(rx_time < ? OR (rx_time = ? AND id < ?))"
params.concat([cursor_rx_time, cursor_rx_time, cursor_id])
end
end
end
sql = <<~SQL
SELECT id, request_id, src, dest, rx_time, rx_iso, rssi, snr, elapsed_ms
FROM traces
SQL
sql += " WHERE #{where_clauses.join(" AND ")}\n" if where_clauses.any?
sql += <<~SQL
ORDER BY rx_time DESC
ORDER BY rx_time DESC, id DESC
LIMIT ?
SQL
params << limit
params << fetch_limit
rows = db.execute(sql, params)
trace_ids = rows.map { |row| coerce_integer(row["id"]) }.compact
@@ -807,7 +1153,20 @@ module PotatoMesh
r["hops"] = hops_by_trace[trace_id]
end
end
rows.map { |row| compact_api_row(row) }
items = rows.map { |row| compact_api_row(row) }
return items unless with_pagination
has_more = items.length > limit
paged_items = has_more ? items.first(limit) : items
next_cursor = nil
if has_more && !paged_items.empty?
marker = paged_items.last
next_cursor = encode_query_cursor({
"rx_time" => coerce_integer(marker["rx_time"]),
"id" => coerce_integer(marker["id"]),
})
end
{ items: paged_items, next_cursor: next_cursor }
ensure
db&.close
end
+126 -15
View File
@@ -64,7 +64,23 @@ module PotatoMesh
app.get "/api/nodes" do
content_type :json
limit = [params["limit"]&.to_i || 200, 1000].min
query_nodes(limit, since: params["since"]).to_json
result = query_nodes(
limit,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/stats" do
content_type :json
{
active_nodes: query_active_node_stats,
sampled: false,
}.to_json
end
app.get "/api/nodes/:id" do
@@ -80,7 +96,15 @@ module PotatoMesh
app.get "/api/ingestors" do
content_type :json
limit = coerce_query_limit(params["limit"])
query_ingestors(limit, since: params["since"]).to_json
result = query_ingestors(
limit,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/messages" do
@@ -89,7 +113,16 @@ module PotatoMesh
include_encrypted = coerce_boolean(params["encrypted"]) || false
since = coerce_integer(params["since"])
since = 0 if since.nil? || since.negative?
query_messages(limit, include_encrypted: include_encrypted, since: since).to_json
result = query_messages(
limit,
include_encrypted: include_encrypted,
since: since,
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/messages/:id" do
@@ -100,18 +133,31 @@ module PotatoMesh
include_encrypted = coerce_boolean(params["encrypted"]) || false
since = coerce_integer(params["since"])
since = 0 if since.nil? || since.negative?
query_messages(
result = query_messages(
limit,
node_ref: node_ref,
include_encrypted: include_encrypted,
since: since,
).to_json
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/positions" do
content_type :json
limit = [params["limit"]&.to_i || 200, 1000].min
query_positions(limit, since: params["since"]).to_json
result = query_positions(
limit,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/positions/:id" do
@@ -119,13 +165,30 @@ module PotatoMesh
node_ref = string_or_nil(params["id"])
halt 400, { error: "missing node id" }.to_json unless node_ref
limit = [params["limit"]&.to_i || 200, 1000].min
query_positions(limit, node_ref: node_ref, since: params["since"]).to_json
result = query_positions(
limit,
node_ref: node_ref,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/neighbors" do
content_type :json
limit = [params["limit"]&.to_i || 200, 1000].min
query_neighbors(limit, since: params["since"]).to_json
result = query_neighbors(
limit,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/neighbors/:id" do
@@ -133,13 +196,30 @@ module PotatoMesh
node_ref = string_or_nil(params["id"])
halt 400, { error: "missing node id" }.to_json unless node_ref
limit = [params["limit"]&.to_i || 200, 1000].min
query_neighbors(limit, node_ref: node_ref, since: params["since"]).to_json
result = query_neighbors(
limit,
node_ref: node_ref,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/telemetry" do
content_type :json
limit = [params["limit"]&.to_i || 200, 1000].min
query_telemetry(limit, since: params["since"]).to_json
result = query_telemetry(
limit,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/telemetry/aggregated" do
@@ -182,13 +262,30 @@ module PotatoMesh
node_ref = string_or_nil(params["id"])
halt 400, { error: "missing node id" }.to_json unless node_ref
limit = [params["limit"]&.to_i || 200, 1000].min
query_telemetry(limit, node_ref: node_ref, since: params["since"]).to_json
result = query_telemetry(
limit,
node_ref: node_ref,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/traces" do
content_type :json
limit = [params["limit"]&.to_i || 200, 1000].min
query_traces(limit, since: params["since"]).to_json
result = query_traces(
limit,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/traces/:id" do
@@ -196,7 +293,16 @@ module PotatoMesh
node_ref = string_or_nil(params["id"])
halt 400, { error: "missing node id" }.to_json unless node_ref
limit = [params["limit"]&.to_i || 200, 1000].min
query_traces(limit, node_ref: node_ref, since: params["since"]).to_json
result = query_traces(
limit,
node_ref: node_ref,
since: params["since"],
before: params["before"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
result[:items].to_json
end
app.get "/api/instances" do
@@ -205,8 +311,13 @@ module PotatoMesh
content_type :json
ensure_self_instance_record!
payload = load_instances_for_api
JSON.generate(payload)
result = load_instances_for_api(
limit: params["limit"],
cursor: params["cursor"],
with_pagination: true,
)
response["X-Next-Cursor"] = result[:next_cursor] if result[:next_cursor]
JSON.generate(result[:items])
end
end
end
@@ -130,7 +130,7 @@ test('buildChatTabModel returns sorted nodes and channel buckets', () => {
const secondaryChannel = channelByLabel.BerlinMesh;
assert.equal(secondaryChannel.index, 1);
assert.match(secondaryChannel.id, /^channel-secondary-1-berlinmesh-[a-z0-9]+$/);
assert.match(secondaryChannel.id, /^channel-secondary-name-berlinmesh-[a-z0-9]+$/);
assert.equal(secondaryChannel.entries.length, 1);
assert.deepEqual(secondaryChannel.entries.map(entry => entry.message.id), ['recent-alt']);
});
@@ -294,7 +294,7 @@ test('buildChatTabModel ignores plaintext log-only entries', () => {
assert.equal(encryptedEntries[0]?.message?.id, 'enc');
});
test('buildChatTabModel keeps secondary channels distinct by index even with matching labels', () => {
test('buildChatTabModel merges secondary channels with matching labels across indexes', () => {
const primaryId = 'primary';
const secondaryFirstId = 'secondary-one';
const secondarySecondId = 'secondary-two';
@@ -311,22 +311,20 @@ test('buildChatTabModel keeps secondary channels distinct by index even with mat
});
const meshChannels = model.channels.filter(channel => channel.label === label);
assert.equal(meshChannels.length, 3);
assert.equal(meshChannels.length, 2);
const primaryChannel = meshChannels.find(channel => channel.index === 0);
assert.ok(primaryChannel);
assert.equal(primaryChannel.entries.length, 1);
assert.equal(primaryChannel.entries[0]?.message?.id, primaryId);
const secondaryFirstChannel = meshChannels.find(channel => channel.index === 7);
assert.ok(secondaryFirstChannel);
assert.match(secondaryFirstChannel.id, /^channel-secondary-7-meshtown-[a-z0-9]+$/);
assert.deepEqual(secondaryFirstChannel.entries.map(entry => entry.message.id), [secondaryFirstId]);
const secondarySecondChannel = meshChannels.find(channel => channel.index === 3);
assert.ok(secondarySecondChannel);
assert.match(secondarySecondChannel.id, /^channel-secondary-3-meshtown-[a-z0-9]+$/);
assert.deepEqual(secondarySecondChannel.entries.map(entry => entry.message.id), [secondarySecondId]);
const mergedSecondaryChannel = meshChannels.find(channel => channel.index === 3);
assert.ok(mergedSecondaryChannel);
assert.match(mergedSecondaryChannel.id, /^channel-secondary-name-meshtown-[a-z0-9]+$/);
assert.deepEqual(
mergedSecondaryChannel.entries.map(entry => entry.message.id),
[secondaryFirstId, secondarySecondId]
);
});
test('buildChatTabModel keeps unnamed secondary buckets separate when a label later arrives', () => {
@@ -338,7 +336,7 @@ test('buildChatTabModel keeps unnamed secondary buckets separate when a label la
{ id: 'unnamed', rx_time: NOW - 15, channel: 4 },
{ id: 'named', rx_time: NOW - 10, channel: 4, channel_name: 'SideMesh' }
],
namedId: /^channel-secondary-4-sidemesh-[a-z0-9]+$/,
namedId: /^channel-secondary-name-sidemesh-[a-z0-9]+$/,
namedMessages: ['named'],
unnamedMessages: ['unnamed']
},
@@ -349,7 +347,7 @@ test('buildChatTabModel keeps unnamed secondary buckets separate when a label la
{ id: 'named', rx_time: NOW - 12, channel: 5, channel_name: 'MeshNorth' },
{ id: 'unlabeled', rx_time: NOW - 8, channel: 5 }
],
namedId: /^channel-secondary-5-meshnorth-[a-z0-9]+$/,
namedId: /^channel-secondary-name-meshnorth-[a-z0-9]+$/,
namedMessages: ['named'],
unnamedMessages: ['unlabeled']
}
@@ -392,18 +390,37 @@ test('buildChatTabModel keeps same-index channels with different names in separa
assertChannelMessages(model, {
label: 'PUBLIC',
id: /^channel-secondary-1-public-[a-z0-9]+$/,
id: /^channel-secondary-name-public-[a-z0-9]+$/,
index: 1,
messageIds: ['public-msg']
});
assertChannelMessages(model, {
label: 'BerlinMesh',
id: /^channel-secondary-1-berlinmesh-[a-z0-9]+$/,
id: /^channel-secondary-name-berlinmesh-[a-z0-9]+$/,
index: 1,
messageIds: ['berlin-msg']
});
});
test('buildChatTabModel merges same-name channels even when indexes differ', () => {
const model = buildChatTabModel({
nodes: [],
messages: [
{ id: 'test-1', rx_time: NOW - 12, channel: 1, channel_name: 'TEST' },
{ id: 'test-2', rx_time: NOW - 8, channel: 2, channel_name: 'TEST' }
],
nowSeconds: NOW,
windowSeconds: WINDOW
});
assertChannelMessages(model, {
label: 'TEST',
id: /^channel-secondary-name-test-[a-z0-9]+$/,
index: 1,
messageIds: ['test-1', 'test-2']
});
});
test('buildChatTabModel keeps same-index slug-colliding labels on distinct tab ids', () => {
const model = buildChatTabModel({
nodes: [],
@@ -419,8 +436,8 @@ test('buildChatTabModel keeps same-index slug-colliding labels on distinct tab i
const fooDashChannel = findChannelByLabel(model, 'Foo-Bar');
assert.ok(fooSpaceChannel);
assert.ok(fooDashChannel);
assert.match(fooSpaceChannel.id, /^channel-secondary-1-foo-bar-[a-z0-9]+$/);
assert.match(fooDashChannel.id, /^channel-secondary-1-foo-bar-[a-z0-9]+$/);
assert.match(fooSpaceChannel.id, /^channel-secondary-name-foo-bar-[a-z0-9]+$/);
assert.match(fooDashChannel.id, /^channel-secondary-name-foo-bar-[a-z0-9]+$/);
assert.notEqual(fooSpaceChannel.id, fooDashChannel.id);
});
@@ -434,6 +451,6 @@ test('buildChatTabModel falls back to hashed id for unsluggable secondary labels
const channel = findChannelByLabel(model, '###');
assert.ok(channel);
assert.equal(channel.index, 2);
assert.ok(channel.id.startsWith('channel-secondary-2-'));
assert.ok(channel.id.length > 'channel-secondary-2-'.length);
assert.ok(channel.id.startsWith('channel-secondary-name-'));
assert.ok(channel.id.length > 'channel-secondary-name-'.length);
});
@@ -21,6 +21,55 @@ import { createDomEnvironment } from './dom-environment.js';
import { initializeFederationPage } from '../federation-page.js';
import { roleColors } from '../role-helpers.js';
function createFailureScenarioPage(env) {
const { document, createElement, registerElement } = env;
registerElement('map', createElement('div', 'map'));
const statusEl = createElement('div', 'status');
registerElement('status', statusEl);
const tableEl = createElement('table', 'instances');
const tbodyEl = createElement('tbody');
registerElement('instances', tableEl);
const configEl = createElement('div');
configEl.setAttribute('data-app-config', JSON.stringify({}));
document.querySelector = selector => {
if (selector === '[data-app-config]') return configEl;
if (selector === '#instances tbody') return tbodyEl;
return null;
};
return { statusEl };
}
function createMinimalLeafletStub() {
return {
map() {
return {
setView() {},
on() {},
getPane() {
return null;
}
};
},
tileLayer() {
return {
addTo() {
return this;
},
getContainer() {
return null;
},
on() {}
};
},
layerGroup() {
return { addLayer() {}, addTo() { return this; } };
},
circleMarker() {
return { bindPopup() { return this; } };
}
};
}
test('federation map centers on configured coordinates and follows theme filters', async () => {
const env = createDomEnvironment({ includeBody: true, bodyHasDarkClass: true });
const { document, window, createElement, registerElement, cleanup } = env;
@@ -604,51 +653,9 @@ test('federation legend toggle respects media query changes', async () => {
test('federation page tolerates fetch failures', async () => {
const env = createDomEnvironment({ includeBody: true, bodyHasDarkClass: false });
const { document, createElement, registerElement, cleanup } = env;
const mapEl = createElement('div', 'map');
registerElement('map', mapEl);
const statusEl = createElement('div', 'status');
registerElement('status', statusEl);
const tableEl = createElement('table', 'instances');
const tbodyEl = createElement('tbody');
registerElement('instances', tableEl);
const configEl = createElement('div');
configEl.setAttribute('data-app-config', JSON.stringify({}));
document.querySelector = selector => {
if (selector === '[data-app-config]') return configEl;
if (selector === '#instances tbody') return tbodyEl;
return null;
};
const leafletStub = {
map() {
return {
setView() {},
on() {},
getPane() {
return null;
}
};
},
tileLayer() {
return {
addTo() {
return this;
},
getContainer() {
return null;
},
on() {}
};
},
layerGroup() {
return { addLayer() {}, addTo() { return this; } };
},
circleMarker() {
return { bindPopup() { return this; } };
}
};
const { cleanup } = env;
createFailureScenarioPage(env);
const leafletStub = createMinimalLeafletStub();
const fetchImpl = async () => {
throw new Error('boom');
@@ -657,3 +664,16 @@ test('federation page tolerates fetch failures', async () => {
await initializeFederationPage({ config: {}, fetchImpl, leaflet: leafletStub });
cleanup();
});
test('federation page tolerates non-ok paginated instance responses', async () => {
const env = createDomEnvironment({ includeBody: true, bodyHasDarkClass: false });
const { statusEl } = createFailureScenarioPage(env);
const { cleanup } = env;
const leafletStub = createMinimalLeafletStub();
const fetchImpl = async () => ({ ok: false, json: async () => [] });
await initializeFederationPage({ config: {}, fetchImpl, leaflet: leafletStub });
assert.match(statusEl.textContent, /0 instances/);
cleanup();
});
@@ -221,6 +221,74 @@ test('initializeInstanceSelector updates federation navigation labels with insta
}
});
test('initializeInstanceSelector follows paginated instance responses', async () => {
const env = createDomEnvironment();
const select = setupSelectElement(env.document);
const calls = [];
const fetchImpl = async url => {
calls.push(url);
if (url === '/api/instances?limit=500') {
return {
ok: true,
headers: { get: name => (name === 'X-Next-Cursor' ? 'cursor-1' : null) },
async json() {
return [{ domain: 'alpha.mesh' }];
}
};
}
if (url === '/api/instances?limit=500&cursor=cursor-1') {
return {
ok: true,
headers: { get: () => null },
async json() {
return [{ domain: 'bravo.mesh' }];
}
};
}
throw new Error(`unexpected url ${url}`);
};
try {
await initializeInstanceSelector({
selectElement: select,
fetchImpl,
windowObject: env.window,
documentObject: env.document
});
assert.deepEqual(calls, ['/api/instances?limit=500', '/api/instances?limit=500&cursor=cursor-1']);
assert.equal(select.options.length, 3);
} finally {
env.cleanup();
}
});
test('initializeInstanceSelector handles non-ok instance responses without adding options', async () => {
const env = createDomEnvironment();
const select = setupSelectElement(env.document);
const fetchImpl = async () => ({
ok: false,
async json() {
return [{ domain: 'ignored.mesh' }];
}
});
try {
await initializeInstanceSelector({
selectElement: select,
fetchImpl,
windowObject: env.window,
documentObject: env.document
});
assert.equal(select.options.length, 1);
} finally {
env.cleanup();
}
});
test('updateFederationNavCount prefers stored labels and normalizes counts', () => {
const env = createDomEnvironment();
const navLink = env.document.createElement('a');
@@ -0,0 +1,399 @@
/*
* Copyright © 2025-26 l5yth & contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import test from 'node:test';
import assert from 'node:assert/strict';
import {
computeLocalActiveNodeStats,
fetchPaginatedCollection,
fetchActiveNodeStats,
formatActiveNodeStatsText,
normaliseActiveNodeStatsPayload,
readNextCursorHeader,
} from '../main.js';
const NOW = 1_700_000_000;
test('computeLocalActiveNodeStats calculates local hour/day/week/month counts', () => {
const nodes = [
{ last_heard: NOW - 60 },
{ last_heard: NOW - 4_000 },
{ last_heard: NOW - 90_000 },
{ last_heard: NOW - (8 * 86_400) },
{ last_heard: NOW - (20 * 86_400) },
];
const stats = computeLocalActiveNodeStats(nodes, NOW);
assert.deepEqual(stats, {
hour: 1,
day: 2,
week: 3,
month: 5,
sampled: true,
});
});
test('normaliseActiveNodeStatsPayload validates and normalizes API payload', () => {
const payload = {
active_nodes: {
hour: '11',
day: 22,
week: 33,
month: 44,
},
sampled: false,
};
assert.deepEqual(normaliseActiveNodeStatsPayload(payload), {
hour: 11,
day: 22,
week: 33,
month: 44,
sampled: false,
});
assert.equal(normaliseActiveNodeStatsPayload({}), null);
});
test('normaliseActiveNodeStatsPayload rejects malformed stat values', () => {
assert.equal(
normaliseActiveNodeStatsPayload({ active_nodes: { hour: 'x', day: 1, week: 1, month: 1 } }),
null
);
assert.equal(
normaliseActiveNodeStatsPayload({ active_nodes: null }),
null
);
});
test('normaliseActiveNodeStatsPayload clamps negatives and truncates floats', () => {
assert.deepEqual(
normaliseActiveNodeStatsPayload({
active_nodes: { hour: -1.9, day: 2.8, week: 3.1, month: 4.9 },
sampled: 1
}),
{ hour: 0, day: 2, week: 3, month: 4, sampled: true }
);
});
test('fetchActiveNodeStats uses /api/stats when available', async () => {
const calls = [];
const fetchImpl = async (url) => {
calls.push(url);
return {
ok: true,
async json() {
return {
active_nodes: { hour: 5, day: 15, week: 25, month: 35 },
sampled: false,
};
},
};
};
const stats = await fetchActiveNodeStats({ nodes: [], nowSeconds: NOW, fetchImpl });
assert.equal(calls[0], '/api/stats');
assert.deepEqual(stats, {
hour: 5,
day: 15,
week: 25,
month: 35,
sampled: false,
});
});
test('fetchActiveNodeStats reuses cached /api/stats response for repeated calls', async () => {
const calls = [];
const fetchImpl = async (url) => {
calls.push(url);
return {
ok: true,
async json() {
return {
active_nodes: { hour: 2, day: 4, week: 6, month: 8 },
sampled: false,
};
},
};
};
const first = await fetchActiveNodeStats({ nodes: [], nowSeconds: NOW, fetchImpl });
const second = await fetchActiveNodeStats({ nodes: [], nowSeconds: NOW, fetchImpl });
assert.equal(calls.length, 1);
assert.deepEqual(first, second);
});
test('fetchActiveNodeStats does not reuse cache across different fetch implementations', async () => {
const callsA = [];
const callsB = [];
const fetchImplA = async (url) => {
callsA.push(url);
return {
ok: true,
async json() {
return { active_nodes: { hour: 1, day: 1, week: 1, month: 1 }, sampled: false };
},
};
};
const fetchImplB = async (url) => {
callsB.push(url);
return {
ok: true,
async json() {
return { active_nodes: { hour: 2, day: 2, week: 2, month: 2 }, sampled: false };
},
};
};
await fetchActiveNodeStats({ nodes: [], nowSeconds: NOW, fetchImpl: fetchImplA });
await fetchActiveNodeStats({ nodes: [], nowSeconds: NOW, fetchImpl: fetchImplB });
assert.equal(callsA.length, 1);
assert.equal(callsB.length, 1);
});
test('fetchActiveNodeStats falls back to local counts when stats fetch fails', async () => {
const nodes = [
{ last_heard: NOW - 120 },
{ last_heard: NOW - (10 * 86_400) },
];
const fetchImpl = async () => {
throw new Error('network down');
};
const stats = await fetchActiveNodeStats({ nodes, nowSeconds: NOW, fetchImpl });
assert.deepEqual(stats, {
hour: 1,
day: 1,
week: 1,
month: 2,
sampled: true,
});
});
test('fetchActiveNodeStats falls back to local counts on non-OK HTTP responses', async () => {
const stats = await fetchActiveNodeStats({
nodes: [{ last_heard: NOW - 10 }],
nowSeconds: NOW,
fetchImpl: async () => ({ ok: false, status: 503 })
});
assert.equal(stats.sampled, true);
assert.equal(stats.hour, 1);
});
test('fetchActiveNodeStats falls back to local counts on invalid payloads', async () => {
const stats = await fetchActiveNodeStats({
nodes: [{ last_heard: NOW - (31 * 86_400) }],
nowSeconds: NOW,
fetchImpl: async () => ({
ok: true,
async json() {
return { active_nodes: { hour: 'bad' } };
}
})
});
assert.equal(stats.sampled, true);
assert.equal(stats.month, 0);
});
test('formatActiveNodeStatsText emits expected dashboard string', () => {
const text = formatActiveNodeStatsText({
channel: 'LongFast',
frequency: '868MHz',
stats: { hour: 1, day: 2, week: 3, month: 4, sampled: false },
});
assert.equal(
text,
'LongFast (868MHz) — active nodes: 1/hour, 2/day, 3/week, 4/month.'
);
});
test('formatActiveNodeStatsText appends sampled marker when local fallback is used', () => {
const text = formatActiveNodeStatsText({
channel: 'LongFast',
frequency: '868MHz',
stats: { hour: 9, day: 8, week: 7, month: 6, sampled: true },
});
assert.equal(
text,
'LongFast (868MHz) — active nodes: 9/hour, 8/day, 7/week, 6/month (sampled).'
);
});
test('readNextCursorHeader reads cursor token from response headers', () => {
const response = {
headers: {
get(name) {
return name === 'X-Next-Cursor' ? 'cursor-token' : null;
},
},
};
assert.equal(readNextCursorHeader(response), 'cursor-token');
});
test('readNextCursorHeader returns null when response headers are missing', () => {
assert.equal(readNextCursorHeader(null), null);
assert.equal(readNextCursorHeader({ headers: {} }), null);
});
test('fetchPaginatedCollection follows cursor headers and merges pages', async () => {
const calls = [];
const pages = new Map([
['/api/nodes?limit=2', { items: [{ id: 'a' }, { id: 'b' }], next: 'cursor-1' }],
['/api/nodes?limit=2&cursor=cursor-1', { items: [{ id: 'c' }], next: null }],
]);
const fetchImpl = async (url) => {
calls.push(url);
const page = pages.get(url);
if (!page) {
throw new Error(`unexpected url ${url}`);
}
return {
ok: true,
headers: { get: (name) => (name === 'X-Next-Cursor' ? page.next : null) },
async json() {
return page.items;
},
};
};
const items = await fetchPaginatedCollection({
path: '/api/nodes',
limit: 2,
maxRows: 10,
fetchImpl,
});
assert.deepEqual(calls, ['/api/nodes?limit=2', '/api/nodes?limit=2&cursor=cursor-1']);
assert.deepEqual(items.map((item) => item.id), ['a', 'b', 'c']);
});
test('fetchPaginatedCollection returns empty list when path is missing', async () => {
const items = await fetchPaginatedCollection({
path: '',
limit: 10,
fetchImpl: async () => ({ ok: true, headers: { get: () => null }, json: async () => [] }),
});
assert.deepEqual(items, []);
});
test('fetchPaginatedCollection enforces maxRows and propagates params', async () => {
const calls = [];
const fetchImpl = async (url) => {
calls.push(url);
return {
ok: true,
headers: { get: () => (url.includes('cursor=') ? null : 'next-1') },
async json() {
return [{ id: 1 }, { id: 2 }, { id: 3 }];
},
};
};
const items = await fetchPaginatedCollection({
path: '/api/messages',
limit: 3,
maxRows: 4,
params: { since: '123', encrypted: 'true' },
fetchImpl,
});
assert.equal(calls[0], '/api/messages?limit=3&since=123&encrypted=true');
assert.equal(items.length, 4);
});
test('fetchPaginatedCollection throws on non-ok responses', async () => {
await assert.rejects(
fetchPaginatedCollection({
path: '/api/messages',
limit: 2,
fetchImpl: async () => ({ ok: false, status: 503, json: async () => [] }),
}),
/HTTP 503/
);
});
test('fetchPaginatedCollection throws on invalid payload shapes', async () => {
await assert.rejects(
fetchPaginatedCollection({
path: '/api/messages',
limit: 2,
fetchImpl: async () => ({ ok: true, headers: { get: () => null }, json: async () => ({}) }),
}),
/invalid paginated payload/
);
});
test('fetchPaginatedCollection ignores blank params and defaults invalid limits', async () => {
const calls = [];
const items = await fetchPaginatedCollection({
path: '/api/messages',
limit: 0,
maxRows: 0,
params: { since: ' ', encrypted: null, scope: 'recent' },
fetchImpl: async (url) => {
calls.push(url);
return {
ok: true,
headers: { get: () => null },
async json() {
return [{ id: 1 }];
},
};
},
});
assert.deepEqual(items, [{ id: 1 }]);
assert.equal(calls[0], '/api/messages?limit=200&scope=recent');
});
test('fetchPaginatedCollection stops when a page is empty even if cursor was present', async () => {
const calls = [];
const fetchImpl = async (url) => {
calls.push(url);
if (url === '/api/nodes?limit=2') {
return {
ok: true,
headers: { get: (name) => (name === 'X-Next-Cursor' ? 'cursor-1' : null) },
async json() {
return [{ id: 'a' }];
},
};
}
return {
ok: true,
headers: { get: () => null },
async json() {
return [];
},
};
};
const items = await fetchPaginatedCollection({
path: '/api/nodes',
limit: 2,
fetchImpl,
});
assert.deepEqual(items, [{ id: 'a' }]);
assert.deepEqual(calls, ['/api/nodes?limit=2', '/api/nodes?limit=2&cursor=cursor-1']);
});
+11 -3
View File
@@ -556,21 +556,29 @@ function buildPrimaryBucketKey(primaryChannelLabel) {
function buildSecondaryNameBucketKey(index, labelInfo) {
const label = labelInfo?.label ?? null;
const priority = labelInfo?.priority ?? CHANNEL_LABEL_PRIORITY.INDEX;
const safeIndex = Number.isFinite(index) ? Math.max(0, Math.trunc(index)) : 0;
if (safeIndex <= 0 || priority !== CHANNEL_LABEL_PRIORITY.NAME || !label) {
if (!Number.isFinite(index) || index <= 0 || priority !== CHANNEL_LABEL_PRIORITY.NAME || !label) {
return null;
}
const trimmedLabel = label.trim().toLowerCase();
if (!trimmedLabel.length) {
return null;
}
return `secondary::${safeIndex}::${trimmedLabel}`;
return `secondary-name::${trimmedLabel}`;
}
function buildChannelTabId(bucketKey) {
if (bucketKey === '0') {
return 'channel-0';
}
const secondaryNameParts = /^secondary-name::(.+)$/.exec(String(bucketKey));
if (secondaryNameParts) {
const secondaryLabelSlug = slugify(secondaryNameParts[1]);
const secondaryHash = hashChannelKey(bucketKey);
if (secondaryLabelSlug) {
return `channel-secondary-name-${secondaryLabelSlug}-${secondaryHash}`;
}
return `channel-secondary-name-${secondaryHash}`;
}
const secondaryParts = /^secondary::(\d+)::(.+)$/.exec(String(bucketKey));
if (secondaryParts) {
const secondaryIndex = secondaryParts[1];
+54 -7
View File
@@ -80,6 +80,59 @@ function buildInstanceUrl(domain) {
return `https://${trimmed}`;
}
/**
* Read the next-page cursor token from response headers.
*
* @param {*} response Fetch response candidate.
* @returns {string|null} Cursor token when present.
*/
function readNextCursorHeader(response) {
const headers = response && response.headers;
if (!headers || typeof headers.get !== 'function') return null;
const cursor = headers.get('X-Next-Cursor');
return cursor && String(cursor).trim() ? String(cursor).trim() : null;
}
/**
* Fetch all federation instances using keyset cursor pagination.
*
* @param {Function} fetchImpl Fetch-compatible function.
* @returns {Promise<Array<Object>>} Combined instance rows.
*/
async function fetchAllInstances(fetchImpl) {
const results = [];
let cursor = null;
let pageCount = 0;
const limit = 500;
while (pageCount < 100) {
const query = new URLSearchParams({ limit: String(limit) });
if (cursor) {
query.set('cursor', cursor);
}
const response = await fetchImpl(`/api/instances?${query.toString()}`, {
headers: { Accept: 'application/json' },
credentials: 'omit'
});
if (!response || !response.ok || typeof response.json !== 'function') {
return results;
}
const payload = await response.json();
if (!Array.isArray(payload) || payload.length === 0) {
return results;
}
results.push(...payload);
cursor = readNextCursorHeader(response);
pageCount += 1;
if (!cursor) {
return results;
}
}
return results;
}
const NODE_COUNT_COLOR_STOPS = [
{ limit: 100, color: roleColors.CLIENT_HIDDEN },
{ limit: 200, color: roleColors.SENSOR },
@@ -524,13 +577,7 @@ export async function initializeFederationPage(options = {}) {
// Fetch instances data
let instances = [];
try {
const response = await fetchImpl('/api/instances', {
headers: { Accept: 'application/json' },
credentials: 'omit'
});
if (response.ok) {
instances = await response.json();
}
instances = await fetchAllInstances(fetchImpl);
} catch (err) {
console.warn('Failed to fetch federation instances', err);
}
+60 -21
View File
@@ -78,6 +78,64 @@ function updateFederationNavCount(options) {
});
}
/**
* Read the next-page cursor header from an HTTP response.
*
* @param {*} response Fetch response candidate.
* @returns {string|null} Cursor token when available.
*/
function readNextCursorHeader(response) {
const headers = response && response.headers;
if (!headers || typeof headers.get !== 'function') {
return null;
}
const cursor = headers.get('X-Next-Cursor');
return cursor && String(cursor).trim().length > 0 ? String(cursor).trim() : null;
}
/**
* Load federation instances across paginated API responses.
*
* @param {Function} fetchImpl Fetch-compatible function.
* @returns {Promise<Array<Object>>} Combined instance payload rows.
*/
async function fetchAllInstances(fetchImpl) {
const results = [];
let cursor = null;
let pageCount = 0;
const limit = 500;
while (pageCount < 100) {
const query = new URLSearchParams({ limit: String(limit) });
if (cursor) {
query.set('cursor', cursor);
}
const response = await fetchImpl(`/api/instances?${query.toString()}`, {
headers: { Accept: 'application/json' },
credentials: 'omit',
});
if (!response || typeof response.json !== 'function' || !response.ok) {
return results;
}
const payload = await response.json();
if (!Array.isArray(payload) || payload.length === 0) {
return results;
}
results.push(...payload);
cursor = readNextCursorHeader(response);
pageCount += 1;
if (!cursor) {
return results;
}
}
return results;
}
/**
* Construct a navigable URL for the provided instance domain.
*
@@ -179,30 +237,11 @@ export async function initializeInstanceSelector(options) {
return;
}
let response;
try {
response = await fetchImpl('/api/instances', {
headers: { Accept: 'application/json' },
credentials: 'omit',
});
} catch (error) {
console.warn('Failed to load federation instances', error);
return;
}
if (!response || typeof response.json !== 'function') {
return;
}
if (!response.ok) {
return;
}
let payload;
try {
payload = await response.json();
payload = await fetchAllInstances(fetchImpl);
} catch (error) {
console.warn('Invalid federation instances payload', error);
console.warn('Failed to load federation instances', error);
return;
}
+279 -31
View File
@@ -69,6 +69,221 @@ import {
roleRenderOrder,
} from './role-helpers.js';
/**
* Compute active-node counts from a local node array.
*
* @param {Array<Object>} nodes Node payloads.
* @param {number} nowSeconds Reference timestamp.
* @returns {{hour: number, day: number, week: number, month: number, sampled: boolean}} Local count snapshot.
*/
export function computeLocalActiveNodeStats(nodes, nowSeconds) {
const safeNodes = Array.isArray(nodes) ? nodes : [];
const referenceNow = Number.isFinite(nowSeconds) ? nowSeconds : Date.now() / 1000;
const windows = [
{ key: 'hour', secs: 3600 },
{ key: 'day', secs: 86_400 },
{ key: 'week', secs: 7 * 86_400 },
{ key: 'month', secs: 30 * 86_400 }
];
const counts = { sampled: true };
for (const window of windows) {
counts[window.key] = safeNodes.filter(node => {
const lastHeard = Number(node?.last_heard);
return Number.isFinite(lastHeard) && referenceNow - lastHeard <= window.secs;
}).length;
}
return counts;
}
/**
* Parse and validate the `/api/stats` payload.
*
* @param {*} payload Candidate JSON object from the stats endpoint.
* @returns {{hour: number, day: number, week: number, month: number, sampled: boolean}|null} Normalized stats or null.
*/
export function normaliseActiveNodeStatsPayload(payload) {
const activeNodes = payload && typeof payload === 'object' ? payload.active_nodes : null;
if (!activeNodes || typeof activeNodes !== 'object') {
return null;
}
const hour = Number(activeNodes.hour);
const day = Number(activeNodes.day);
const week = Number(activeNodes.week);
const month = Number(activeNodes.month);
if (![hour, day, week, month].every(Number.isFinite)) {
return null;
}
return {
hour: Math.max(0, Math.trunc(hour)),
day: Math.max(0, Math.trunc(day)),
week: Math.max(0, Math.trunc(week)),
month: Math.max(0, Math.trunc(month)),
sampled: Boolean(payload.sampled)
};
}
const ACTIVE_NODE_STATS_CACHE_TTL_MS = 30_000;
let activeNodeStatsCache = null;
let activeNodeStatsFetchPromise = null;
let activeNodeStatsFetchImpl = null;
/**
* Fetch active-node stats from the dedicated API endpoint with short-lived caching.
*
* @param {Function} fetchImpl Fetch implementation.
* @returns {Promise<{hour: number, day: number, week: number, month: number, sampled: boolean} | null>} Normalized stats or null.
*/
async function fetchRemoteActiveNodeStats(fetchImpl) {
const nowMs = Date.now();
if (activeNodeStatsCache?.fetchImpl === fetchImpl && activeNodeStatsCache.expiresAt > nowMs) {
return activeNodeStatsCache.stats;
}
if (activeNodeStatsFetchPromise && activeNodeStatsFetchImpl === fetchImpl) {
return activeNodeStatsFetchPromise;
}
activeNodeStatsFetchImpl = fetchImpl;
activeNodeStatsFetchPromise = (async () => {
const response = await fetchImpl('/api/stats', { cache: 'no-store' });
if (!response?.ok) {
throw new Error(`stats HTTP ${response?.status ?? 'unknown'}`);
}
const payload = await response.json();
const normalized = normaliseActiveNodeStatsPayload(payload);
if (!normalized) {
throw new Error('invalid stats payload');
}
activeNodeStatsCache = {
fetchImpl,
expiresAt: Date.now() + ACTIVE_NODE_STATS_CACHE_TTL_MS,
stats: normalized
};
return normalized;
})();
try {
return await activeNodeStatsFetchPromise;
} finally {
activeNodeStatsFetchPromise = null;
activeNodeStatsFetchImpl = null;
}
}
/**
* Fetch active-node stats from the dedicated API endpoint with local fallback.
*
* @param {{
* nodes: Array<Object>,
* nowSeconds: number,
* fetchImpl?: Function
* }} params Fetch parameters.
* @returns {Promise<{hour: number, day: number, week: number, month: number, sampled: boolean}>} Stats snapshot.
*/
export async function fetchActiveNodeStats({ nodes, nowSeconds, fetchImpl = fetch }) {
try {
const normalized = await fetchRemoteActiveNodeStats(fetchImpl);
if (normalized) return normalized;
throw new Error('invalid stats payload');
} catch (error) {
console.debug('Failed to fetch /api/stats; using local active-node counts.', error);
return computeLocalActiveNodeStats(nodes, nowSeconds);
}
}
/**
* Format the dashboard refresh-info sentence for active-node counts.
*
* @param {{channel: string, frequency: string, stats: {hour:number,day:number,week:number,month:number,sampled:boolean}}} params Formatting data.
* @returns {string} User-visible sentence for the dashboard header.
*/
export function formatActiveNodeStatsText({ channel, frequency, stats }) {
const parts = [
`${Number(stats?.hour) || 0}/hour`,
`${Number(stats?.day) || 0}/day`,
`${Number(stats?.week) || 0}/week`,
`${Number(stats?.month) || 0}/month`
];
const suffix = stats?.sampled ? ' (sampled)' : '';
return `${channel} (${frequency}) — active nodes: ${parts.join(', ')}${suffix}.`;
}
/**
* Parse the next-page cursor header from an API response.
*
* @param {*} response Fetch response candidate.
* @returns {string|null} Cursor token for the next page.
*/
export function readNextCursorHeader(response) {
const headers = response && response.headers;
if (!headers || typeof headers.get !== 'function') {
return null;
}
const cursor = headers.get('X-Next-Cursor');
return cursor && String(cursor).trim().length > 0 ? String(cursor).trim() : null;
}
/**
* Fetch an API collection endpoint using keyset cursor pagination.
*
* @param {{
* path: string,
* limit: number,
* maxRows?: number,
* params?: Record<string, string>,
* fetchImpl?: Function
* }} options Request options.
* @returns {Promise<Array<Object>>} Aggregated array of collection rows.
*/
export async function fetchPaginatedCollection({
path,
limit,
maxRows = 5000,
params = {},
fetchImpl = fetch
}) {
const safePath = typeof path === 'string' ? path : '';
if (!safePath) {
return [];
}
const safeLimit = Number.isFinite(limit) && limit > 0 ? Math.floor(limit) : 200;
const safeMaxRows = Number.isFinite(maxRows) && maxRows > 0 ? Math.floor(maxRows) : safeLimit;
const results = [];
let cursor = null;
let pageCount = 0;
while (results.length < safeMaxRows && pageCount < 100) {
const query = new URLSearchParams({ limit: String(safeLimit) });
Object.entries(params || {}).forEach(([key, value]) => {
if (value == null) return;
const text = String(value).trim();
if (!text) return;
query.set(key, text);
});
if (cursor) {
query.set('cursor', cursor);
}
const response = await fetchImpl(`${safePath}?${query.toString()}`, { cache: 'no-store' });
if (!response || !response.ok) {
throw new Error('HTTP ' + (response ? response.status : 'unknown'));
}
const payload = await response.json();
if (!Array.isArray(payload)) {
throw new Error('invalid paginated payload');
}
if (payload.length === 0) {
break;
}
results.push(...payload);
cursor = readNextCursorHeader(response);
pageCount += 1;
if (!cursor) {
break;
}
}
return results.slice(0, safeMaxRows);
}
/**
* Entry point for the interactive dashboard. Wires up event listeners,
* initializes the map, and triggers the first data refresh cycle.
@@ -198,6 +413,7 @@ export function initializeApp(config) {
const TRACE_MAX_AGE_SECONDS = 28 * 24 * 60 * 60;
const SNAPSHOT_LIMIT = SNAPSHOT_WINDOW;
const CHAT_LIMIT = MESSAGE_LIMIT;
const RECENT_COLLECTION_WINDOW_SECONDS = 7 * 24 * 60 * 60;
const CHAT_RECENT_WINDOW_SECONDS = 7 * 24 * 60 * 60;
const REFRESH_MS = config.refreshMs;
const CHAT_ENABLED = Boolean(config.chatEnabled);
@@ -222,6 +438,7 @@ export function initializeApp(config) {
/** @type {ReturnType<typeof setTimeout>|null} */
let refreshTimer = null;
let refreshInfoRequestId = 0;
/**
* Close any open short-info overlays that do not contain the provided anchor.
@@ -3466,9 +3683,14 @@ export function initializeApp(config) {
*/
async function fetchNodes(limit = NODE_LIMIT) {
const effectiveLimit = resolveSnapshotLimit(limit, NODE_LIMIT);
const r = await fetch(`/api/nodes?limit=${effectiveLimit}`, { cache: 'no-store' });
if (!r.ok) throw new Error('HTTP ' + r.status);
return r.json();
const maxRows = Math.max(effectiveLimit, effectiveLimit * SNAPSHOT_LIMIT);
const nowSec = Math.floor(Date.now() / 1000);
return fetchPaginatedCollection({
path: '/api/nodes',
limit: effectiveLimit,
maxRows,
params: { since: String(nowSec - RECENT_COLLECTION_WINDOW_SECONDS) }
});
}
/**
@@ -3497,14 +3719,19 @@ export function initializeApp(config) {
async function fetchMessages(limit = MESSAGE_LIMIT, options = {}) {
if (!CHAT_ENABLED) return [];
const safeLimit = normaliseMessageLimit(limit);
const params = new URLSearchParams({ limit: String(safeLimit) });
const nowSec = Math.floor(Date.now() / 1000);
const params = {};
if (options && options.encrypted) {
params.set('encrypted', 'true');
params.encrypted = 'true';
}
const query = params.toString();
const r = await fetch(`/api/messages?${query}`, { cache: 'no-store' });
if (!r.ok) throw new Error('HTTP ' + r.status);
return r.json();
params.since = String(nowSec - CHAT_RECENT_WINDOW_SECONDS);
const maxRows = Math.max(safeLimit, safeLimit * SNAPSHOT_LIMIT);
return fetchPaginatedCollection({
path: '/api/messages',
limit: safeLimit,
maxRows,
params
});
}
/**
@@ -3515,9 +3742,14 @@ export function initializeApp(config) {
*/
async function fetchNeighbors(limit = NODE_LIMIT) {
const effectiveLimit = resolveSnapshotLimit(limit, NODE_LIMIT);
const r = await fetch(`/api/neighbors?limit=${effectiveLimit}`, { cache: 'no-store' });
if (!r.ok) throw new Error('HTTP ' + r.status);
return r.json();
const maxRows = Math.max(effectiveLimit, effectiveLimit * SNAPSHOT_LIMIT);
const nowSec = Math.floor(Date.now() / 1000);
return fetchPaginatedCollection({
path: '/api/neighbors',
limit: effectiveLimit,
maxRows,
params: { since: String(nowSec - RECENT_COLLECTION_WINDOW_SECONDS) }
});
}
/**
@@ -3529,9 +3761,14 @@ export function initializeApp(config) {
async function fetchTraces(limit = TRACE_LIMIT) {
const safeLimit = Number.isFinite(limit) && limit > 0 ? Math.floor(limit) : TRACE_LIMIT;
const effectiveLimit = Math.min(safeLimit, NODE_LIMIT);
const r = await fetch(`/api/traces?limit=${effectiveLimit}`, { cache: 'no-store' });
if (!r.ok) throw new Error('HTTP ' + r.status);
const traces = await r.json();
const maxRows = Math.max(effectiveLimit, effectiveLimit * SNAPSHOT_LIMIT);
const nowSec = Math.floor(Date.now() / 1000);
const traces = await fetchPaginatedCollection({
path: '/api/traces',
limit: effectiveLimit,
maxRows,
params: { since: String(nowSec - TRACE_MAX_AGE_SECONDS) }
});
return filterRecentTraces(traces, TRACE_MAX_AGE_SECONDS);
}
@@ -3543,9 +3780,14 @@ export function initializeApp(config) {
*/
async function fetchTelemetry(limit = NODE_LIMIT) {
const effectiveLimit = resolveSnapshotLimit(limit, NODE_LIMIT);
const r = await fetch(`/api/telemetry?limit=${effectiveLimit}`, { cache: 'no-store' });
if (!r.ok) throw new Error('HTTP ' + r.status);
return r.json();
const maxRows = Math.max(effectiveLimit, effectiveLimit * SNAPSHOT_LIMIT);
const nowSec = Math.floor(Date.now() / 1000);
return fetchPaginatedCollection({
path: '/api/telemetry',
limit: effectiveLimit,
maxRows,
params: { since: String(nowSec - RECENT_COLLECTION_WINDOW_SECONDS) }
});
}
/**
@@ -3556,9 +3798,14 @@ export function initializeApp(config) {
*/
async function fetchPositions(limit = NODE_LIMIT) {
const effectiveLimit = resolveSnapshotLimit(limit, NODE_LIMIT);
const r = await fetch(`/api/positions?limit=${effectiveLimit}`, { cache: 'no-store' });
if (!r.ok) throw new Error('HTTP ' + r.status);
return r.json();
const maxRows = Math.max(effectiveLimit, effectiveLimit * SNAPSHOT_LIMIT);
const nowSec = Math.floor(Date.now() / 1000);
return fetchPaginatedCollection({
path: '/api/positions',
limit: effectiveLimit,
maxRows,
params: { since: String(nowSec - RECENT_COLLECTION_WINDOW_SECONDS) }
});
}
/**
@@ -4395,15 +4642,16 @@ export function initializeApp(config) {
if (!refreshInfo || !isDashboardView) {
return;
}
const windows = [
{ label: 'hour', secs: 3600 },
{ label: 'day', secs: 86400 },
{ label: 'week', secs: 7 * 86400 },
];
const counts = windows.map(w => {
const c = nodes.filter(n => n.last_heard && nowSec - Number(n.last_heard) <= w.secs).length;
return `${c}/${w.label}`;
}).join(', ');
refreshInfo.textContent = `${config.channel} (${config.frequency}) — active nodes: ${counts}.`;
const requestId = ++refreshInfoRequestId;
void fetchActiveNodeStats({ nodes, nowSeconds: nowSec }).then(stats => {
if (requestId !== refreshInfoRequestId) {
return;
}
refreshInfo.textContent = formatActiveNodeStatsText({
channel: config.channel,
frequency: config.frequency,
stats
});
});
}
}
+467 -5
View File
@@ -27,6 +27,7 @@ RSpec.describe "Potato Mesh Sinatra app" do
let(:app) { Sinatra::Application }
let(:application_class) { PotatoMesh::Application }
INSERT_NODE_WITH_LAST_HEARD_SQL = "INSERT INTO nodes(node_id, num, last_heard, first_heard) VALUES (?,?,?,?)".freeze
INSERT_NODE_WITH_METADATA_SQL = "INSERT INTO nodes(node_id, num, short_name, long_name, hw_model, role, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?)".freeze
SELECT_NODE_LAST_HEARD_SQL = "SELECT last_heard FROM nodes WHERE node_id = ?".freeze
describe "configuration" do
@@ -79,6 +80,29 @@ RSpec.describe "Potato Mesh Sinatra app" do
File.expand_path("../../tests/#{name}", __dir__)
end
# Assert that an API collection exposes keyset pagination with a non-empty
# cursor and a non-empty second page.
#
# @param path [String] collection endpoint path.
# @param limit [Integer] page size for the test request.
# @return [Array<Array<Hash>, Array<Hash>, String]] first page, second page, and cursor.
def expect_collection_cursor_pagination(path, limit: 2)
get "#{path}?limit=#{limit}"
expect(last_response).to be_ok
first_page = JSON.parse(last_response.body)
expect(first_page.length).to eq(limit)
cursor = last_response.headers["X-Next-Cursor"]
expect(cursor).to be_a(String)
expect(cursor).not_to be_empty
get "#{path}?limit=#{limit}&cursor=#{URI.encode_www_form_component(cursor)}"
expect(last_response).to be_ok
second_page = JSON.parse(last_response.body)
expect(second_page).not_to be_empty
[first_page, second_page, cursor]
end
# Execute the provided block with a configured SQLite connection.
#
# @param readonly [Boolean] whether to open the database in read-only mode.
@@ -2643,6 +2667,196 @@ RSpec.describe "Potato Mesh Sinatra app" do
end
end
it "supports cursor pagination for instance catalogs" do
clear_database
now = Time.now.to_i
with_db do |db|
upsert_instance_record(
db,
{
id: "instance-a",
domain: "alpha.mesh",
pubkey: remote_key.public_key.export,
name: "Alpha",
version: "1.0.0",
channel: "#LongFast",
frequency: "868MHz",
latitude: nil,
longitude: nil,
last_update_time: now,
is_private: false,
},
"sig-a",
)
upsert_instance_record(
db,
{
id: "instance-b",
domain: "bravo.mesh",
pubkey: remote_key.public_key.export,
name: "Bravo",
version: "1.0.0",
channel: "#LongFast",
frequency: "868MHz",
latitude: nil,
longitude: nil,
last_update_time: now,
is_private: false,
},
"sig-b",
)
upsert_instance_record(
db,
{
id: "instance-c",
domain: "charlie.mesh",
pubkey: remote_key.public_key.export,
name: "Charlie",
version: "1.0.0",
channel: "#LongFast",
frequency: "868MHz",
latitude: nil,
longitude: nil,
last_update_time: now,
is_private: false,
},
"sig-c",
)
end
get "/api/instances?limit=2"
expect(last_response).to be_ok
first_page = JSON.parse(last_response.body)
expect(first_page.length).to eq(2)
cursor = last_response.headers["X-Next-Cursor"]
expect(cursor).to be_a(String)
expect(cursor).not_to be_empty
get "/api/instances?limit=2&cursor=#{URI.encode_www_form_component(cursor)}"
expect(last_response).to be_ok
second_page = JSON.parse(last_response.body)
expect(second_page.length).to be >= 1
combined_domains = (first_page + second_page).map { |entry| entry["domain"] }
expect(combined_domains).to include("alpha.mesh", "bravo.mesh", "charlie.mesh")
end
it "continues paginating when malformed rows are skipped during normalization" do
clear_database
now = Time.now.to_i
with_db do |db|
insert_sql = <<~SQL
INSERT INTO instances (
id, domain, pubkey, name, version, channel, frequency,
latitude, longitude, last_update_time, is_private, signature
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
SQL
db.execute(
insert_sql,
["instance-a", "alpha.mesh", remote_key.public_key.export, "Alpha", "1.0.0", nil, nil, nil, nil, now, 0, "sig-a"],
)
db.execute(
insert_sql,
["instance-bad", "alpha bad mesh", remote_key.public_key.export, "Bad", "1.0.0", nil, nil, nil, nil, now, 0, "sig-bad"],
)
db.execute(
insert_sql,
["instance-b", "bravo.mesh", remote_key.public_key.export, "Bravo", "1.0.0", nil, nil, nil, nil, now, 0, "sig-b"],
)
db.execute(
insert_sql,
["instance-c", "charlie.mesh", remote_key.public_key.export, "Charlie", "1.0.0", nil, nil, nil, nil, now, 0, "sig-c"],
)
end
first_page = application_class.load_instances_for_api(limit: 2, with_pagination: true)
expect(first_page[:items].map { |entry| entry["id"] }).to eq(["instance-a", "instance-b"])
expect(first_page[:next_cursor]).to be_a(String)
expect(first_page[:next_cursor]).not_to be_empty
second_page = application_class.load_instances_for_api(
limit: 2,
cursor: first_page[:next_cursor],
with_pagination: true,
)
expect(second_page[:items].map { |entry| entry["id"] }).to include("instance-c")
end
it "continues pagination when the page-boundary marker row is malformed" do
clear_database
now = Time.now.to_i
with_db do |db|
insert_sql = <<~SQL
INSERT INTO instances (
id, domain, pubkey, name, version, channel, frequency,
latitude, longitude, last_update_time, is_private, signature
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
SQL
db.execute(insert_sql, ["instance-a", "alpha.mesh", remote_key.public_key.export, "Alpha", "1.0.0", nil, nil, nil, nil, now, 0, "sig-a"])
db.execute(insert_sql, ["instance-b", "bravo.mesh", remote_key.public_key.export, "Bravo", "1.0.0", nil, nil, nil, nil, now, 0, "sig-b"])
db.execute(insert_sql, ["instance-bad", "zzy invalid", remote_key.public_key.export, "Bad", "1.0.0", nil, nil, nil, nil, now, 0, "sig-bad"])
db.execute(insert_sql, ["instance-z", "zzz.mesh", remote_key.public_key.export, "Zulu", "1.0.0", nil, nil, nil, nil, now, 0, "sig-z"])
end
first_page = application_class.load_instances_for_api(limit: 2, with_pagination: true)
expect(first_page[:items].map { |entry| entry["id"] }).to eq(["instance-a", "instance-b"])
expect(first_page[:next_cursor]).to be_a(String)
expect(first_page[:next_cursor]).not_to be_empty
second_page = application_class.load_instances_for_api(
limit: 2,
cursor: first_page[:next_cursor],
with_pagination: true,
)
expect(second_page[:items].map { |entry| entry["id"] }).to include("instance-z")
end
it "continues scanning when a full fetched page contains only malformed rows" do
clear_database
now = Time.now.to_i
with_db do |db|
insert_sql = <<~SQL
INSERT INTO instances (
id, domain, pubkey, name, version, channel, frequency,
latitude, longitude, last_update_time, is_private, signature
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
SQL
db.execute(insert_sql, ["instance-a", "alpha.mesh", remote_key.public_key.export, "Alpha", "1.0.0", nil, nil, nil, nil, now, 0, "sig-a"])
db.execute(insert_sql, ["instance-bad-1", "bad domain 1", remote_key.public_key.export, "Bad 1", "1.0.0", nil, nil, nil, nil, now, 0, "sig-b1"])
db.execute(insert_sql, ["instance-bad-2", "bad domain 2", remote_key.public_key.export, "Bad 2", "1.0.0", nil, nil, nil, nil, now, 0, "sig-b2"])
db.execute(insert_sql, ["instance-z", "zzz.mesh", remote_key.public_key.export, "Zulu", "1.0.0", nil, nil, nil, nil, now, 0, "sig-z"])
end
first_page = application_class.load_instances_for_api(limit: 1, with_pagination: true)
expect(first_page[:items].map { |entry| entry["id"] }).to eq(["instance-a"])
expect(first_page[:next_cursor]).to be_a(String)
expect(first_page[:next_cursor]).not_to be_empty
seen_ids = first_page[:items].map { |entry| entry["id"] }
cursor = first_page[:next_cursor]
5.times do
break unless cursor
page = application_class.load_instances_for_api(
limit: 1,
cursor: cursor,
with_pagination: true,
)
seen_ids.concat(page[:items].map { |entry| entry["id"] })
cursor = page[:next_cursor]
break if seen_ids.include?("instance-z")
end
expect(seen_ids).to include("instance-z")
end
context "when federation is disabled" do
around do |example|
original = ENV["FEDERATION"]
@@ -5572,7 +5786,7 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect(payload["node_id"]).to eq("!fresh-node")
end
it "filters node results using the since parameter for collections and single lookups" do
it "filters node collection results using since and before parameters" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
@@ -5593,8 +5807,8 @@ RSpec.describe "Potato Mesh Sinatra app" do
get "/api/nodes?since=#{recent_last_heard}"
expect(last_response).to be_ok
payload = JSON.parse(last_response.body)
expect(payload.map { |row| row["node_id"] }).to eq(["!recent-node"])
since_payload = JSON.parse(last_response.body)
expect(since_payload.map { |row| row["node_id"] }).to eq(["!recent-node"])
get "/api/nodes/!older-node?since=#{recent_last_heard}"
expect(last_response.status).to eq(404)
@@ -5603,6 +5817,20 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect(last_response).to be_ok
detail = JSON.parse(last_response.body)
expect(detail["node_id"]).to eq("!recent-node")
get "/api/nodes?before=#{older_last_heard}"
expect(last_response).to be_ok
before_payload = JSON.parse(last_response.body)
expect(before_payload.map { |row| row["node_id"] }).to eq(["!older-node"])
# Node detail route currently supports `since`, but not `before`.
get "/api/nodes/!older-node"
expect(last_response).to be_ok
expect(JSON.parse(last_response.body)["node_id"]).to eq("!older-node")
get "/api/nodes/!recent-node"
expect(last_response).to be_ok
expect(JSON.parse(last_response.body)["node_id"]).to eq("!recent-node")
end
it "omits blank values from node responses" do
@@ -5662,6 +5890,118 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect(payload).not_to have_key("short_name")
expect(payload).not_to have_key("hw_model")
end
it "emits keyset cursor headers for multi-page collection responses" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
with_db do |db|
db.execute(
"INSERT INTO nodes(node_id, num, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?,?)",
["!cursor-a", 1, "a", "A", "TBEAM", "CLIENT", 0.0, now - 1, now - 1],
)
db.execute(
"INSERT INTO nodes(node_id, num, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?,?)",
["!cursor-b", 2, "b", "B", "TBEAM", "CLIENT", 0.0, now - 2, now - 2],
)
db.execute(
"INSERT INTO nodes(node_id, num, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?,?)",
["!cursor-c", 3, "c", "C", "TBEAM", "CLIENT", 0.0, now - 3, now - 3],
)
end
get "/api/nodes?limit=2"
expect(last_response).to be_ok
first_page = JSON.parse(last_response.body)
expect(first_page.length).to eq(2)
cursor = last_response.headers["X-Next-Cursor"]
expect(cursor).to be_a(String)
expect(cursor).not_to be_empty
get "/api/nodes?limit=2&cursor=#{URI.encode_www_form_component(cursor)}"
expect(last_response).to be_ok
second_page = JSON.parse(last_response.body)
expect(second_page.length).to eq(1)
expect((first_page + second_page).map { |row| row["node_id"] }).to eq(
["!cursor-a", "!cursor-b", "!cursor-c"],
)
end
it "keeps paginating future-dated nodes without skipping later pages" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
with_db do |db|
db.execute(
"INSERT INTO nodes(node_id, num, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?,?)",
["!future-a", 1, "a", "A", "TBEAM", "CLIENT", 0.0, now + 300, now - 1],
)
db.execute(
"INSERT INTO nodes(node_id, num, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?,?)",
["!future-b", 2, "b", "B", "TBEAM", "CLIENT", 0.0, now + 200, now - 1],
)
db.execute(
"INSERT INTO nodes(node_id, num, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?,?)",
["!future-c", 3, "c", "C", "TBEAM", "CLIENT", 0.0, now + 100, now - 1],
)
end
get "/api/nodes?limit=2"
expect(last_response).to be_ok
first_page = JSON.parse(last_response.body)
expect(first_page.map { |row| row["node_id"] }).to eq(["!future-a", "!future-b"])
cursor = last_response.headers["X-Next-Cursor"]
expect(cursor).to be_a(String)
expect(cursor).not_to be_empty
get "/api/nodes?limit=2&cursor=#{URI.encode_www_form_component(cursor)}"
expect(last_response).to be_ok
second_page = JSON.parse(last_response.body)
expect(second_page.map { |row| row["node_id"] }).to eq(["!future-c"])
end
end
describe "GET /api/stats" do
it "returns exact SQL-backed activity counts without list-endpoint sampling" do
clear_database
now = reference_time.to_i
allow(Time).to receive(:now).and_return(reference_time)
with_db do |db|
db.transaction
1005.times do |index|
heard = now - (index % 1800)
node_id = format("!%08x", index + 1)
db.execute(
INSERT_NODE_WITH_METADATA_SQL,
[node_id, index + 1, "n#{index}", "Node #{index}", "TBEAM", "CLIENT", heard, heard],
)
end
db.execute(
INSERT_NODE_WITH_METADATA_SQL,
["!week0001", 200_001, "week", "Week Node", "TBEAM", "CLIENT", now - (2 * 86_400), now - (2 * 86_400)],
)
db.execute(
INSERT_NODE_WITH_METADATA_SQL,
["!month001", 200_002, "month", "Month Node", "TBEAM", "CLIENT", now - (20 * 86_400), now - (20 * 86_400)],
)
db.commit
end
get "/api/stats"
expect(last_response).to be_ok
payload = JSON.parse(last_response.body)
expect(payload["sampled"]).to eq(false)
expect(payload["active_nodes"]).to include(
"hour" => 1005,
"day" => 1005,
"week" => 1006,
"month" => 1007,
)
end
end
describe "GET /api/messages" do
@@ -5797,7 +6137,7 @@ RSpec.describe "Potato Mesh Sinatra app" do
end
end
it "filters messages by the since parameter while defaulting to the full history" do
it "filters messages by since and before parameters while defaulting to full history" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
@@ -5837,6 +6177,16 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect(last_response).to be_ok
scoped = JSON.parse(last_response.body)
expect(scoped.map { |row| row["id"] }).to eq([2])
get "/api/messages?before=#{stale_rx}"
expect(last_response).to be_ok
before_payload = JSON.parse(last_response.body)
expect(before_payload.map { |row| row["id"] }).to eq([1])
get "/api/messages/!old?before=#{stale_rx}"
expect(last_response).to be_ok
scoped_payload = JSON.parse(last_response.body)
expect(scoped_payload.map { |row| row["id"] }).to eq([1])
end
end
@@ -5966,7 +6316,7 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect(filtered.map { |row| row["id"] }).to eq([2, 1])
end
it "filters positions using the since parameter for both global and node queries" do
it "filters positions using since and before parameters for global and node queries" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
@@ -5995,6 +6345,57 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect(last_response).to be_ok
filtered = JSON.parse(last_response.body)
expect(filtered.map { |row| row["id"] }).to eq([11])
get "/api/positions?before=#{older_rx}"
expect(last_response).to be_ok
before_payload = JSON.parse(last_response.body)
expect(before_payload.map { |row| row["id"] }).to eq([10])
get "/api/positions/!pos-since?before=#{older_rx}"
expect(last_response).to be_ok
before_filtered = JSON.parse(last_response.body)
expect(before_filtered.map { |row| row["id"] }).to eq([10])
end
it "encodes position pagination cursors with cursor_time and advances pages" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
with_db do |db|
db.execute(
"INSERT INTO positions(id, node_id, node_num, rx_time, rx_iso, position_time, latitude, longitude) VALUES(?,?,?,?,?,?,?,?)",
[31, "!pos-cursor", 201, now - 10, Time.at(now - 10).utc.iso8601, now - 10, 52.0, 13.0],
)
db.execute(
"INSERT INTO positions(id, node_id, node_num, rx_time, rx_iso, position_time, latitude, longitude) VALUES(?,?,?,?,?,?,?,?)",
[32, "!pos-cursor", 201, now - 20, Time.at(now - 20).utc.iso8601, now - 20, 53.0, 14.0],
)
db.execute(
"INSERT INTO positions(id, node_id, node_num, rx_time, rx_iso, position_time, latitude, longitude) VALUES(?,?,?,?,?,?,?,?)",
[33, "!pos-cursor", 201, now - 30, Time.at(now - 30).utc.iso8601, now - 30, 54.0, 15.0],
)
end
get "/api/positions?limit=2"
expect(last_response).to be_ok
first_page = JSON.parse(last_response.body)
expect(first_page.map { |row| row["id"] }).to eq([31, 32])
cursor = last_response.headers["X-Next-Cursor"]
expect(cursor).to be_a(String)
expect(cursor).not_to be_empty
cursor_payload = JSON.parse(Base64.urlsafe_decode64(cursor))
expect(cursor_payload).to have_key("cursor_time")
expect(cursor_payload).not_to have_key("rx_time")
get "/api/positions?limit=2&cursor=#{URI.encode_www_form_component(cursor)}"
expect(last_response).to be_ok
second_page = JSON.parse(last_response.body)
expect(second_page.map { |row| row["id"] }).to eq([33])
end
it "omits blank values from position responses" do
@@ -6175,6 +6576,32 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect(filtered.length).to eq(1)
expect(filtered.first).not_to have_key("snr")
end
it "emits pagination cursor headers for neighbor collections" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
with_db do |db|
db.execute(
"INSERT INTO nodes(node_id, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?)",
["!origin-cursor", "orig", "Origin", "TBEAM", "CLIENT", 0.0, now, now],
)
%w[!n-a !n-b !n-c].each do |neighbor_id|
db.execute(
"INSERT INTO nodes(node_id, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?)",
[neighbor_id, "n", "Neighbor", "TBEAM", "CLIENT", 0.0, now, now],
)
end
db.execute("INSERT INTO neighbors(node_id, neighbor_id, snr, rx_time) VALUES(?,?,?,?)", ["!origin-cursor", "!n-a", 1.0, now - 10])
db.execute("INSERT INTO neighbors(node_id, neighbor_id, snr, rx_time) VALUES(?,?,?,?)", ["!origin-cursor", "!n-b", 2.0, now - 20])
db.execute("INSERT INTO neighbors(node_id, neighbor_id, snr, rx_time) VALUES(?,?,?,?)", ["!origin-cursor", "!n-c", 3.0, now - 30])
end
first_page, second_page, = expect_collection_cursor_pagination("/api/neighbors", limit: 2)
expect(first_page.length).to eq(2)
expect(second_page.length).to eq(1)
end
end
describe "GET /api/telemetry" do
@@ -6235,6 +6662,22 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect_same_value(second_entry["soil_temperature"], telemetry_metric(second_latest, "soil_temperature"))
end
it "emits pagination cursor headers for telemetry collections" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
with_db do |db|
db.execute("INSERT INTO telemetry(id, node_id, rx_time, rx_iso, telemetry_time, battery_level) VALUES(?,?,?,?,?,?)", [70_001, "!tele-a", now - 10, Time.at(now - 10).utc.iso8601, now - 10, 50.0])
db.execute("INSERT INTO telemetry(id, node_id, rx_time, rx_iso, telemetry_time, battery_level) VALUES(?,?,?,?,?,?)", [70_002, "!tele-b", now - 20, Time.at(now - 20).utc.iso8601, now - 20, 60.0])
db.execute("INSERT INTO telemetry(id, node_id, rx_time, rx_iso, telemetry_time, battery_level) VALUES(?,?,?,?,?,?)", [70_003, "!tele-c", now - 30, Time.at(now - 30).utc.iso8601, now - 30, 70.0])
end
first_page, second_page, = expect_collection_cursor_pagination("/api/telemetry", limit: 2)
expect(first_page.length).to eq(2)
expect(second_page.length).to eq(1)
end
it "excludes telemetry entries older than seven days from collection queries" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
@@ -6610,6 +7053,17 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect(earlier["elapsed_ms"]).to eq(trace_fixture.last.dig("metrics", "latency_ms"))
end
it "emits pagination cursor headers for trace collections" do
clear_database
post "/api/traces", trace_fixture.to_json, auth_headers
expect(last_response).to be_ok
first_page, second_page, = expect_collection_cursor_pagination("/api/traces", limit: 1)
expect(first_page.length).to eq(1)
expect(second_page.length).to eq(1)
expect(second_page.first["id"]).not_to eq(first_page.first["id"])
end
it "filters traces by node reference across sources" do
clear_database
post "/api/traces", trace_fixture.to_json, auth_headers
@@ -6687,6 +7141,14 @@ RSpec.describe "Potato Mesh Sinatra app" do
scoped = JSON.parse(last_response.body)
expect(scoped.map { |row| row["id"] }).to eq([60_002])
end
it "returns an empty list for non-numeric trace node references" do
clear_database
get "/api/traces/not-a-node"
expect(last_response).to be_ok
expect(JSON.parse(last_response.body)).to eq([])
end
end
describe "GET /nodes/:id" do
+156 -62
View File
@@ -24,6 +24,8 @@ require "socket"
RSpec.describe PotatoMesh::App::Federation do
NODES_API_PATH = "/api/nodes".freeze
STATS_API_PATH = "/api/stats".freeze
FULL_DATA_UNAVAILABLE_REASON = "full data unavailable".freeze
HTTP_CONNECTION_DOUBLE = "Net::HTTPConnection".freeze
subject(:federation_helpers) do
@@ -294,6 +296,37 @@ RSpec.describe PotatoMesh::App::Federation do
end
end
def configure_remote_node_window(now)
allow(Time).to receive(:now).and_return(now)
allow(PotatoMesh::Config).to receive(:remote_instance_max_node_age).and_return(900)
end
def stats_mapping(now:, stats_response:, full_nodes_response:, window_nodes_response: nil)
recent_cutoff = now.to_i - 900
mapping = { [seed_domain, "/api/instances"] => [payload_entries, :instances] }
attributes_list.each do |attributes|
mapping[[attributes[:domain], STATS_API_PATH]] = stats_response
mapping[[attributes[:domain], NODES_API_PATH]] = full_nodes_response
mapping[[attributes[:domain], "/api/instances"]] = [[], :instances]
next unless window_nodes_response
mapping[[attributes[:domain], "/api/nodes?since=#{recent_cutoff}&limit=1000"]] = window_nodes_response
end
mapping
end
def stub_ingest_fetches(mapping, capture_paths: false)
captured_paths = []
allow(federation_helpers).to receive(:fetch_instance_json) do |host, path|
captured_paths << [host, path] if capture_paths
mapping.fetch([host, path]) { [nil, []] }
end
allow(federation_helpers).to receive(:verify_instance_signature).and_return(true)
allow(federation_helpers).to receive(:validate_remote_nodes).and_return([true, nil])
allow(federation_helpers).to receive(:upsert_instance_record)
captured_paths
end
it "stops processing once the per-response limit is exceeded" do
processed_domains = []
allow(federation_helpers).to receive(:upsert_instance_record) do |_db, attrs, _signature|
@@ -329,102 +362,163 @@ RSpec.describe PotatoMesh::App::Federation do
expect(federation_helpers.debug_messages).to include(a_string_including("crawl limit"))
end
it "requests an expanded recent node window when counting remote activity" do
it "prefers /api/stats when counting remote activity" do
now = Time.at(1_700_000_000)
allow(Time).to receive(:now).and_return(now)
allow(PotatoMesh::Config).to receive(:remote_instance_max_node_age).and_return(900)
recent_cutoff = now.to_i - 900
configure_remote_node_window(now)
mapping = { [seed_domain, "/api/instances"] => [payload_entries, :instances] }
attributes_list.each_with_index do |attributes, index|
mapping[[attributes[:domain], "/api/nodes?since=#{recent_cutoff}&limit=1000"]] = [node_payload, :nodes]
mapping[[attributes[:domain], NODES_API_PATH]] = [node_payload, :nodes]
mapping[[attributes[:domain], "/api/instances"]] = [[], :instances]
allow(federation_helpers).to receive(:remote_instance_attributes_from_payload).with(payload_entries[index]).and_return([attributes, "signature-#{index}", nil])
end
captured_paths = []
allow(federation_helpers).to receive(:fetch_instance_json) do |host, path|
captured_paths << [host, path]
mapping.fetch([host, path]) { [nil, []] }
end
allow(federation_helpers).to receive(:verify_instance_signature).and_return(true)
allow(federation_helpers).to receive(:validate_remote_nodes).and_return([true, nil])
allow(federation_helpers).to receive(:upsert_instance_record)
mapping = stats_mapping(
now:,
stats_response: [{ "active_nodes" => { "hour" => 5, "day" => 7, "week" => 9, "month" => 11 }, "sampled" => false }, :stats],
full_nodes_response: [node_payload, :nodes],
)
captured_paths = stub_ingest_fetches(mapping, capture_paths: true)
federation_helpers.ingest_known_instances_from!(db, seed_domain)
expect(captured_paths).to include(
[attributes_list[0][:domain], "/api/nodes?since=#{recent_cutoff}&limit=1000"],
[attributes_list[1][:domain], "/api/nodes?since=#{recent_cutoff}&limit=1000"],
[attributes_list[2][:domain], "/api/nodes?since=#{recent_cutoff}&limit=1000"],
[attributes_list[0][:domain], STATS_API_PATH],
[attributes_list[1][:domain], STATS_API_PATH],
[attributes_list[2][:domain], STATS_API_PATH],
)
expect(captured_paths).to include(
[attributes_list[0][:domain], NODES_API_PATH],
[attributes_list[1][:domain], NODES_API_PATH],
[attributes_list[2][:domain], NODES_API_PATH],
)
expect(attributes_list.map { |attrs| attrs[:nodes_count] }).to all(eq(node_payload.length))
expect(attributes_list.map { |attrs| attrs[:nodes_count] }).to all(eq(5))
end
it "falls back to full node data when the recent window request fails" do
it "prefers recent node window counts when /api/stats is unavailable" do
now = Time.at(1_700_000_000)
allow(Time).to receive(:now).and_return(now)
allow(PotatoMesh::Config).to receive(:remote_instance_max_node_age).and_return(900)
recent_cutoff = now.to_i - 900
configure_remote_node_window(now)
full_nodes_payload = node_payload.take(2)
recent_window_payload = node_payload
recent_path = "/api/nodes?since=#{now.to_i - 900}&limit=1000"
mapping = { [seed_domain, "/api/instances"] => [payload_entries, :instances] }
attributes_list.each_with_index do |attributes, index|
mapping[[attributes[:domain], "/api/nodes?since=#{recent_cutoff}&limit=1000"]] = [nil, ["no window"]]
mapping[[attributes[:domain], NODES_API_PATH]] = [node_payload, :nodes]
mapping[[attributes[:domain], "/api/instances"]] = [[], :instances]
allow(federation_helpers).to receive(:remote_instance_attributes_from_payload).with(payload_entries[index]).and_return([attributes, "signature-#{index}", nil])
end
captured_paths = []
allow(federation_helpers).to receive(:fetch_instance_json) do |host, path|
captured_paths << [host, path]
mapping.fetch([host, path]) { [nil, []] }
end
allow(federation_helpers).to receive(:verify_instance_signature).and_return(true)
allow(federation_helpers).to receive(:validate_remote_nodes).and_return([true, nil])
allow(federation_helpers).to receive(:upsert_instance_record)
mapping = stats_mapping(
now:,
stats_response: [nil, ["stats unavailable"]],
full_nodes_response: [full_nodes_payload, :nodes],
window_nodes_response: [recent_window_payload, :nodes],
)
captured_paths = stub_ingest_fetches(mapping, capture_paths: true)
federation_helpers.ingest_known_instances_from!(db, seed_domain)
expect(captured_paths).to include(
[attributes_list[0][:domain], STATS_API_PATH],
[attributes_list[1][:domain], STATS_API_PATH],
[attributes_list[2][:domain], STATS_API_PATH],
)
expect(captured_paths).to include(
[attributes_list[0][:domain], NODES_API_PATH],
[attributes_list[1][:domain], NODES_API_PATH],
[attributes_list[2][:domain], NODES_API_PATH],
)
expect(attributes_list.map { |attrs| attrs[:nodes_count] }).to all(be_nil)
expect(captured_paths).to include(
[attributes_list[0][:domain], recent_path],
[attributes_list[1][:domain], recent_path],
[attributes_list[2][:domain], recent_path],
)
expect(attributes_list.map { |attrs| attrs[:nodes_count] }).to all(eq(recent_window_payload.length))
end
it "falls back to recent node window when full node data is unavailable" do
now = Time.at(1_700_000_000)
allow(Time).to receive(:now).and_return(now)
allow(PotatoMesh::Config).to receive(:remote_instance_max_node_age).and_return(900)
recent_cutoff = now.to_i - 900
configure_remote_node_window(now)
mapping = { [seed_domain, "/api/instances"] => [payload_entries, :instances] }
attributes_list.each_with_index do |attributes, index|
mapping[[attributes[:domain], "/api/nodes?since=#{recent_cutoff}&limit=1000"]] = [node_payload, :nodes]
mapping[[attributes[:domain], NODES_API_PATH]] = [nil, ["full data unavailable"]]
mapping[[attributes[:domain], "/api/instances"]] = [[], :instances]
allow(federation_helpers).to receive(:remote_instance_attributes_from_payload).with(payload_entries[index]).and_return([attributes, "signature-#{index}", nil])
end
allow(federation_helpers).to receive(:fetch_instance_json) do |host, path|
mapping.fetch([host, path]) { [nil, []] }
end
allow(federation_helpers).to receive(:verify_instance_signature).and_return(true)
allow(federation_helpers).to receive(:validate_remote_nodes).and_return([true, nil])
allow(federation_helpers).to receive(:upsert_instance_record)
mapping = stats_mapping(
now:,
stats_response: [nil, ["stats unavailable"]],
full_nodes_response: [nil, [FULL_DATA_UNAVAILABLE_REASON]],
window_nodes_response: [node_payload, :nodes],
)
stub_ingest_fetches(mapping)
federation_helpers.ingest_known_instances_from!(db, seed_domain)
expect(attributes_list.map { |attrs| attrs[:nodes_count] }).to all(eq(node_payload.length))
end
it "uses recent node window fallback when stats succeed but full node data is unavailable" do
now = Time.at(1_700_000_000)
configure_remote_node_window(now)
recent_path = "/api/nodes?since=#{now.to_i - 900}&limit=1000"
mapping = stats_mapping(
now:,
stats_response: [{ "active_nodes" => { "hour" => 9, "day" => 10, "week" => 11, "month" => 12 }, "sampled" => false }, :stats],
full_nodes_response: [nil, [FULL_DATA_UNAVAILABLE_REASON]],
window_nodes_response: [node_payload, :nodes],
)
captured_paths = stub_ingest_fetches(mapping, capture_paths: true)
federation_helpers.ingest_known_instances_from!(db, seed_domain)
expect(captured_paths).to include(
[attributes_list[0][:domain], STATS_API_PATH],
[attributes_list[1][:domain], STATS_API_PATH],
[attributes_list[2][:domain], STATS_API_PATH],
)
expect(captured_paths).to include(
[attributes_list[0][:domain], recent_path],
[attributes_list[1][:domain], recent_path],
[attributes_list[2][:domain], recent_path],
)
expect(attributes_list.map { |attrs| attrs[:nodes_count] }).to all(eq(9))
end
it "handles URI metadata from malformed /api/stats payloads without crashing" do
now = Time.at(1_700_000_000)
configure_remote_node_window(now)
mapping = stats_mapping(
now:,
stats_response: [{ "unexpected" => "shape" }, URI.parse("https://ally-0.mesh/api/stats")],
full_nodes_response: [node_payload.take(2), :nodes],
window_nodes_response: [node_payload, :nodes],
)
stub_ingest_fetches(mapping)
expect do
federation_helpers.ingest_known_instances_from!(db, seed_domain)
end.not_to raise_error
expect(attributes_list.map { |attrs| attrs[:nodes_count] }).to all(eq(node_payload.length))
end
it "skips remote entries when both full and window node feeds are unavailable" do
now = Time.at(1_700_000_000)
configure_remote_node_window(now)
recent_path = "/api/nodes?since=#{now.to_i - 900}&limit=1000"
mapping = stats_mapping(
now:,
stats_response: [{ "active_nodes" => { "hour" => 3, "day" => 3, "week" => 3, "month" => 3 }, "sampled" => false }, :stats],
full_nodes_response: [nil, [FULL_DATA_UNAVAILABLE_REASON]],
window_nodes_response: [nil, ["window unavailable"]],
)
captured_paths = stub_ingest_fetches(mapping, capture_paths: true)
upserted = []
allow(federation_helpers).to receive(:upsert_instance_record) do |_db, attrs, _signature|
upserted << attrs
end
federation_helpers.ingest_known_instances_from!(db, seed_domain)
expect(captured_paths).to include(
[attributes_list[0][:domain], NODES_API_PATH],
[attributes_list[1][:domain], NODES_API_PATH],
[attributes_list[2][:domain], NODES_API_PATH],
)
expect(captured_paths).to include(
[attributes_list[0][:domain], recent_path],
[attributes_list[1][:domain], recent_path],
[attributes_list[2][:domain], recent_path],
)
expect(upserted).to be_empty
expect(federation_helpers.warn_messages).to include("Failed to load remote node data")
expect(attributes_list.map { |attrs| attrs[:nodes_count] }).to all(eq(3))
end
end
describe ".upsert_instance_record" do