web: fix liveliness of api data hydration bug (#783)

* web: fix liveliness of api data hydration bug

* web: address review comments
This commit is contained in:
l5y
2026-05-03 13:05:37 +02:00
committed by GitHub
parent 7b38f92b2d
commit b84f165191
17 changed files with 755 additions and 61 deletions
+22
@@ -145,3 +145,25 @@ Heartbeat payload:
All collection GET endpoints (`/api/nodes`, `/api/messages`, `/api/positions`, `/api/telemetry`, `/api/traces`, `/api/neighbors`, `/api/ingestors`) accept an optional `?protocol=<value>` query parameter. When present, only records whose `protocol` column matches the given value are returned. The `protocol` field is included in all GET responses.
### GET endpoint time windows
Every read endpoint enforces a server-side rolling-window floor on the data it returns. The window is fixed per route and **cannot be widened by the caller**: an explicit `?since=<unix_seconds>` is treated as `MAX(since, floor)`, so a `since` older than the floor is silently clamped up to the floor (see the sketch at the end of this section). Pass a `since` newer than the floor when you want to be more restrictive (incremental refresh).
| Route | Floor (default) | Notes |
| --- | --- | --- |
| `GET /api/nodes` | 7 days | filtered by `nodes.last_heard` |
| `GET /api/messages` | 7 days | filtered by `messages.rx_time` |
| `GET /api/positions` | 7 days | filtered by `COALESCE(rx_time, position_time)` |
| `GET /api/telemetry` | 7 days | filtered by `COALESCE(rx_time, telemetry_time)` |
| `GET /api/instances` | 7 days | filtered by `instances.last_update_time` |
| `GET /api/neighbors` | **28 days** | sparse data; widened so sporadically reported links stay visible between scrapes |
| `GET /api/traces` | **28 days** | sparse data; same rationale |
| `GET /api/ingestors` | **28 days** | sparse heartbeats; same rationale |
| `GET /api/.../:id` (per-id lookup) | **28 days** | every per-id route uses the extended window so callers can backfill historical context for a specific node/conversation that has dropped out of the bulk view. The `since` clamp still applies. |
| `GET /api/telemetry/aggregated` | caller-controlled | `?windowSeconds=<N>` defaults to 86 400 (1 day) when omitted. Bounded by `MAX_QUERY_LIMIT` on bucket count, not by a hard floor. |
| `GET /api/stats` | n/a | reports counts at fixed `hour`/`day`/`week`/`month` activity buckets. |
Federation peers should not assume an unbounded historical window: a peer that requests `/api/messages?since=0` from a partner expecting "everything" will only ever receive the last seven days. To pull older state, request the per-id endpoint (28 days) for the relevant nodes.
The constants live in `web/lib/potato_mesh/config.rb` (`week_seconds`, `four_weeks_seconds`).
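A minimal sketch of the clamping rule, assuming the `normalize_since_threshold(since, floor:)` helper used by the query layer simply takes the larger of the two values (its actual body is not shown in this commit):

```ruby
# Hypothetical illustration only: the documented contract is that `since`
# is treated as MAX(since, floor), so callers can narrow the window but
# never widen it past the per-route floor.
def normalize_since_threshold(since, floor:)
  [since.to_i, floor.to_i].max
end

floor = Time.now.to_i - PotatoMesh::Config.week_seconds
normalize_since_threshold(0, floor: floor)                  # clamped up to the floor
normalize_since_threshold(Time.now.to_i - 60, floor: floor) # kept: more restrictive than the floor
```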
@@ -17,6 +17,29 @@
module PotatoMesh
module App
module Federation
# Process-wide memo for the most recently emitted self-registration
# decision. Sinatra spins up a fresh app instance per request so a
# plain instance variable would not survive across calls; storing the
# state on the module itself keeps the dedupe stable for the lifetime
# of the worker process.
@self_registration_log_state = { mutex: Mutex.new, last: nil }
# Accessor for the dedupe state used by {#ensure_self_instance_record!}.
#
# @return [Hash{Symbol => Object}] mutable state hash holding +:mutex+ and +:last+.
def self.self_registration_log_state
@self_registration_log_state
end
# Reset the dedupe memo. Intended for tests; production code never
# needs to clear the state because each process starts fresh.
#
# @return [void]
def self.reset_self_registration_log_state!
state = @self_registration_log_state
state[:mutex].synchronize { state[:last] = nil }
end
# Resolve the canonical domain for the running instance.
#
# @return [String, nil] sanitized instance domain or nil outside production.
@@ -137,16 +160,30 @@ module PotatoMesh
signature = sign_instance_attributes(attributes)
db = nil
allowed, reason = self_instance_registration_decision(attributes[:domain])
# Decisions are stable per process while INSTANCE_DOMAIN_SOURCE
# remains the same — without dedupe, the federation banner on every
# page navigation produced one log line apiece. Only emit when the
# tuple changes so operators still see the first decision (and any
# later flip) without the spam.
sentinel = [allowed, reason, attributes[:domain]]
state = PotatoMesh::App::Federation.self_registration_log_state
should_log = state[:mutex].synchronize do
changed = state[:last] != sentinel
state[:last] = sentinel if changed
changed
end
if allowed
db = open_database
upsert_instance_record(db, attributes, signature)
debug_log(
"Registered self instance record",
context: "federation.instances",
domain: attributes[:domain],
instance_id: attributes[:id],
)
else
if should_log
debug_log(
"Registered self instance record",
context: "federation.instances",
domain: attributes[:domain],
instance_id: attributes[:id],
)
end
elsif should_log
debug_log(
"Skipped self instance registration",
context: "federation.instances",
@@ -26,7 +26,12 @@ module PotatoMesh
# @return [Array<Hash>] compacted message rows safe for API responses.
def query_messages(limit, node_ref: nil, include_encrypted: false, since: 0, protocol: nil)
limit = coerce_query_limit(limit)
since_threshold = normalize_since_threshold(since, floor: 0)
now = Time.now.to_i
# Default the chat feed to the same seven-day window the dashboard uses
# for the node table; per-id lookups widen to twenty-eight days so
# historical conversation context remains reachable on demand.
since_floor = node_ref ? now - PotatoMesh::Config.four_weeks_seconds : now - PotatoMesh::Config.week_seconds
since_threshold = normalize_since_threshold(since, floor: since_floor)
db = open_database(readonly: true)
db.results_as_hash = true
params = []
@@ -30,8 +30,9 @@ module PotatoMesh
params = []
where_clauses = []
now = Time.now.to_i
min_rx_time = now - PotatoMesh::Config.week_seconds
since_floor = node_ref ? 0 : min_rx_time
# Bulk positions follow the seven-day default; per-id lookups widen
# to twenty-eight days for backfill of historical track data.
since_floor = node_ref ? now - PotatoMesh::Config.four_weeks_seconds : now - PotatoMesh::Config.week_seconds
since_threshold = normalize_since_threshold(since, floor: since_floor)
where_clauses << "COALESCE(rx_time, position_time, 0) >= ?"
params << since_threshold
@@ -91,9 +92,11 @@ module PotatoMesh
params = []
where_clauses = []
now = Time.now.to_i
min_rx_time = now - PotatoMesh::Config.week_seconds
since_floor = node_ref ? 0 : min_rx_time
since_threshold = normalize_since_threshold(since, floor: since_floor)
# Neighbor relationships are reported sporadically and are easy to
# lose between scrapes, so use the twenty-eight-day extended window
# for both bulk and per-id queries.
min_rx_time = now - PotatoMesh::Config.four_weeks_seconds
since_threshold = normalize_since_threshold(since, floor: min_rx_time)
where_clauses << "COALESCE(rx_time, 0) >= ?"
params << since_threshold
@@ -141,7 +144,7 @@ module PotatoMesh
params = []
where_clauses = []
now = Time.now.to_i
min_rx_time = now - PotatoMesh::Config.trace_neighbor_window_seconds
min_rx_time = now - PotatoMesh::Config.four_weeks_seconds
since_threshold = normalize_since_threshold(since, floor: min_rx_time)
where_clauses << "COALESCE(rx_time, 0) >= ?"
params << since_threshold
@@ -141,8 +141,10 @@ module PotatoMesh
db = open_database(readonly: true)
db.results_as_hash = true
now = Time.now.to_i
min_last_heard = now - PotatoMesh::Config.week_seconds
since_floor = node_ref ? 0 : min_last_heard
# Bulk listings stay on the seven-day window so the dashboard does not
# render stale nodes; per-id lookups widen to twenty-eight days so
# callers can backfill older records that fall outside the bulk floor.
since_floor = node_ref ? now - PotatoMesh::Config.four_weeks_seconds : now - PotatoMesh::Config.week_seconds
since_threshold = normalize_since_threshold(since, floor: since_floor)
params = []
where_clauses = []
@@ -227,7 +229,10 @@ module PotatoMesh
db = open_database(readonly: true)
db.results_as_hash = true
now = Time.now.to_i
cutoff = now - PotatoMesh::Config.week_seconds
# Ingestor heartbeats are sparse (one per ingestor per cycle) so widen
# the rolling window to twenty-eight days to keep slow-tick ingestors
# visible in the federation overview.
cutoff = now - PotatoMesh::Config.four_weeks_seconds
since_threshold = normalize_since_threshold(since, floor: cutoff)
where_clauses = ["last_seen_time >= ?"]
params = [since_threshold]
@@ -30,8 +30,9 @@ module PotatoMesh
params = []
where_clauses = []
now = Time.now.to_i
min_rx_time = now - PotatoMesh::Config.week_seconds
since_floor = node_ref ? 0 : min_rx_time
# Bulk telemetry follows the seven-day default; per-id lookups widen
# to twenty-eight days so historical chart data remains reachable.
since_floor = node_ref ? now - PotatoMesh::Config.four_weeks_seconds : now - PotatoMesh::Config.week_seconds
since_threshold = normalize_since_threshold(since, floor: since_floor)
where_clauses << "COALESCE(rx_time, telemetry_time, 0) >= ?"
params << since_threshold
+15 -3
@@ -390,9 +390,21 @@ module PotatoMesh
halt 404 unless federation_enabled?
content_type :json
ensure_self_instance_record!
payload = load_instances_for_api
JSON.generate(payload)
# The federation banner is rendered on every page navigation, which
# caused this endpoint to fire ~7 times in a few seconds while the
# user clicked through the site. Cache the response (including the
# self-record refresh) for a short window so navigation feels free
# without delaying signature/peer updates by more than a few
# seconds. The dedicated announcer thread keeps the underlying
# record fresh on its own cadence regardless of cache hits.
priv = private_mode? ? 1 : 0
cached = PotatoMesh::App::ApiCache.fetch("api:instances:#{priv}", ttl_seconds: 30) do
ensure_self_instance_record!
JSON.generate(load_instances_for_api)
end
etag cached[:etag], kind: :weak
api_cache_control
cached[:value]
end
end
end
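# Illustration (not part of this commit): a minimal in-memory cache with the
# interface the route above relies on. PotatoMesh::App::ApiCache itself is not
# shown in this diff, so everything below is an assumption reconstructed from
# the call sites alone: `fetch(key, ttl_seconds:)` runs its block on a miss and
# returns a hash exposing +:etag+ and +:value+, while `invalidate_prefix`
# drops matching entries early.
require "digest"

module IllustrativeApiCache
  @entries = {}
  @mutex = Mutex.new

  # Return the cached entry for +key+ while it is fresh; otherwise run the
  # block, wrap its result with a digest usable as a weak ETag and an expiry,
  # and store it for subsequent requests.
  def self.fetch(key, ttl_seconds:)
    @mutex.synchronize do
      entry = @entries[key]
      return entry if entry && entry[:expires_at] > Time.now.to_i

      value = yield
      @entries[key] = {
        value: value,
        etag: Digest::SHA256.hexdigest(value),
        expires_at: Time.now.to_i + ttl_seconds,
      }
    end
  end

  # Drop every cached entry whose key starts with +prefix+, mirroring the
  # invalidation POST /api/instances performs after accepting a new peer.
  def self.invalidate_prefix(prefix)
    @mutex.synchronize { @entries.delete_if { |key, _| key.start_with?(prefix) } }
  end
end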
@@ -315,6 +315,10 @@ module PotatoMesh
db = open_database
upsert_instance_record(db, attributes, signature)
# Drop the cached /api/instances payload so the new peer becomes
# visible on the next dashboard refresh instead of after the TTL
# naturally expires.
PotatoMesh::App::ApiCache.invalidate_prefix("api:instances:")
enqueued = enqueue_federation_crawl(
attributes[:domain],
per_response_limit: PotatoMesh::Config.federation_max_instances_per_response,
+16 -3
@@ -181,17 +181,30 @@ module PotatoMesh
DEFAULT_DB_BUSY_RETRY_DELAY
end
# Convenience constant describing the number of seconds in a week.
# Default rolling retention window in seconds.
#
# Used as the freshness floor for every "general" bulk read endpoint —
# nodes, messages, positions, telemetry, and the federation instance
# catalog — and as the freshness floor for federation/dashboard activity
# counters. Exceptions (sparse data) live on
# {#four_weeks_seconds}; per-id lookups also widen to the extended
# window so callers can backfill historical context for a single node.
#
# @return [Integer] seconds in seven days.
def week_seconds
7 * 24 * 60 * 60
end
# Rolling retention window in seconds for trace and neighbor API queries.
# Extended rolling retention window in seconds.
#
# Used as the default freshness floor for endpoints whose data is more
# fragile (traces, neighbors, ingestors) and as the floor for every
# +/api/.../:id+ lookup so callers can backfill historical records that
# would otherwise fall outside the seven-day default applied to bulk
# endpoints.
#
# @return [Integer] seconds in twenty-eight days.
def trace_neighbor_window_seconds
def four_weeks_seconds
28 * 24 * 60 * 60
end
@@ -17,7 +17,53 @@
import test from 'node:test';
import assert from 'node:assert/strict';
import { createMessageNodeHydrator } from '../message-node-hydrator.js';
import { createMessageNodeHydrator, MESSAGE_HYDRATION_CONCURRENCY } from '../message-node-hydrator.js';
/**
* Build a fetch double that records the maximum number of simultaneously
* pending lookups so tests can assert the worker-pool cap is honoured.
*
* @param {number} settleDelayMs Milliseconds to keep each lookup pending
* before resolving, giving sibling workers a chance to start.
* @returns {{
* fetchNodeById: (id: string) => Promise<object|null>,
* maxInFlight: () => number,
* totalCalls: () => number,
* }} Helper API exposing the recorded peak concurrency.
*/
function makeConcurrencyProbe(settleDelayMs = 10) {
let inFlight = 0;
let peak = 0;
let total = 0;
return {
async fetchNodeById(id) {
inFlight += 1;
total += 1;
peak = Math.max(peak, inFlight);
try {
await new Promise(resolve => setTimeout(resolve, settleDelayMs));
return { node_id: id, short_name: id.slice(1, 5) };
} finally {
inFlight -= 1;
}
},
maxInFlight: () => peak,
totalCalls: () => total,
};
}
/**
* Build N messages with unique sender identifiers for concurrency tests.
*
* @param {number} count Number of messages to produce.
* @returns {Array<object>} Synthetic message payloads.
*/
function makeUniqueSenderMessages(count) {
return Array.from({ length: count }, (_, index) => ({
from_id: `!sender${index.toString().padStart(4, '0')}`,
text: `m${index}`,
}));
}
/**
* Capture warning invocations produced during a test run.
@@ -78,6 +124,66 @@ test('hydrate fetches missing nodes once and caches the result', async () => {
assert.strictEqual(result[1].node, nodesById.get('!fetch'));
});
test('hydrate caches 404 results so subsequent calls do not refetch dead ids', async () => {
let fetchCalls = 0;
const hydrator = createMessageNodeHydrator({
fetchNodeById: async () => {
fetchCalls += 1;
return null;
},
applyNodeFallback: () => {},
});
const messages = [{ from_id: '!gone', text: 'first' }];
const nodesById = new Map();
await hydrator.hydrate(messages, nodesById);
await hydrator.hydrate([{ from_id: '!gone', text: 'second' }], nodesById);
await hydrator.hydrate([{ from_id: '!gone', text: 'third' }], nodesById);
assert.equal(fetchCalls, 1);
});
test('cached missing entry is overridden when nodesById later resolves the id', async () => {
let fetchCalls = 0;
const hydrator = createMessageNodeHydrator({
fetchNodeById: async () => {
fetchCalls += 1;
return null;
},
applyNodeFallback: () => {},
});
const nodesById = new Map();
await hydrator.hydrate([{ from_id: '!late', text: 'first' }], nodesById);
assert.equal(fetchCalls, 1);
// Bulk /api/nodes refresh resolves the id afterwards.
const lateNode = { node_id: '!late', short_name: 'Late' };
nodesById.set('!late', lateNode);
const result = await hydrator.hydrate([{ from_id: '!late', text: 'second' }], nodesById);
assert.equal(fetchCalls, 1);
assert.strictEqual(result[0].node, lateNode);
});
test('hydrate caches lookup failures alongside 404s', async () => {
let fetchCalls = 0;
const hydrator = createMessageNodeHydrator({
fetchNodeById: async () => {
fetchCalls += 1;
throw new Error('network down');
},
applyNodeFallback: () => {},
logger: { warn() {} },
});
const nodesById = new Map();
await hydrator.hydrate([{ from_id: '!flaky', text: 'a' }], nodesById);
await hydrator.hydrate([{ from_id: '!flaky', text: 'b' }], nodesById);
assert.equal(fetchCalls, 1);
});
test('hydrate falls back to placeholders when lookups fail', async () => {
const logger = new LoggerStub();
let fallbackCalls = 0;
@@ -121,3 +227,125 @@ test('hydrate records warning when fetch rejects', async () => {
assert.ok(logger.messages.length >= 1);
assert.equal(nodesById.has('!warn'), false);
});
test('hydrate caps in-flight lookups at the default concurrency', async () => {
const probe = makeConcurrencyProbe();
const hydrator = createMessageNodeHydrator({
fetchNodeById: probe.fetchNodeById,
applyNodeFallback: () => {},
});
const messages = makeUniqueSenderMessages(MESSAGE_HYDRATION_CONCURRENCY * 3);
await hydrator.hydrate(messages, new Map());
assert.equal(probe.totalCalls(), messages.length);
assert.ok(
probe.maxInFlight() <= MESSAGE_HYDRATION_CONCURRENCY,
`expected <= ${MESSAGE_HYDRATION_CONCURRENCY} concurrent fetches, observed ${probe.maxInFlight()}`,
);
});
test('hydrate honours a custom concurrency override', async () => {
const probe = makeConcurrencyProbe();
const hydrator = createMessageNodeHydrator({
fetchNodeById: probe.fetchNodeById,
applyNodeFallback: () => {},
concurrency: 2,
});
const messages = makeUniqueSenderMessages(8);
await hydrator.hydrate(messages, new Map());
assert.equal(probe.totalCalls(), 8);
assert.equal(probe.maxInFlight(), 2);
});
test('hydrate serialises lookups when concurrency is one', async () => {
const probe = makeConcurrencyProbe();
const hydrator = createMessageNodeHydrator({
fetchNodeById: probe.fetchNodeById,
applyNodeFallback: () => {},
concurrency: 1,
});
const messages = makeUniqueSenderMessages(4);
await hydrator.hydrate(messages, new Map());
assert.equal(probe.maxInFlight(), 1);
});
test('hydrate falls back to the default cap for invalid concurrency values', async () => {
for (const invalid of [0, -3, Number.NaN, Number.POSITIVE_INFINITY, 'four']) {
const probe = makeConcurrencyProbe();
const hydrator = createMessageNodeHydrator({
fetchNodeById: probe.fetchNodeById,
applyNodeFallback: () => {},
concurrency: invalid,
});
const messages = makeUniqueSenderMessages(MESSAGE_HYDRATION_CONCURRENCY * 2);
await hydrator.hydrate(messages, new Map());
assert.ok(
probe.maxInFlight() <= MESSAGE_HYDRATION_CONCURRENCY,
`concurrency=${String(invalid)} should fall back to default; observed peak ${probe.maxInFlight()}`,
);
}
});
test('factory rejects missing fetch and fallback dependencies', () => {
assert.throws(
() => createMessageNodeHydrator({ applyNodeFallback: () => {} }),
TypeError,
);
assert.throws(
() => createMessageNodeHydrator({ fetchNodeById: async () => null }),
TypeError,
);
});
test('hydrate skips non-object entries and senderless messages', async () => {
let fetchCalls = 0;
const hydrator = createMessageNodeHydrator({
fetchNodeById: async () => {
fetchCalls += 1;
return null;
},
applyNodeFallback: () => {},
});
const senderless = { text: 'no sender' };
const messages = [null, 'not-an-object', senderless];
const result = await hydrator.hydrate(messages, new Map());
assert.equal(fetchCalls, 0);
assert.equal(result.length, 3);
assert.strictEqual(senderless.node, null);
});
test('hydrate dedupes duplicate senders without exceeding the cap', async () => {
const probe = makeConcurrencyProbe();
const hydrator = createMessageNodeHydrator({
fetchNodeById: probe.fetchNodeById,
applyNodeFallback: () => {},
concurrency: 2,
});
// Twenty messages but only four unique senders. The queue is built before
// any lookup starts (``nodesById`` begins empty), so every entry reaches the
// worker pool; once the first lookup for a given sender resolves,
// ``resolveNode`` writes the result into the shared ``nodesById`` cache and
// every later queue entry for that id is satisfied synchronously from it, so
// the total fetch count collapses to the four unique senders.
// (The inflight-promise map only matters when two workers race on the same
// still-pending id, which this layout makes rare at concurrency=2; the
// ``nodesById`` write-back is the dominant mechanism here.)
const senders = ['!aaa', '!bbb', '!ccc', '!ddd'];
const messages = Array.from({ length: 20 }, (_, index) => ({
from_id: senders[index % senders.length],
text: `dup${index}`,
}));
await hydrator.hydrate(messages, new Map());
assert.equal(probe.totalCalls(), senders.length);
assert.ok(probe.maxInFlight() <= 2);
});
@@ -14,29 +14,63 @@
* limitations under the License.
*/
/**
* Default upper bound for in-flight ``/api/nodes/:id`` lookups while the
* hydrator backfills sender metadata. Matches the worker-pool size used by
* ``node-page/role-index.js`` so a thundering herd of cold-load lookups
* cannot overwhelm the server.
*/
export const MESSAGE_HYDRATION_CONCURRENCY = 4;
/**
* Build a hydrator capable of attaching node metadata to chat messages.
*
* @param {{
* fetchNodeById: (nodeId: string) => Promise<object|null>,
* applyNodeFallback: (node: object) => void,
* logger?: { warn?: (message?: any, ...optionalParams: any[]) => void }
* }} options Factory configuration.
* logger?: { warn?: (message?: any, ...optionalParams: any[]) => void },
* concurrency?: number
* }} options Factory configuration. ``concurrency`` overrides the default
* worker-pool size and is primarily intended for unit tests; callers
* should leave it unset in production.
* @returns {{
* hydrate: (messages: Array<object>|null|undefined, nodesById: Map<string, object>) => Promise<Array<object>>
* }} Hydrator API.
*/
export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, logger = console }) {
export function createMessageNodeHydrator({
fetchNodeById,
applyNodeFallback,
logger = console,
concurrency = MESSAGE_HYDRATION_CONCURRENCY,
}) {
if (typeof fetchNodeById !== 'function') {
throw new TypeError('fetchNodeById must be a function');
}
if (typeof applyNodeFallback !== 'function') {
throw new TypeError('applyNodeFallback must be a function');
}
// Treat any non-finite value, or anything below one, as "fall back to the
// default". This keeps the hydrator robust against accidental
// misconfiguration (including fractional values that would otherwise floor
// to zero workers) without degrading to unbounded parallelism.
const workerCap =
Number.isFinite(concurrency) && concurrency >= 1
? Math.floor(concurrency)
: MESSAGE_HYDRATION_CONCURRENCY;
/** @type {Map<string, Promise<object|null>>} */
const inflightLookups = new Map();
// Negative-result cache shared across all ``hydrate()`` invocations on
// this hydrator instance. Without it, every refresh tick would re-issue
// ``/api/nodes/:id`` for senders that the server has already returned
// 404 for once — turning a single dead participant in a busy chat into a
// perpetual per-minute fetch. The set is consulted *after* the fresh
// ``nodesById`` lookup, so a node that registers later (and therefore
// appears in the bulk /api/nodes refresh) immediately wins over a stale
// missing entry without any explicit invalidation.
/** @type {Set<string>} */
const missingNodeIds = new Set();
/**
* Normalise potential node identifiers into canonical strings.
*
@@ -63,6 +97,9 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo
if (nodesById instanceof Map && nodesById.has(id)) {
return nodesById.get(id);
}
if (missingNodeIds.has(id)) {
return null;
}
if (inflightLookups.has(id)) {
return inflightLookups.get(id);
}
@@ -77,12 +114,14 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo
}
return node;
}
missingNodeIds.add(id);
return null;
})
.catch(error => {
if (logger && typeof logger.warn === 'function') {
logger.warn('message node lookup failed', { nodeId: id, error });
}
missingNodeIds.add(id);
return null;
})
.finally(() => {
@@ -96,6 +135,13 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo
/**
* Attach node information to the provided message collection.
*
* Messages whose sender is already in ``nodesById`` are bound synchronously
* and incur no network traffic. Misses are pushed onto a shared queue and
* drained by a fixed worker pool so the number of in-flight
* ``/api/nodes/:id`` requests never exceeds {@link workerCap}. This caps
* the cold-load thundering-herd that would otherwise issue one request per
* unique sender in parallel.
*
* @param {Array<object>|null|undefined} messages Message payloads from the API.
* @param {Map<string, object>} nodesById Lookup table of known nodes.
* @returns {Promise<Array<object>>} Hydrated message entries.
@@ -105,7 +151,7 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo
return Array.isArray(messages) ? messages : [];
}
const tasks = [];
const queue = [];
for (const message of messages) {
if (!message || typeof message !== 'object') {
continue;
@@ -127,22 +173,36 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo
continue;
}
const task = resolveNode(targetId, nodesById).then(node => {
if (node) {
message.node = node;
} else {
const placeholder = { node_id: targetId };
applyNodeFallback(placeholder);
message.node = placeholder;
}
});
tasks.push(task);
queue.push({ message, targetId });
}
if (tasks.length > 0) {
await Promise.all(tasks);
if (queue.length === 0) {
return messages;
}
// Workers share a monotonically advancing index instead of mutating the
// queue with ``shift()`` — ``Array#shift`` is O(n) and would turn a
// large hydration burst into O(n²). Single-threaded JS makes the
// post-increment atomic with respect to other workers, so no lock or
// existence check is needed.
let cursor = 0;
const workerCount = Math.min(workerCap, queue.length);
const workers = Array.from({ length: workerCount }, async () => {
while (cursor < queue.length) {
const entry = queue[cursor++];
// eslint-disable-next-line no-await-in-loop
const node = await resolveNode(entry.targetId, nodesById);
if (node) {
entry.message.node = node;
} else {
const placeholder = { node_id: entry.targetId };
applyNodeFallback(placeholder);
entry.message.node = placeholder;
}
}
});
await Promise.all(workers);
return messages;
}
+229 -13
@@ -2852,6 +2852,77 @@ RSpec.describe "Potato Mesh Sinatra app" do
expect(last_response.status).to eq(404)
end
end
describe "response caching" do
it "serves a stable etag and returns 304 for matching If-None-Match" do
get "/api/instances"
expect(last_response).to be_ok
first_etag = last_response.headers["ETag"]
expect(first_etag).not_to be_nil
header "If-None-Match", first_etag
get "/api/instances"
expect(last_response.status).to eq(304)
end
it "returns the same response body for repeat requests within the TTL" do
# The bodies must be byte-identical: re-signing the self record would
# produce a different last_update_time / signature, so identical bodies
# confirm the cached entry is being reused.
get "/api/instances"
expect(last_response).to be_ok
first_body = last_response.body
first_etag = last_response.headers["ETag"]
get "/api/instances"
expect(last_response).to be_ok
expect(last_response.body).to eq(first_body)
expect(last_response.headers["ETag"]).to eq(first_etag)
end
it "invalidates the cache when a new peer registers" do
get "/api/instances"
expect(last_response).to be_ok
baseline_size = JSON.parse(last_response.body).length
# Build a valid peer registration that POST /api/instances accepts.
peer_key = OpenSSL::PKey::RSA.new(2048)
peer_attributes = {
id: "cache-peer",
domain: "cache-peer.example",
pubkey: peer_key.public_key.export,
name: "Cache Peer",
version: "1.0.0",
channel: "#peer",
frequency: "868MHz",
latitude: 50.0,
longitude: 8.0,
last_update_time: Time.now.to_i,
is_private: false,
}
peer_signature = Base64.strict_encode64(
peer_key.sign(
OpenSSL::Digest::SHA256.new,
canonical_instance_payload(peer_attributes),
),
)
with_db do |db|
upsert_instance_record(db, peer_attributes, peer_signature)
end
# The cache cannot observe the direct DB insert, so invalidate explicitly
# to mirror what POST /api/instances does in production.
PotatoMesh::App::ApiCache.invalidate_prefix("api:instances:")
get "/api/instances"
expect(last_response).to be_ok
payload = JSON.parse(last_response.body)
expect(payload.length).to eq(baseline_size + 1)
expect(payload.map { |row| row["id"] }).to include("cache-peer")
end
end
end
describe "POST /api/nodes" do
@@ -6329,11 +6400,12 @@ RSpec.describe "Potato Mesh Sinatra app" do
end
end
it "filters messages by the since parameter while defaulting to the full history" do
it "excludes messages older than seven days from collection queries while honouring since" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
stale_rx = now - (PotatoMesh::Config.week_seconds + 120)
backfillable_rx = now - (PotatoMesh::Config.week_seconds + 3 * 24 * 60 * 60)
fresh_rx = now - 15
with_db do |db|
@@ -6349,15 +6421,32 @@ RSpec.describe "Potato Mesh Sinatra app" do
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text, snr, rssi, hop_limit) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
[2, fresh_rx, Time.at(fresh_rx).utc.iso8601, "!fresh", "!old", 0, "TEXT_MESSAGE_APP", "fresh", 2.0, -60, 3],
)
db.execute(
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text, snr, rssi, hop_limit) VALUES(?,?,?,?,?,?,?,?,?,?,?)",
[3, backfillable_rx, Time.at(backfillable_rx).utc.iso8601, "!old", "!fresh", 0, "TEXT_MESSAGE_APP", "backfill", 1.0, -75, 3],
)
end
# Bulk feed defaults to the seven-day window so stale conversations stop
# spamming the dashboard hydrator with backfill lookups.
get "/api/messages"
expect(last_response).to be_ok
payload = JSON.parse(last_response.body)
ids = payload.map { |row| row["id"] }
expect(ids).to eq([2, 1])
expect(ids).to eq([2])
# Per-id lookups widen to twenty-eight days so callers can still backfill
# historical context for a specific conversation participant. The route
# matches messages where either ``from_id`` or ``to_id`` references the
# supplied identifier, so the fresh exchange to ``!old`` shows up too.
get "/api/messages/!old"
expect(last_response).to be_ok
scoped = JSON.parse(last_response.body)
expect(scoped.map { |row| row["id"] }).to contain_exactly(1, 2, 3)
# The since parameter overrides the floor when it is more restrictive.
get "/api/messages?since=#{fresh_rx}"
expect(last_response).to be_ok
@@ -6367,8 +6456,91 @@ RSpec.describe "Potato Mesh Sinatra app" do
get "/api/messages/!old?since=#{fresh_rx}"
expect(last_response).to be_ok
scoped = JSON.parse(last_response.body)
expect(scoped.map { |row| row["id"] }).to eq([2])
scoped_since = JSON.parse(last_response.body)
expect(scoped_since.map { |row| row["id"] }).to eq([2])
end
it "clamps an explicit since older than the seven-day floor up to the floor" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
stale_rx = now - (PotatoMesh::Config.week_seconds + 4 * 60 * 60)
fresh_rx = now - 30
explicit_since = now - (PotatoMesh::Config.week_seconds + 14 * 24 * 60 * 60)
with_db do |db|
db.execute(
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)",
[301, stale_rx, Time.at(stale_rx).utc.iso8601, "!a", "!b", 0, "TEXT_MESSAGE_APP", "stale"],
)
db.execute(
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)",
[302, fresh_rx, Time.at(fresh_rx).utc.iso8601, "!a", "!b", 0, "TEXT_MESSAGE_APP", "fresh"],
)
end
# The caller asked for "everything since three weeks ago" but the route
# silently clamps that up to the seven-day floor — the stale row stays
# excluded. This locks in the contract that ``since`` cannot widen the
# window past the floor.
get "/api/messages?since=#{explicit_since}"
expect(last_response).to be_ok
ids = JSON.parse(last_response.body).map { |row| row["id"] }
expect(ids).to eq([302])
expect(ids).not_to include(301)
end
it "clamps per-id since older than the twenty-eight-day floor up to that floor" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
ancient_rx = now - (PotatoMesh::Config.four_weeks_seconds + 6 * 60 * 60)
backfillable_rx = now - (PotatoMesh::Config.four_weeks_seconds - 60)
explicit_since = now - (PotatoMesh::Config.four_weeks_seconds + 30 * 24 * 60 * 60)
with_db do |db|
db.execute(
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)",
[401, ancient_rx, Time.at(ancient_rx).utc.iso8601, "!a", "!b", 0, "TEXT_MESSAGE_APP", "ancient"],
)
db.execute(
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)",
[402, backfillable_rx, Time.at(backfillable_rx).utc.iso8601, "!a", "!b", 0, "TEXT_MESSAGE_APP", "backfill"],
)
end
get "/api/messages/!a?since=#{explicit_since}"
expect(last_response).to be_ok
ids = JSON.parse(last_response.body).map { |row| row["id"] }
expect(ids).to eq([402])
expect(ids).not_to include(401)
end
it "excludes per-id messages older than the twenty-eight-day extended window" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
ancient_rx = now - (PotatoMesh::Config.four_weeks_seconds + 60)
backfillable_rx = now - (PotatoMesh::Config.four_weeks_seconds - 60)
with_db do |db|
db.execute(
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)",
[10, ancient_rx, Time.at(ancient_rx).utc.iso8601, "!old", "!fresh", 0, "TEXT_MESSAGE_APP", "ancient"],
)
db.execute(
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)",
[11, backfillable_rx, Time.at(backfillable_rx).utc.iso8601, "!old", "!fresh", 0, "TEXT_MESSAGE_APP", "backfill"],
)
end
get "/api/messages/!old"
expect(last_response).to be_ok
payload = JSON.parse(last_response.body)
expect(payload.map { |row| row["id"] }).to eq([11])
end
end
@@ -6580,11 +6752,12 @@ RSpec.describe "Potato Mesh Sinatra app" do
end
describe "GET /api/neighbors" do
it "excludes neighbor records older than twenty-eight days from collection queries" do
it "excludes neighbor records older than twenty-eight days from both bulk and per-id queries" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
stale_rx = now - (PotatoMesh::Config.trace_neighbor_window_seconds + 45)
stale_rx = now - (PotatoMesh::Config.four_weeks_seconds + 45)
backfillable_rx = now - (PotatoMesh::Config.four_weeks_seconds - 60)
fresh_rx = now - 10
with_db do |db|
@@ -6596,6 +6769,10 @@ RSpec.describe "Potato Mesh Sinatra app" do
"INSERT INTO nodes(node_id, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?)",
["!neighbor-old", "oldn", "Neighbor Old", "TBEAM", "CLIENT", 0.0, fresh_rx, fresh_rx],
)
db.execute(
"INSERT INTO nodes(node_id, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?)",
["!neighbor-mid", "midn", "Neighbor Mid", "TBEAM", "CLIENT", 0.0, fresh_rx, fresh_rx],
)
db.execute(
"INSERT INTO nodes(node_id, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?)",
["!neighbor-new", "newn", "Neighbor New", "TBEAM", "CLIENT", 0.0, fresh_rx, fresh_rx],
@@ -6604,26 +6781,30 @@ RSpec.describe "Potato Mesh Sinatra app" do
"INSERT INTO neighbors(node_id, neighbor_id, snr, rx_time) VALUES(?,?,?,?)",
["!root", "!neighbor-old", 1.0, stale_rx],
)
db.execute(
"INSERT INTO neighbors(node_id, neighbor_id, snr, rx_time) VALUES(?,?,?,?)",
["!root", "!neighbor-mid", 4.0, backfillable_rx],
)
db.execute(
"INSERT INTO neighbors(node_id, neighbor_id, snr, rx_time) VALUES(?,?,?,?)",
["!root", "!neighbor-new", 8.0, fresh_rx],
)
end
# Both bulk and per-id queries share the twenty-eight-day extended
# window — neighbours are reported sporadically and would otherwise be
# lost between scrapes.
get "/api/neighbors"
expect(last_response).to be_ok
payload = JSON.parse(last_response.body)
expect(payload.length).to eq(1)
expect(payload.first["neighbor_id"]).to eq("!neighbor-new")
expect(payload.first["rx_time"]).to eq(fresh_rx)
expect(payload.map { |row| row["neighbor_id"] }).to eq(["!neighbor-new", "!neighbor-mid"])
get "/api/neighbors/!root"
expect(last_response).to be_ok
filtered = JSON.parse(last_response.body)
expect(filtered.length).to eq(2)
expect(filtered.map { |row| row["neighbor_id"] }).to eq(["!neighbor-new", "!neighbor-old"])
expect(filtered.map { |row| row["neighbor_id"] }).to eq(["!neighbor-new", "!neighbor-mid"])
end
it "honours the since parameter for neighbor queries" do
@@ -7172,8 +7353,8 @@ RSpec.describe "Potato Mesh Sinatra app" do
it "excludes traces older than twenty-eight days" do
clear_database
now = Time.now.to_i
recent_rx = now - (PotatoMesh::Config.trace_neighbor_window_seconds / 2)
stale_rx = now - (PotatoMesh::Config.trace_neighbor_window_seconds + 60)
recent_rx = now - (PotatoMesh::Config.four_weeks_seconds / 2)
stale_rx = now - (PotatoMesh::Config.four_weeks_seconds + 60)
payload = [
{ "id" => 50_001, "src" => 1, "dest" => 2, "rx_time" => recent_rx, "metrics" => {} },
{ "id" => 50_002, "src" => 3, "dest" => 4, "rx_time" => stale_rx, "metrics" => {} },
@@ -7221,6 +7402,41 @@ RSpec.describe "Potato Mesh Sinatra app" do
end
end
describe "GET /api/ingestors" do
it "uses the twenty-eight-day extended window so slow-tick ingestors stay visible" do
clear_database
allow(Time).to receive(:now).and_return(reference_time)
now = reference_time.to_i
stale_seen = now - (PotatoMesh::Config.four_weeks_seconds + 30)
backfillable_seen = now - (PotatoMesh::Config.week_seconds + 24 * 60 * 60)
fresh_seen = now - 60
with_db do |db|
db.execute(
"INSERT INTO ingestors(node_id, start_time, last_seen_time, version, protocol) VALUES(?,?,?,?,?)",
["!stale-ing", stale_seen - 60, stale_seen, "0.6.3", "meshtastic"],
)
db.execute(
"INSERT INTO ingestors(node_id, start_time, last_seen_time, version, protocol) VALUES(?,?,?,?,?)",
["!slow-ing", backfillable_seen - 60, backfillable_seen, "0.6.3", "meshtastic"],
)
db.execute(
"INSERT INTO ingestors(node_id, start_time, last_seen_time, version, protocol) VALUES(?,?,?,?,?)",
["!fresh-ing", fresh_seen - 60, fresh_seen, "0.6.3", "meshtastic"],
)
end
get "/api/ingestors"
expect(last_response).to be_ok
payload = JSON.parse(last_response.body)
ids = payload.map { |row| row["node_id"] }
# The eight-day-old ingestor is included (it was excluded under the
# seven-day default) while the heartbeat older than twenty-eight days
# stays out.
expect(ids).to contain_exactly("!fresh-ing", "!slow-ing")
end
end
describe "GET /nodes/:id" do
before do
import_nodes_fixture
+1 -1
@@ -144,7 +144,7 @@ RSpec.describe PotatoMesh::App::DataProcessing do
allow(PotatoMesh::Config).to receive(:db_path).and_return(db_path)
allow(PotatoMesh::Config).to receive(:db_busy_timeout_ms).and_return(5000)
allow(PotatoMesh::Config).to receive(:week_seconds).and_return(604_800)
allow(PotatoMesh::Config).to receive(:trace_neighbor_window_seconds).and_return(604_800)
allow(PotatoMesh::Config).to receive(:four_weeks_seconds).and_return(604_800)
allow(PotatoMesh::Config).to receive(:debug?).and_return(false)
db_helper = Object.new.extend(PotatoMesh::App::Database)
db_helper.init_db
+50
@@ -2317,6 +2317,10 @@ RSpec.describe PotatoMesh::App::Federation do
describe ".ensure_self_instance_record! (skip branch)" do
let(:application_class) { PotatoMesh::Application }
before do
PotatoMesh::App::Federation.reset_self_registration_log_state!
end
it "logs without writing when registration is not allowed" do
attributes = { id: "self", domain: "self.mesh" }
allow(application_class).to receive(:self_instance_attributes).and_return(attributes)
@@ -2337,6 +2341,52 @@ RSpec.describe PotatoMesh::App::Federation do
hash_including(reason: "blocked"),
)
end
it "deduplicates the skip log when the decision does not change" do
attributes = { id: "self", domain: "self.mesh" }
allow(application_class).to receive(:self_instance_attributes).and_return(attributes)
allow(application_class).to receive(:sign_instance_attributes).and_return("sig")
allow(application_class).to receive(:self_instance_registration_decision).and_return([false, "blocked"])
allow(application_class).to receive(:debug_log)
3.times { application_class.ensure_self_instance_record! }
expect(application_class).to have_received(:debug_log).with(
"Skipped self instance registration",
hash_including(reason: "blocked"),
).once
end
it "re-emits the log when the decision flips between calls" do
attributes = { id: "self", domain: "self.mesh" }
allow(application_class).to receive(:self_instance_attributes).and_return(attributes)
allow(application_class).to receive(:sign_instance_attributes).and_return("sig")
allow(application_class).to receive(:open_database)
allow(application_class).to receive(:upsert_instance_record)
allow(application_class).to receive(:debug_log)
decisions = [
[false, "blocked"],
[false, "blocked"],
[true, nil],
[true, nil],
[false, "blocked"],
]
allow(application_class).to receive(:self_instance_registration_decision) do
decisions.shift
end
5.times { application_class.ensure_self_instance_record! }
expect(application_class).to have_received(:debug_log).with(
"Skipped self instance registration",
hash_including(reason: "blocked"),
).twice
expect(application_class).to have_received(:debug_log).with(
"Registered self instance record",
hash_including(domain: "self.mesh"),
).once
end
end
describe ".verify_instance_signature (failure path)" do
+11 -3
@@ -128,16 +128,24 @@ RSpec.describe "Ingestor endpoints" do
end
describe "GET /api/ingestors" do
it "returns recent ingestors and omits stale rows" do
it "returns recent ingestors and omits rows older than the twenty-eight-day window" do
now = Time.now.to_i
with_db do |db|
db.execute(
"INSERT INTO ingestors(node_id, start_time, last_seen_time, version) VALUES(?,?,?,?)",
["!fresh000", now - 100, now - 10, "0.5.12"],
)
# Eight-day-old heartbeats now stay visible: ingestors are sparse and
# the bulk endpoint widens to four weeks so slow-tick nodes do not
# disappear between scrapes.
db.execute(
"INSERT INTO ingestors(node_id, start_time, last_seen_time, version) VALUES(?,?,?,?)",
["!stale000", now - (9 * 24 * 60 * 60), now - (9 * 24 * 60 * 60), "0.5.6"],
["!sparse00", now - (9 * 24 * 60 * 60), now - (8 * 24 * 60 * 60), "0.5.6"],
)
# Beyond the four-week floor — must be excluded.
db.execute(
"INSERT INTO ingestors(node_id, start_time, last_seen_time, version) VALUES(?,?,?,?)",
["!stale000", now - (30 * 24 * 60 * 60), now - (30 * 24 * 60 * 60), "0.5.0"],
)
db.execute(
"INSERT INTO ingestors(node_id, start_time, last_seen_time, version, lora_freq, modem_preset) VALUES(?,?,?,?,?,?)",
@@ -151,7 +159,7 @@ RSpec.describe "Ingestor endpoints" do
payload = JSON.parse(last_response.body)
expect(payload).to all(include("node_id", "start_time", "last_seen_time", "version"))
node_ids = payload.map { |entry| entry["node_id"] }
expect(node_ids).to include("!fresh000")
expect(node_ids).to include("!fresh000", "!sparse00")
expect(node_ids).not_to include("!stale000")
rich = payload.find { |row| row["node_id"] == "!rich000" }
expect(rich["lora_freq"]).to eq(915)
+1 -1
@@ -72,7 +72,7 @@ RSpec.describe PotatoMesh::App::Prometheus do
allow(PotatoMesh::Config).to receive(:db_path).and_return(db_path)
allow(PotatoMesh::Config).to receive(:db_busy_timeout_ms).and_return(5000)
allow(PotatoMesh::Config).to receive(:week_seconds).and_return(604_800)
allow(PotatoMesh::Config).to receive(:trace_neighbor_window_seconds).and_return(604_800)
allow(PotatoMesh::Config).to receive(:four_weeks_seconds).and_return(604_800)
allow(PotatoMesh::Config).to receive(:debug?).and_return(false)
db_helper = Object.new.extend(PotatoMesh::App::Database)
db_helper.init_db
+31 -1
@@ -358,7 +358,7 @@ RSpec.describe PotatoMesh::App::Queries do
allow(PotatoMesh::Config).to receive(:db_path).and_return(db_path)
allow(PotatoMesh::Config).to receive(:db_busy_timeout_ms).and_return(5000)
allow(PotatoMesh::Config).to receive(:week_seconds).and_return(604_800)
allow(PotatoMesh::Config).to receive(:trace_neighbor_window_seconds).and_return(604_800)
allow(PotatoMesh::Config).to receive(:four_weeks_seconds).and_return(604_800)
allow(PotatoMesh::Config).to receive(:debug?).and_return(false)
# Initialise schema so query methods can execute real SQL.
@@ -565,6 +565,36 @@ RSpec.describe PotatoMesh::App::Queries do
expect(texts).to include("hello")
expect(texts).not_to include("other message")
end
it "applies a seven-day floor for bulk queries and twenty-eight days for per-id" do
# Re-stub the windows with their real ratio so the bulk-vs-per-id
# distinction is observable from a single test.
allow(PotatoMesh::Config).to receive(:week_seconds).and_return(7 * 24 * 60 * 60)
allow(PotatoMesh::Config).to receive(:four_weeks_seconds).and_return(28 * 24 * 60 * 60)
stale_rx = now - (7 * 24 * 60 * 60 + 60)
backfillable_rx = now - (28 * 24 * 60 * 60 - 60)
with_db do |db|
rx_iso = Time.at(stale_rx).utc.iso8601
db.execute(
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, text) VALUES (?,?,?,?,?,?,?)",
[101, stale_rx, rx_iso, "!aabbccdd", "!ffffffff", 0, "stale"],
)
db.execute(
"INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, text) VALUES (?,?,?,?,?,?,?)",
[102, backfillable_rx, Time.at(backfillable_rx).utc.iso8601, "!aabbccdd", "!ffffffff", 0, "backfill"],
)
end
bulk_ids = queries.query_messages(50).map { |r| r["id"] }
expect(bulk_ids).not_to include(101)
expect(bulk_ids).not_to include(102)
scoped_ids = queries.query_messages(50, node_ref: "!aabbccdd").map { |r| r["id"] }
expect(scoped_ids).to include(101)
expect(scoped_ids).to include(102)
end
end
describe "#query_telemetry" do