diff --git a/data/mesh_ingestor/CONTRACTS.md b/data/mesh_ingestor/CONTRACTS.md index 4e7a2bb..86ae63e 100644 --- a/data/mesh_ingestor/CONTRACTS.md +++ b/data/mesh_ingestor/CONTRACTS.md @@ -145,3 +145,25 @@ Heartbeat payload: All collection GET endpoints (`/api/nodes`, `/api/messages`, `/api/positions`, `/api/telemetry`, `/api/traces`, `/api/neighbors`, `/api/ingestors`) accept an optional `?protocol=` query parameter. When present, only records whose `protocol` column matches the given value are returned. The `protocol` field is included in all GET responses. +### GET endpoint time windows + +Every read endpoint enforces a server-side rolling-window floor on the data it returns. The window is fixed per route and **cannot be widened by the caller** — explicit `?since=` is treated as `MAX(since, floor)`, so a `since` older than the floor is silently clamped to the floor. Pass a `since` newer than the floor when you want to be more restrictive (incremental refresh). + +| Route | Floor (default) | Notes | +| --- | --- | --- | +| `GET /api/nodes` | 7 days | filtered by `nodes.last_heard` | +| `GET /api/messages` | 7 days | filtered by `messages.rx_time` | +| `GET /api/positions` | 7 days | filtered by `COALESCE(rx_time, position_time)` | +| `GET /api/telemetry` | 7 days | filtered by `COALESCE(rx_time, telemetry_time)` | +| `GET /api/instances` | 7 days | filtered by `instances.last_update_time` | +| `GET /api/neighbors` | **28 days** | sparse data; widened to keep slow scrapes visible | +| `GET /api/traces` | **28 days** | sparse data; same rationale | +| `GET /api/ingestors` | **28 days** | sparse heartbeats; same rationale | +| `GET /api/.../:id` (per-id lookup) | **28 days** | every per-id route uses the extended window so callers can backfill historical context for a specific node/conversation that has dropped out of the bulk view. The `since` clamp still applies. | +| `GET /api/telemetry/aggregated` | caller-controlled | `?windowSeconds=` is mandatory; defaults to 86 400 (1 day). Bounded by `MAX_QUERY_LIMIT` on bucket count, not by a hard floor. | +| `GET /api/stats` | n/a | reports counts at fixed `hour`/`day`/`week`/`month` activity buckets. | + +Federation peers should not assume an unbounded historical window: a peer that requests `/api/messages?since=0` from a partner expecting "everything" will only ever receive the last seven days. To pull older state, request the per-id endpoint (28 days) for the relevant nodes. + +The constants live in `web/lib/potato_mesh/config.rb` (`week_seconds`, `four_weeks_seconds`). + diff --git a/web/lib/potato_mesh/application/federation/self_instance.rb b/web/lib/potato_mesh/application/federation/self_instance.rb index 5f8662f..d1770f9 100644 --- a/web/lib/potato_mesh/application/federation/self_instance.rb +++ b/web/lib/potato_mesh/application/federation/self_instance.rb @@ -17,6 +17,29 @@ module PotatoMesh module App module Federation + # Process-wide memo for the most recently emitted self-registration + # decision. Sinatra spins up a fresh app instance per request so a + # plain instance variable would not survive across calls; storing the + # state on the module itself keeps the dedupe stable for the lifetime + # of the worker process. + @self_registration_log_state = { mutex: Mutex.new, last: nil } + + # Accessor for the dedupe state used by {#ensure_self_instance_record!}. + # + # @return [Hash{Symbol => Object}] mutable state hash holding +:mutex+ and +:last+. 
+ def self.self_registration_log_state + @self_registration_log_state + end + + # Reset the dedupe memo. Intended for tests; production code never + # needs to clear the state because each process starts fresh. + # + # @return [void] + def self.reset_self_registration_log_state! + state = @self_registration_log_state + state[:mutex].synchronize { state[:last] = nil } + end + # Resolve the canonical domain for the running instance. # # @return [String, nil] sanitized instance domain or nil outside production. @@ -137,16 +160,30 @@ module PotatoMesh signature = sign_instance_attributes(attributes) db = nil allowed, reason = self_instance_registration_decision(attributes[:domain]) + # Decisions are stable per process while INSTANCE_DOMAIN_SOURCE + # remains the same — without dedupe, the federation banner on every + # page navigation produced one log line apiece. Only emit when the + # tuple changes so operators still see the first decision (and any + # later flip) without the spam. + sentinel = [allowed, reason, attributes[:domain]] + state = PotatoMesh::App::Federation.self_registration_log_state + should_log = state[:mutex].synchronize do + changed = state[:last] != sentinel + state[:last] = sentinel if changed + changed + end if allowed db = open_database upsert_instance_record(db, attributes, signature) - debug_log( - "Registered self instance record", - context: "federation.instances", - domain: attributes[:domain], - instance_id: attributes[:id], - ) - else + if should_log + debug_log( + "Registered self instance record", + context: "federation.instances", + domain: attributes[:domain], + instance_id: attributes[:id], + ) + end + elsif should_log debug_log( "Skipped self instance registration", context: "federation.instances", diff --git a/web/lib/potato_mesh/application/queries/chat_queries.rb b/web/lib/potato_mesh/application/queries/chat_queries.rb index 16c1cc3..587f4f1 100644 --- a/web/lib/potato_mesh/application/queries/chat_queries.rb +++ b/web/lib/potato_mesh/application/queries/chat_queries.rb @@ -26,7 +26,12 @@ module PotatoMesh # @return [Array] compacted message rows safe for API responses. def query_messages(limit, node_ref: nil, include_encrypted: false, since: 0, protocol: nil) limit = coerce_query_limit(limit) - since_threshold = normalize_since_threshold(since, floor: 0) + now = Time.now.to_i + # Default the chat feed to the same seven-day window the dashboard uses + # for the node table; per-id lookups widen to twenty-eight days so + # historical conversation context remains reachable on demand. + since_floor = node_ref ? now - PotatoMesh::Config.four_weeks_seconds : now - PotatoMesh::Config.week_seconds + since_threshold = normalize_since_threshold(since, floor: since_floor) db = open_database(readonly: true) db.results_as_hash = true params = [] diff --git a/web/lib/potato_mesh/application/queries/federation_queries.rb b/web/lib/potato_mesh/application/queries/federation_queries.rb index b305777..e20c976 100644 --- a/web/lib/potato_mesh/application/queries/federation_queries.rb +++ b/web/lib/potato_mesh/application/queries/federation_queries.rb @@ -30,8 +30,9 @@ module PotatoMesh params = [] where_clauses = [] now = Time.now.to_i - min_rx_time = now - PotatoMesh::Config.week_seconds - since_floor = node_ref ? 0 : min_rx_time + # Bulk positions follow the seven-day default; per-id lookups widen + # to twenty-eight days for backfill of historical track data. + since_floor = node_ref ? 
now - PotatoMesh::Config.four_weeks_seconds : now - PotatoMesh::Config.week_seconds since_threshold = normalize_since_threshold(since, floor: since_floor) where_clauses << "COALESCE(rx_time, position_time, 0) >= ?" params << since_threshold @@ -91,9 +92,11 @@ module PotatoMesh params = [] where_clauses = [] now = Time.now.to_i - min_rx_time = now - PotatoMesh::Config.week_seconds - since_floor = node_ref ? 0 : min_rx_time - since_threshold = normalize_since_threshold(since, floor: since_floor) + # Neighbor relationships are reported sporadically and are easy to + # lose between scrapes, so use the twenty-eight-day extended window + # for both bulk and per-id queries. + min_rx_time = now - PotatoMesh::Config.four_weeks_seconds + since_threshold = normalize_since_threshold(since, floor: min_rx_time) where_clauses << "COALESCE(rx_time, 0) >= ?" params << since_threshold @@ -141,7 +144,7 @@ module PotatoMesh params = [] where_clauses = [] now = Time.now.to_i - min_rx_time = now - PotatoMesh::Config.trace_neighbor_window_seconds + min_rx_time = now - PotatoMesh::Config.four_weeks_seconds since_threshold = normalize_since_threshold(since, floor: min_rx_time) where_clauses << "COALESCE(rx_time, 0) >= ?" params << since_threshold diff --git a/web/lib/potato_mesh/application/queries/node_queries.rb b/web/lib/potato_mesh/application/queries/node_queries.rb index 7032f51..3028f44 100644 --- a/web/lib/potato_mesh/application/queries/node_queries.rb +++ b/web/lib/potato_mesh/application/queries/node_queries.rb @@ -141,8 +141,10 @@ module PotatoMesh db = open_database(readonly: true) db.results_as_hash = true now = Time.now.to_i - min_last_heard = now - PotatoMesh::Config.week_seconds - since_floor = node_ref ? 0 : min_last_heard + # Bulk listings stay on the seven-day window so the dashboard does not + # render stale nodes; per-id lookups widen to twenty-eight days so + # callers can backfill older records that fall outside the bulk floor. + since_floor = node_ref ? now - PotatoMesh::Config.four_weeks_seconds : now - PotatoMesh::Config.week_seconds since_threshold = normalize_since_threshold(since, floor: since_floor) params = [] where_clauses = [] @@ -227,7 +229,10 @@ module PotatoMesh db = open_database(readonly: true) db.results_as_hash = true now = Time.now.to_i - cutoff = now - PotatoMesh::Config.week_seconds + # Ingestor heartbeats are sparse (one per ingestor per cycle) so widen + # the rolling window to twenty-eight days to keep slow-tick ingestors + # visible in the federation overview. + cutoff = now - PotatoMesh::Config.four_weeks_seconds since_threshold = normalize_since_threshold(since, floor: cutoff) where_clauses = ["last_seen_time >= ?"] params = [since_threshold] diff --git a/web/lib/potato_mesh/application/queries/telemetry_queries.rb b/web/lib/potato_mesh/application/queries/telemetry_queries.rb index 3c2beb7..01d8c3a 100644 --- a/web/lib/potato_mesh/application/queries/telemetry_queries.rb +++ b/web/lib/potato_mesh/application/queries/telemetry_queries.rb @@ -30,8 +30,9 @@ module PotatoMesh params = [] where_clauses = [] now = Time.now.to_i - min_rx_time = now - PotatoMesh::Config.week_seconds - since_floor = node_ref ? 0 : min_rx_time + # Bulk telemetry follows the seven-day default; per-id lookups widen + # to twenty-eight days so historical chart data remains reachable. + since_floor = node_ref ? 
now - PotatoMesh::Config.four_weeks_seconds : now - PotatoMesh::Config.week_seconds since_threshold = normalize_since_threshold(since, floor: since_floor) where_clauses << "COALESCE(rx_time, telemetry_time, 0) >= ?" params << since_threshold diff --git a/web/lib/potato_mesh/application/routes/api.rb b/web/lib/potato_mesh/application/routes/api.rb index 7b2c91c..b503b31 100644 --- a/web/lib/potato_mesh/application/routes/api.rb +++ b/web/lib/potato_mesh/application/routes/api.rb @@ -390,9 +390,21 @@ module PotatoMesh halt 404 unless federation_enabled? content_type :json - ensure_self_instance_record! - payload = load_instances_for_api - JSON.generate(payload) + # The federation banner is rendered on every page navigation, which + # caused this endpoint to fire ~7 times in a few seconds while the + # user clicked through the site. Cache the response (including the + # self-record refresh) for a short window so navigation feels free + # without delaying signature/peer updates by more than a few + # seconds. The dedicated announcer thread keeps the underlying + # record fresh on its own cadence regardless of cache hits. + priv = private_mode? ? 1 : 0 + cached = PotatoMesh::App::ApiCache.fetch("api:instances:#{priv}", ttl_seconds: 30) do + ensure_self_instance_record! + JSON.generate(load_instances_for_api) + end + etag cached[:etag], kind: :weak + api_cache_control + cached[:value] end end end diff --git a/web/lib/potato_mesh/application/routes/ingest.rb b/web/lib/potato_mesh/application/routes/ingest.rb index 7f4f02f..23e84ab 100644 --- a/web/lib/potato_mesh/application/routes/ingest.rb +++ b/web/lib/potato_mesh/application/routes/ingest.rb @@ -315,6 +315,10 @@ module PotatoMesh db = open_database upsert_instance_record(db, attributes, signature) + # Drop the cached /api/instances payload so the new peer becomes + # visible on the next dashboard refresh instead of after the TTL + # naturally expires. + PotatoMesh::App::ApiCache.invalidate_prefix("api:instances:") enqueued = enqueue_federation_crawl( attributes[:domain], per_response_limit: PotatoMesh::Config.federation_max_instances_per_response, diff --git a/web/lib/potato_mesh/config.rb b/web/lib/potato_mesh/config.rb index 5f9b791..f84a14a 100644 --- a/web/lib/potato_mesh/config.rb +++ b/web/lib/potato_mesh/config.rb @@ -181,17 +181,30 @@ module PotatoMesh DEFAULT_DB_BUSY_RETRY_DELAY end - # Convenience constant describing the number of seconds in a week. + # Default rolling retention window in seconds. + # + # Used as the freshness floor for every "general" bulk read endpoint — + # nodes, messages, positions, telemetry, and the federation instance + # catalog — and as the freshness floor for federation/dashboard activity + # counters. Exceptions (sparse data) live on + # {#four_weeks_seconds}; per-id lookups also widen to the extended + # window so callers can backfill historical context for a single node. # # @return [Integer] seconds in seven days. def week_seconds 7 * 24 * 60 * 60 end - # Rolling retention window in seconds for trace and neighbor API queries. + # Extended rolling retention window in seconds. + # + # Used as the default freshness floor for endpoints whose data is more + # fragile (traces, neighbors, ingestors) and as the floor for every + # +/api/.../:id+ lookup so callers can backfill historical records that + # would otherwise fall outside the seven-day default applied to bulk + # endpoints. # # @return [Integer] seconds in twenty-eight days. 
- def trace_neighbor_window_seconds + def four_weeks_seconds 28 * 24 * 60 * 60 end diff --git a/web/public/assets/js/app/__tests__/message-node-hydrator.test.js b/web/public/assets/js/app/__tests__/message-node-hydrator.test.js index 6dd5f13..ce5907a 100644 --- a/web/public/assets/js/app/__tests__/message-node-hydrator.test.js +++ b/web/public/assets/js/app/__tests__/message-node-hydrator.test.js @@ -17,7 +17,53 @@ import test from 'node:test'; import assert from 'node:assert/strict'; -import { createMessageNodeHydrator } from '../message-node-hydrator.js'; +import { createMessageNodeHydrator, MESSAGE_HYDRATION_CONCURRENCY } from '../message-node-hydrator.js'; + +/** + * Build a fetch double that records the maximum number of simultaneously + * pending lookups so tests can assert the worker-pool cap is honoured. + * + * @param {number} settleDelayMs Milliseconds to keep each lookup pending + * before resolving, giving sibling workers a chance to start. + * @returns {{ + * fetchNodeById: (id: string) => Promise, + * maxInFlight: () => number, + * totalCalls: () => number, + * }} Helper API exposing the recorded peak concurrency. + */ +function makeConcurrencyProbe(settleDelayMs = 10) { + let inFlight = 0; + let peak = 0; + let total = 0; + return { + async fetchNodeById(id) { + inFlight += 1; + total += 1; + peak = Math.max(peak, inFlight); + try { + await new Promise(resolve => setTimeout(resolve, settleDelayMs)); + return { node_id: id, short_name: id.slice(1, 5) }; + } finally { + inFlight -= 1; + } + }, + maxInFlight: () => peak, + totalCalls: () => total, + }; +} + +/** + * Build N messages with unique sender identifiers for concurrency tests. + * + * @param {number} count Number of messages to produce. + * @returns {Array} Synthetic message payloads. + */ +function makeUniqueSenderMessages(count) { + return Array.from({ length: count }, (_, index) => ({ + from_id: `!sender${index.toString().padStart(4, '0')}`, + text: `m${index}`, + })); +} /** * Capture warning invocations produced during a test run. @@ -78,6 +124,66 @@ test('hydrate fetches missing nodes once and caches the result', async () => { assert.strictEqual(result[1].node, nodesById.get('!fetch')); }); +test('hydrate caches 404 results so subsequent calls do not refetch dead ids', async () => { + let fetchCalls = 0; + const hydrator = createMessageNodeHydrator({ + fetchNodeById: async () => { + fetchCalls += 1; + return null; + }, + applyNodeFallback: () => {}, + }); + const messages = [{ from_id: '!gone', text: 'first' }]; + const nodesById = new Map(); + + await hydrator.hydrate(messages, nodesById); + await hydrator.hydrate([{ from_id: '!gone', text: 'second' }], nodesById); + await hydrator.hydrate([{ from_id: '!gone', text: 'third' }], nodesById); + + assert.equal(fetchCalls, 1); +}); + +test('cached missing entry is overridden when nodesById later resolves the id', async () => { + let fetchCalls = 0; + const hydrator = createMessageNodeHydrator({ + fetchNodeById: async () => { + fetchCalls += 1; + return null; + }, + applyNodeFallback: () => {}, + }); + const nodesById = new Map(); + + await hydrator.hydrate([{ from_id: '!late', text: 'first' }], nodesById); + assert.equal(fetchCalls, 1); + + // Bulk /api/nodes refresh resolves the id afterwards. 
+ const lateNode = { node_id: '!late', short_name: 'Late' }; + nodesById.set('!late', lateNode); + + const result = await hydrator.hydrate([{ from_id: '!late', text: 'second' }], nodesById); + assert.equal(fetchCalls, 1); + assert.strictEqual(result[0].node, lateNode); +}); + +test('hydrate caches lookup failures alongside 404s', async () => { + let fetchCalls = 0; + const hydrator = createMessageNodeHydrator({ + fetchNodeById: async () => { + fetchCalls += 1; + throw new Error('network down'); + }, + applyNodeFallback: () => {}, + logger: { warn() {} }, + }); + const nodesById = new Map(); + + await hydrator.hydrate([{ from_id: '!flaky', text: 'a' }], nodesById); + await hydrator.hydrate([{ from_id: '!flaky', text: 'b' }], nodesById); + + assert.equal(fetchCalls, 1); +}); + test('hydrate falls back to placeholders when lookups fail', async () => { const logger = new LoggerStub(); let fallbackCalls = 0; @@ -121,3 +227,125 @@ test('hydrate records warning when fetch rejects', async () => { assert.ok(logger.messages.length >= 1); assert.equal(nodesById.has('!warn'), false); }); + +test('hydrate caps in-flight lookups at the default concurrency', async () => { + const probe = makeConcurrencyProbe(); + const hydrator = createMessageNodeHydrator({ + fetchNodeById: probe.fetchNodeById, + applyNodeFallback: () => {}, + }); + const messages = makeUniqueSenderMessages(MESSAGE_HYDRATION_CONCURRENCY * 3); + + await hydrator.hydrate(messages, new Map()); + + assert.equal(probe.totalCalls(), messages.length); + assert.ok( + probe.maxInFlight() <= MESSAGE_HYDRATION_CONCURRENCY, + `expected <= ${MESSAGE_HYDRATION_CONCURRENCY} concurrent fetches, observed ${probe.maxInFlight()}`, + ); +}); + +test('hydrate honours a custom concurrency override', async () => { + const probe = makeConcurrencyProbe(); + const hydrator = createMessageNodeHydrator({ + fetchNodeById: probe.fetchNodeById, + applyNodeFallback: () => {}, + concurrency: 2, + }); + const messages = makeUniqueSenderMessages(8); + + await hydrator.hydrate(messages, new Map()); + + assert.equal(probe.totalCalls(), 8); + assert.equal(probe.maxInFlight(), 2); +}); + +test('hydrate serialises lookups when concurrency is one', async () => { + const probe = makeConcurrencyProbe(); + const hydrator = createMessageNodeHydrator({ + fetchNodeById: probe.fetchNodeById, + applyNodeFallback: () => {}, + concurrency: 1, + }); + const messages = makeUniqueSenderMessages(4); + + await hydrator.hydrate(messages, new Map()); + + assert.equal(probe.maxInFlight(), 1); +}); + +test('hydrate falls back to the default cap for invalid concurrency values', async () => { + for (const invalid of [0, -3, Number.NaN, Number.POSITIVE_INFINITY, 'four']) { + const probe = makeConcurrencyProbe(); + const hydrator = createMessageNodeHydrator({ + fetchNodeById: probe.fetchNodeById, + applyNodeFallback: () => {}, + concurrency: invalid, + }); + const messages = makeUniqueSenderMessages(MESSAGE_HYDRATION_CONCURRENCY * 2); + + await hydrator.hydrate(messages, new Map()); + + assert.ok( + probe.maxInFlight() <= MESSAGE_HYDRATION_CONCURRENCY, + `concurrency=${String(invalid)} should fall back to default; observed peak ${probe.maxInFlight()}`, + ); + } +}); + +test('factory rejects missing fetch and fallback dependencies', () => { + assert.throws( + () => createMessageNodeHydrator({ applyNodeFallback: () => {} }), + TypeError, + ); + assert.throws( + () => createMessageNodeHydrator({ fetchNodeById: async () => null }), + TypeError, + ); +}); + +test('hydrate skips non-object entries and 
senderless messages', async () => { + let fetchCalls = 0; + const hydrator = createMessageNodeHydrator({ + fetchNodeById: async () => { + fetchCalls += 1; + return null; + }, + applyNodeFallback: () => {}, + }); + const senderless = { text: 'no sender' }; + const messages = [null, 'not-an-object', senderless]; + + const result = await hydrator.hydrate(messages, new Map()); + + assert.equal(fetchCalls, 0); + assert.equal(result.length, 3); + assert.strictEqual(senderless.node, null); +}); + +test('hydrate dedupes duplicate senders without exceeding the cap', async () => { + const probe = makeConcurrencyProbe(); + const hydrator = createMessageNodeHydrator({ + fetchNodeById: probe.fetchNodeById, + applyNodeFallback: () => {}, + concurrency: 2, + }); + // Twenty messages but only four unique senders. After the first lookup + // for a given sender resolves, ``resolveNode`` writes the result into the + // shared ``nodesById`` cache; every later message with the same id is + // bound synchronously from that cache before it ever reaches the worker + // pool, so the total fetch count collapses to the four unique senders. + // (The inflight-promise map only matters when two workers happen to race + // on the same id, which barely happens at concurrency=2 — the + // ``nodesById`` short-circuit is the dominant mechanism here.) + const senders = ['!aaa', '!bbb', '!ccc', '!ddd']; + const messages = Array.from({ length: 20 }, (_, index) => ({ + from_id: senders[index % senders.length], + text: `dup${index}`, + })); + + await hydrator.hydrate(messages, new Map()); + + assert.equal(probe.totalCalls(), senders.length); + assert.ok(probe.maxInFlight() <= 2); +}); diff --git a/web/public/assets/js/app/message-node-hydrator.js b/web/public/assets/js/app/message-node-hydrator.js index 0186710..462f060 100644 --- a/web/public/assets/js/app/message-node-hydrator.js +++ b/web/public/assets/js/app/message-node-hydrator.js @@ -14,29 +14,63 @@ * limitations under the License. */ +/** + * Default upper bound for in-flight ``/api/nodes/:id`` lookups while the + * hydrator backfills sender metadata. Matches the worker-pool size used by + * ``node-page/role-index.js`` so a thundering herd of cold-load lookups + * cannot overwhelm the server. + */ +export const MESSAGE_HYDRATION_CONCURRENCY = 4; + /** * Build a hydrator capable of attaching node metadata to chat messages. * * @param {{ * fetchNodeById: (nodeId: string) => Promise, * applyNodeFallback: (node: object) => void, - * logger?: { warn?: (message?: any, ...optionalParams: any[]) => void } - * }} options Factory configuration. + * logger?: { warn?: (message?: any, ...optionalParams: any[]) => void }, + * concurrency?: number + * }} options Factory configuration. ``concurrency`` overrides the default + * worker-pool size and is primarily intended for unit tests; callers + * should leave it unset in production. * @returns {{ * hydrate: (messages: Array|null|undefined, nodesById: Map) => Promise> * }} Hydrator API. */ -export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, logger = console }) { +export function createMessageNodeHydrator({ + fetchNodeById, + applyNodeFallback, + logger = console, + concurrency = MESSAGE_HYDRATION_CONCURRENCY, +}) { if (typeof fetchNodeById !== 'function') { throw new TypeError('fetchNodeById must be a function'); } if (typeof applyNodeFallback !== 'function') { throw new TypeError('applyNodeFallback must be a function'); } + // Treat any non-positive or non-finite value as "fall back to default". 
This + // keeps the hydrator robust against accidental misconfiguration without + // degrading to unbounded parallelism. + const workerCap = + Number.isFinite(concurrency) && concurrency > 0 + ? Math.floor(concurrency) + : MESSAGE_HYDRATION_CONCURRENCY; /** @type {Map>} */ const inflightLookups = new Map(); + // Negative-result cache shared across all ``hydrate()`` invocations on + // this hydrator instance. Without it, every refresh tick would re-issue + // ``/api/nodes/:id`` for senders that the server has already returned + // 404 for once — turning a single dead participant in a busy chat into a + // perpetual per-minute fetch. The set is consulted *after* the fresh + // ``nodesById`` lookup, so a node that registers later (and therefore + // appears in the bulk /api/nodes refresh) immediately wins over a stale + // missing entry without any explicit invalidation. + /** @type {Set} */ + const missingNodeIds = new Set(); + /** * Normalise potential node identifiers into canonical strings. * @@ -63,6 +97,9 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo if (nodesById instanceof Map && nodesById.has(id)) { return nodesById.get(id); } + if (missingNodeIds.has(id)) { + return null; + } if (inflightLookups.has(id)) { return inflightLookups.get(id); } @@ -77,12 +114,14 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo } return node; } + missingNodeIds.add(id); return null; }) .catch(error => { if (logger && typeof logger.warn === 'function') { logger.warn('message node lookup failed', { nodeId: id, error }); } + missingNodeIds.add(id); return null; }) .finally(() => { @@ -96,6 +135,13 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo /** * Attach node information to the provided message collection. * + * Messages whose sender is already in ``nodesById`` are bound synchronously + * and incur no network traffic. Misses are pushed onto a shared queue and + * drained by a fixed worker pool so the number of in-flight + * ``/api/nodes/:id`` requests never exceeds {@link workerCap}. This caps + * the cold-load thundering-herd that would otherwise issue one request per + * unique sender in parallel. + * * @param {Array|null|undefined} messages Message payloads from the API. * @param {Map} nodesById Lookup table of known nodes. * @returns {Promise>} Hydrated message entries. @@ -105,7 +151,7 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo return Array.isArray(messages) ? messages : []; } - const tasks = []; + const queue = []; for (const message of messages) { if (!message || typeof message !== 'object') { continue; @@ -127,22 +173,36 @@ export function createMessageNodeHydrator({ fetchNodeById, applyNodeFallback, lo continue; } - const task = resolveNode(targetId, nodesById).then(node => { - if (node) { - message.node = node; - } else { - const placeholder = { node_id: targetId }; - applyNodeFallback(placeholder); - message.node = placeholder; - } - }); - tasks.push(task); + queue.push({ message, targetId }); } - if (tasks.length > 0) { - await Promise.all(tasks); + if (queue.length === 0) { + return messages; } + // Workers share a monotonically advancing index instead of mutating the + // queue with ``shift()`` — ``Array#shift`` is O(n) and would turn a + // large hydration burst into O(n²). Single-threaded JS makes the + // post-increment atomic with respect to other workers, so no lock or + // existence check is needed. 
+ let cursor = 0; + const workerCount = Math.min(workerCap, queue.length); + const workers = Array.from({ length: workerCount }, async () => { + while (cursor < queue.length) { + const entry = queue[cursor++]; + // eslint-disable-next-line no-await-in-loop + const node = await resolveNode(entry.targetId, nodesById); + if (node) { + entry.message.node = node; + } else { + const placeholder = { node_id: entry.targetId }; + applyNodeFallback(placeholder); + entry.message.node = placeholder; + } + } + }); + await Promise.all(workers); + return messages; } diff --git a/web/spec/app_spec.rb b/web/spec/app_spec.rb index d9c7427..14c7e3b 100644 --- a/web/spec/app_spec.rb +++ b/web/spec/app_spec.rb @@ -2852,6 +2852,77 @@ RSpec.describe "Potato Mesh Sinatra app" do expect(last_response.status).to eq(404) end end + + describe "response caching" do + it "serves a stable etag and returns 304 for matching If-None-Match" do + get "/api/instances" + + expect(last_response).to be_ok + first_etag = last_response.headers["ETag"] + expect(first_etag).not_to be_nil + + header "If-None-Match", first_etag + get "/api/instances" + + expect(last_response.status).to eq(304) + end + + it "returns the same response body for repeat requests within the TTL" do + # The bodies must be byte-identical: a fresh self-record sign would + # produce a different last_update_time / signature, so consistency + # confirms the cached entry is being reused. + get "/api/instances" + expect(last_response).to be_ok + first_body = last_response.body + first_etag = last_response.headers["ETag"] + + get "/api/instances" + expect(last_response).to be_ok + expect(last_response.body).to eq(first_body) + expect(last_response.headers["ETag"]).to eq(first_etag) + end + + it "invalidates the cache when a new peer registers" do + get "/api/instances" + expect(last_response).to be_ok + baseline_size = JSON.parse(last_response.body).length + + # Build a valid peer registration that POST /api/instances accepts. + peer_key = OpenSSL::PKey::RSA.new(2048) + peer_attributes = { + id: "cache-peer", + domain: "cache-peer.example", + pubkey: peer_key.public_key.export, + name: "Cache Peer", + version: "1.0.0", + channel: "#peer", + frequency: "868MHz", + latitude: 50.0, + longitude: 8.0, + last_update_time: Time.now.to_i, + is_private: false, + } + peer_signature = Base64.strict_encode64( + peer_key.sign( + OpenSSL::Digest::SHA256.new, + canonical_instance_payload(peer_attributes), + ), + ) + + with_db do |db| + upsert_instance_record(db, peer_attributes, peer_signature) + end + # The cache key is unaware of the direct DB insert, so explicitly + # invalidate to mirror what POST /api/instances does in production. 
+ PotatoMesh::App::ApiCache.invalidate_prefix("api:instances:") + + get "/api/instances" + expect(last_response).to be_ok + payload = JSON.parse(last_response.body) + expect(payload.length).to eq(baseline_size + 1) + expect(payload.map { |row| row["id"] }).to include("cache-peer") + end + end end describe "POST /api/nodes" do @@ -6329,11 +6400,12 @@ RSpec.describe "Potato Mesh Sinatra app" do end end - it "filters messages by the since parameter while defaulting to the full history" do + it "excludes messages older than seven days from collection queries while honouring since" do clear_database allow(Time).to receive(:now).and_return(reference_time) now = reference_time.to_i stale_rx = now - (PotatoMesh::Config.week_seconds + 120) + backfillable_rx = now - (PotatoMesh::Config.week_seconds + 3 * 24 * 60 * 60) fresh_rx = now - 15 with_db do |db| @@ -6349,15 +6421,32 @@ RSpec.describe "Potato Mesh Sinatra app" do "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text, snr, rssi, hop_limit) VALUES(?,?,?,?,?,?,?,?,?,?,?)", [2, fresh_rx, Time.at(fresh_rx).utc.iso8601, "!fresh", "!old", 0, "TEXT_MESSAGE_APP", "fresh", 2.0, -60, 3], ) + db.execute( + "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text, snr, rssi, hop_limit) VALUES(?,?,?,?,?,?,?,?,?,?,?)", + [3, backfillable_rx, Time.at(backfillable_rx).utc.iso8601, "!old", "!fresh", 0, "TEXT_MESSAGE_APP", "backfill", 1.0, -75, 3], + ) end + # Bulk feed defaults to the seven-day window so stale conversations stop + # spamming the dashboard hydrator with backfill lookups. get "/api/messages" expect(last_response).to be_ok payload = JSON.parse(last_response.body) ids = payload.map { |row| row["id"] } - expect(ids).to eq([2, 1]) + expect(ids).to eq([2]) + # Per-id lookups widen to twenty-eight days so callers can still backfill + # historical context for a specific conversation participant. The route + # matches messages where either ``from_id`` or ``to_id`` references the + # supplied identifier, so the fresh exchange to ``!old`` shows up too. + get "/api/messages/!old" + + expect(last_response).to be_ok + scoped = JSON.parse(last_response.body) + expect(scoped.map { |row| row["id"] }).to contain_exactly(1, 2, 3) + + # The since parameter overrides the floor when it is more restrictive. 
get "/api/messages?since=#{fresh_rx}" expect(last_response).to be_ok @@ -6367,8 +6456,91 @@ RSpec.describe "Potato Mesh Sinatra app" do get "/api/messages/!old?since=#{fresh_rx}" expect(last_response).to be_ok - scoped = JSON.parse(last_response.body) - expect(scoped.map { |row| row["id"] }).to eq([2]) + scoped_since = JSON.parse(last_response.body) + expect(scoped_since.map { |row| row["id"] }).to eq([2]) + end + + it "clamps an explicit since older than the seven-day floor up to the floor" do + clear_database + allow(Time).to receive(:now).and_return(reference_time) + now = reference_time.to_i + stale_rx = now - (PotatoMesh::Config.week_seconds + 4 * 60 * 60) + fresh_rx = now - 30 + explicit_since = now - (PotatoMesh::Config.week_seconds + 14 * 24 * 60 * 60) + + with_db do |db| + db.execute( + "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)", + [301, stale_rx, Time.at(stale_rx).utc.iso8601, "!a", "!b", 0, "TEXT_MESSAGE_APP", "stale"], + ) + db.execute( + "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)", + [302, fresh_rx, Time.at(fresh_rx).utc.iso8601, "!a", "!b", 0, "TEXT_MESSAGE_APP", "fresh"], + ) + end + + # The caller asked for "everything since three weeks ago" but the route + # silently clamps that up to the seven-day floor — the stale row stays + # excluded. This locks in the contract that ``since`` cannot widen the + # window past the floor. + get "/api/messages?since=#{explicit_since}" + + expect(last_response).to be_ok + ids = JSON.parse(last_response.body).map { |row| row["id"] } + expect(ids).to eq([302]) + expect(ids).not_to include(301) + end + + it "clamps per-id since older than the twenty-eight-day floor up to that floor" do + clear_database + allow(Time).to receive(:now).and_return(reference_time) + now = reference_time.to_i + ancient_rx = now - (PotatoMesh::Config.four_weeks_seconds + 6 * 60 * 60) + backfillable_rx = now - (PotatoMesh::Config.four_weeks_seconds - 60) + explicit_since = now - (PotatoMesh::Config.four_weeks_seconds + 30 * 24 * 60 * 60) + + with_db do |db| + db.execute( + "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)", + [401, ancient_rx, Time.at(ancient_rx).utc.iso8601, "!a", "!b", 0, "TEXT_MESSAGE_APP", "ancient"], + ) + db.execute( + "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)", + [402, backfillable_rx, Time.at(backfillable_rx).utc.iso8601, "!a", "!b", 0, "TEXT_MESSAGE_APP", "backfill"], + ) + end + + get "/api/messages/!a?since=#{explicit_since}" + + expect(last_response).to be_ok + ids = JSON.parse(last_response.body).map { |row| row["id"] } + expect(ids).to eq([402]) + expect(ids).not_to include(401) + end + + it "excludes per-id messages older than the twenty-eight-day extended window" do + clear_database + allow(Time).to receive(:now).and_return(reference_time) + now = reference_time.to_i + ancient_rx = now - (PotatoMesh::Config.four_weeks_seconds + 60) + backfillable_rx = now - (PotatoMesh::Config.four_weeks_seconds - 60) + + with_db do |db| + db.execute( + "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)", + [10, ancient_rx, Time.at(ancient_rx).utc.iso8601, "!old", "!fresh", 0, "TEXT_MESSAGE_APP", "ancient"], + ) + db.execute( + "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text) VALUES(?,?,?,?,?,?,?,?)", + [11, 
backfillable_rx, Time.at(backfillable_rx).utc.iso8601, "!old", "!fresh", 0, "TEXT_MESSAGE_APP", "backfill"], + ) + end + + get "/api/messages/!old" + + expect(last_response).to be_ok + payload = JSON.parse(last_response.body) + expect(payload.map { |row| row["id"] }).to eq([11]) end end @@ -6580,11 +6752,12 @@ RSpec.describe "Potato Mesh Sinatra app" do end describe "GET /api/neighbors" do - it "excludes neighbor records older than twenty-eight days from collection queries" do + it "excludes neighbor records older than twenty-eight days from both bulk and per-id queries" do clear_database allow(Time).to receive(:now).and_return(reference_time) now = reference_time.to_i - stale_rx = now - (PotatoMesh::Config.trace_neighbor_window_seconds + 45) + stale_rx = now - (PotatoMesh::Config.four_weeks_seconds + 45) + backfillable_rx = now - (PotatoMesh::Config.four_weeks_seconds - 60) fresh_rx = now - 10 with_db do |db| @@ -6596,6 +6769,10 @@ RSpec.describe "Potato Mesh Sinatra app" do "INSERT INTO nodes(node_id, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?)", ["!neighbor-old", "oldn", "Neighbor Old", "TBEAM", "CLIENT", 0.0, fresh_rx, fresh_rx], ) + db.execute( + "INSERT INTO nodes(node_id, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?)", + ["!neighbor-mid", "midn", "Neighbor Mid", "TBEAM", "CLIENT", 0.0, fresh_rx, fresh_rx], + ) db.execute( "INSERT INTO nodes(node_id, short_name, long_name, hw_model, role, snr, last_heard, first_heard) VALUES(?,?,?,?,?,?,?,?)", ["!neighbor-new", "newn", "Neighbor New", "TBEAM", "CLIENT", 0.0, fresh_rx, fresh_rx], @@ -6604,26 +6781,30 @@ RSpec.describe "Potato Mesh Sinatra app" do "INSERT INTO neighbors(node_id, neighbor_id, snr, rx_time) VALUES(?,?,?,?)", ["!root", "!neighbor-old", 1.0, stale_rx], ) + db.execute( + "INSERT INTO neighbors(node_id, neighbor_id, snr, rx_time) VALUES(?,?,?,?)", + ["!root", "!neighbor-mid", 4.0, backfillable_rx], + ) db.execute( "INSERT INTO neighbors(node_id, neighbor_id, snr, rx_time) VALUES(?,?,?,?)", ["!root", "!neighbor-new", 8.0, fresh_rx], ) end + # Both bulk and per-id queries share the twenty-eight-day extended + # window — neighbours are reported sporadically and would otherwise be + # lost between scrapes. 
get "/api/neighbors" expect(last_response).to be_ok payload = JSON.parse(last_response.body) - expect(payload.length).to eq(1) - expect(payload.first["neighbor_id"]).to eq("!neighbor-new") - expect(payload.first["rx_time"]).to eq(fresh_rx) + expect(payload.map { |row| row["neighbor_id"] }).to eq(["!neighbor-new", "!neighbor-mid"]) get "/api/neighbors/!root" expect(last_response).to be_ok filtered = JSON.parse(last_response.body) - expect(filtered.length).to eq(2) - expect(filtered.map { |row| row["neighbor_id"] }).to eq(["!neighbor-new", "!neighbor-old"]) + expect(filtered.map { |row| row["neighbor_id"] }).to eq(["!neighbor-new", "!neighbor-mid"]) end it "honours the since parameter for neighbor queries" do @@ -7172,8 +7353,8 @@ RSpec.describe "Potato Mesh Sinatra app" do it "excludes traces older than twenty-eight days" do clear_database now = Time.now.to_i - recent_rx = now - (PotatoMesh::Config.trace_neighbor_window_seconds / 2) - stale_rx = now - (PotatoMesh::Config.trace_neighbor_window_seconds + 60) + recent_rx = now - (PotatoMesh::Config.four_weeks_seconds / 2) + stale_rx = now - (PotatoMesh::Config.four_weeks_seconds + 60) payload = [ { "id" => 50_001, "src" => 1, "dest" => 2, "rx_time" => recent_rx, "metrics" => {} }, { "id" => 50_002, "src" => 3, "dest" => 4, "rx_time" => stale_rx, "metrics" => {} }, @@ -7221,6 +7402,41 @@ RSpec.describe "Potato Mesh Sinatra app" do end end + describe "GET /api/ingestors" do + it "uses the twenty-eight-day extended window so slow-tick ingestors stay visible" do + clear_database + allow(Time).to receive(:now).and_return(reference_time) + now = reference_time.to_i + stale_seen = now - (PotatoMesh::Config.four_weeks_seconds + 30) + backfillable_seen = now - (PotatoMesh::Config.week_seconds + 24 * 60 * 60) + fresh_seen = now - 60 + + with_db do |db| + db.execute( + "INSERT INTO ingestors(node_id, start_time, last_seen_time, version, protocol) VALUES(?,?,?,?,?)", + ["!stale-ing", stale_seen - 60, stale_seen, "0.6.3", "meshtastic"], + ) + db.execute( + "INSERT INTO ingestors(node_id, start_time, last_seen_time, version, protocol) VALUES(?,?,?,?,?)", + ["!slow-ing", backfillable_seen - 60, backfillable_seen, "0.6.3", "meshtastic"], + ) + db.execute( + "INSERT INTO ingestors(node_id, start_time, last_seen_time, version, protocol) VALUES(?,?,?,?,?)", + ["!fresh-ing", fresh_seen - 60, fresh_seen, "0.6.3", "meshtastic"], + ) + end + + get "/api/ingestors" + + expect(last_response).to be_ok + payload = JSON.parse(last_response.body) + ids = payload.map { |row| row["node_id"] } + # The eight-day-old ingestor is included (was excluded under the seven-day + # default) while the twenty-eight-day-old one stays out. 
+ expect(ids).to contain_exactly("!fresh-ing", "!slow-ing") + end + end + describe "GET /nodes/:id" do before do import_nodes_fixture diff --git a/web/spec/data_processing_spec.rb b/web/spec/data_processing_spec.rb index 87118a8..85fb27c 100644 --- a/web/spec/data_processing_spec.rb +++ b/web/spec/data_processing_spec.rb @@ -144,7 +144,7 @@ RSpec.describe PotatoMesh::App::DataProcessing do allow(PotatoMesh::Config).to receive(:db_path).and_return(db_path) allow(PotatoMesh::Config).to receive(:db_busy_timeout_ms).and_return(5000) allow(PotatoMesh::Config).to receive(:week_seconds).and_return(604_800) - allow(PotatoMesh::Config).to receive(:trace_neighbor_window_seconds).and_return(604_800) + allow(PotatoMesh::Config).to receive(:four_weeks_seconds).and_return(604_800) allow(PotatoMesh::Config).to receive(:debug?).and_return(false) db_helper = Object.new.extend(PotatoMesh::App::Database) db_helper.init_db diff --git a/web/spec/federation_spec.rb b/web/spec/federation_spec.rb index 31beee4..19ba54b 100644 --- a/web/spec/federation_spec.rb +++ b/web/spec/federation_spec.rb @@ -2317,6 +2317,10 @@ RSpec.describe PotatoMesh::App::Federation do describe ".ensure_self_instance_record! (skip branch)" do let(:application_class) { PotatoMesh::Application } + before do + PotatoMesh::App::Federation.reset_self_registration_log_state! + end + it "logs without writing when registration is not allowed" do attributes = { id: "self", domain: "self.mesh" } allow(application_class).to receive(:self_instance_attributes).and_return(attributes) @@ -2337,6 +2341,52 @@ RSpec.describe PotatoMesh::App::Federation do hash_including(reason: "blocked"), ) end + + it "deduplicates the skip log when the decision does not change" do + attributes = { id: "self", domain: "self.mesh" } + allow(application_class).to receive(:self_instance_attributes).and_return(attributes) + allow(application_class).to receive(:sign_instance_attributes).and_return("sig") + allow(application_class).to receive(:self_instance_registration_decision).and_return([false, "blocked"]) + allow(application_class).to receive(:debug_log) + + 3.times { application_class.ensure_self_instance_record! } + + expect(application_class).to have_received(:debug_log).with( + "Skipped self instance registration", + hash_including(reason: "blocked"), + ).once + end + + it "re-emits the log when the decision flips between calls" do + attributes = { id: "self", domain: "self.mesh" } + allow(application_class).to receive(:self_instance_attributes).and_return(attributes) + allow(application_class).to receive(:sign_instance_attributes).and_return("sig") + allow(application_class).to receive(:open_database) + allow(application_class).to receive(:upsert_instance_record) + allow(application_class).to receive(:debug_log) + + decisions = [ + [false, "blocked"], + [false, "blocked"], + [true, nil], + [true, nil], + [false, "blocked"], + ] + allow(application_class).to receive(:self_instance_registration_decision) do + decisions.shift + end + + 5.times { application_class.ensure_self_instance_record! 
} + + expect(application_class).to have_received(:debug_log).with( + "Skipped self instance registration", + hash_including(reason: "blocked"), + ).twice + expect(application_class).to have_received(:debug_log).with( + "Registered self instance record", + hash_including(domain: "self.mesh"), + ).once + end end describe ".verify_instance_signature (failure path)" do diff --git a/web/spec/ingestors_spec.rb b/web/spec/ingestors_spec.rb index 823f072..b3fa6bb 100644 --- a/web/spec/ingestors_spec.rb +++ b/web/spec/ingestors_spec.rb @@ -128,16 +128,24 @@ RSpec.describe "Ingestor endpoints" do end describe "GET /api/ingestors" do - it "returns recent ingestors and omits stale rows" do + it "returns recent ingestors and omits rows older than the twenty-eight-day window" do now = Time.now.to_i with_db do |db| db.execute( "INSERT INTO ingestors(node_id, start_time, last_seen_time, version) VALUES(?,?,?,?)", ["!fresh000", now - 100, now - 10, "0.5.12"], ) + # Eight-day-old heartbeats now stay visible: ingestors are sparse and + # the bulk endpoint widens to four weeks so slow-tick nodes do not + # disappear between scrapes. db.execute( "INSERT INTO ingestors(node_id, start_time, last_seen_time, version) VALUES(?,?,?,?)", - ["!stale000", now - (9 * 24 * 60 * 60), now - (9 * 24 * 60 * 60), "0.5.6"], + ["!sparse00", now - (9 * 24 * 60 * 60), now - (8 * 24 * 60 * 60), "0.5.6"], + ) + # Beyond the four-week floor — must be excluded. + db.execute( + "INSERT INTO ingestors(node_id, start_time, last_seen_time, version) VALUES(?,?,?,?)", + ["!stale000", now - (30 * 24 * 60 * 60), now - (30 * 24 * 60 * 60), "0.5.0"], ) db.execute( "INSERT INTO ingestors(node_id, start_time, last_seen_time, version, lora_freq, modem_preset) VALUES(?,?,?,?,?,?)", @@ -151,7 +159,7 @@ RSpec.describe "Ingestor endpoints" do payload = JSON.parse(last_response.body) expect(payload).to all(include("node_id", "start_time", "last_seen_time", "version")) node_ids = payload.map { |entry| entry["node_id"] } - expect(node_ids).to include("!fresh000") + expect(node_ids).to include("!fresh000", "!sparse00") expect(node_ids).not_to include("!stale000") rich = payload.find { |row| row["node_id"] == "!rich000" } expect(rich["lora_freq"]).to eq(915) diff --git a/web/spec/prometheus_spec.rb b/web/spec/prometheus_spec.rb index 5f13d59..58a5f87 100644 --- a/web/spec/prometheus_spec.rb +++ b/web/spec/prometheus_spec.rb @@ -72,7 +72,7 @@ RSpec.describe PotatoMesh::App::Prometheus do allow(PotatoMesh::Config).to receive(:db_path).and_return(db_path) allow(PotatoMesh::Config).to receive(:db_busy_timeout_ms).and_return(5000) allow(PotatoMesh::Config).to receive(:week_seconds).and_return(604_800) - allow(PotatoMesh::Config).to receive(:trace_neighbor_window_seconds).and_return(604_800) + allow(PotatoMesh::Config).to receive(:four_weeks_seconds).and_return(604_800) allow(PotatoMesh::Config).to receive(:debug?).and_return(false) db_helper = Object.new.extend(PotatoMesh::App::Database) db_helper.init_db diff --git a/web/spec/queries_spec.rb b/web/spec/queries_spec.rb index 63b9ef0..4bb95b9 100644 --- a/web/spec/queries_spec.rb +++ b/web/spec/queries_spec.rb @@ -358,7 +358,7 @@ RSpec.describe PotatoMesh::App::Queries do allow(PotatoMesh::Config).to receive(:db_path).and_return(db_path) allow(PotatoMesh::Config).to receive(:db_busy_timeout_ms).and_return(5000) allow(PotatoMesh::Config).to receive(:week_seconds).and_return(604_800) - allow(PotatoMesh::Config).to receive(:trace_neighbor_window_seconds).and_return(604_800) + allow(PotatoMesh::Config).to 
receive(:four_weeks_seconds).and_return(604_800) allow(PotatoMesh::Config).to receive(:debug?).and_return(false) # Initialise schema so query methods can execute real SQL. @@ -565,6 +565,36 @@ RSpec.describe PotatoMesh::App::Queries do expect(texts).to include("hello") expect(texts).not_to include("other message") end + + it "applies a seven-day floor for bulk queries and twenty-eight days for per-id" do + # Re-stub the windows with their real ratio so the bulk-vs-per-id + # distinction is observable from a single test. + allow(PotatoMesh::Config).to receive(:week_seconds).and_return(7 * 24 * 60 * 60) + allow(PotatoMesh::Config).to receive(:four_weeks_seconds).and_return(28 * 24 * 60 * 60) + + stale_rx = now - (7 * 24 * 60 * 60 + 60) + backfillable_rx = now - (28 * 24 * 60 * 60 - 60) + + with_db do |db| + rx_iso = Time.at(stale_rx).utc.iso8601 + db.execute( + "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, text) VALUES (?,?,?,?,?,?,?)", + [101, stale_rx, rx_iso, "!aabbccdd", "!ffffffff", 0, "stale"], + ) + db.execute( + "INSERT INTO messages(id, rx_time, rx_iso, from_id, to_id, channel, text) VALUES (?,?,?,?,?,?,?)", + [102, backfillable_rx, Time.at(backfillable_rx).utc.iso8601, "!aabbccdd", "!ffffffff", 0, "backfill"], + ) + end + + bulk_ids = queries.query_messages(50).map { |r| r["id"] } + expect(bulk_ids).not_to include(101) + expect(bulk_ids).not_to include(102) + + scoped_ids = queries.query_messages(50, node_ref: "!aabbccdd").map { |r| r["id"] } + expect(scoped_ids).to include(101) + expect(scoped_ids).to include(102) + end end describe "#query_telemetry" do
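
Reviewer note — a minimal sketch of the `since` clamping contract documented in the CONTRACTS.md addition above. `effective_since` is a hypothetical standalone helper for illustration only; it is not the project's `normalize_since_threshold` implementation, just the MAX(since, floor) rule the table describes:

```ruby
# Hypothetical helper illustrating the documented rule: an explicit
# `?since=` can only narrow a route's window, never widen it past the floor.
def effective_since(requested_since, floor_epoch)
  [requested_since.to_i, floor_epoch].max
end

now = Time.now.to_i
week_floor = now - 7 * 24 * 60 * 60         # bulk routes (nodes, messages, ...)
four_week_floor = now - 28 * 24 * 60 * 60   # per-id lookups and sparse routes

effective_since(0, week_floor)              # => week_floor (since=0 is clamped up to the floor)
effective_since(now - 3600, week_floor)     # => now - 3600 (a newer since narrows the window)
effective_since(0, four_week_floor)         # => four_week_floor (per-id backfill limit)
```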