diff --git a/app/path_utils.py b/app/path_utils.py index 7d4d731..99b87f2 100644 --- a/app/path_utils.py +++ b/app/path_utils.py @@ -9,6 +9,7 @@ The path_len wire byte is packed as [hash_mode:2][hop_count:6]: Mode 3 (hash_size=4) is reserved and rejected. """ +from collections.abc import Iterable from dataclasses import dataclass MAX_PATH_SIZE = 64 @@ -246,30 +247,26 @@ def parse_explicit_hop_route(route_text: str) -> tuple[str, int, int]: return "".join(hops), len(hops), hash_size - 1 -async def bucket_path_hash_widths(cursor, *, batch_size: int = 500) -> dict[str, int | float]: +def bucket_path_hash_widths(rows: Iterable) -> dict[str, int | float]: """Bucket raw packet rows by hop hash width and return counts + percentages. - *cursor* must be an already-executed async cursor whose rows have a ``data`` + *rows* must be an already-fetched list whose elements have a ``data`` column containing raw packet bytes. """ single_byte = 0 double_byte = 0 triple_byte = 0 - while True: - rows = await cursor.fetchmany(batch_size) - if not rows: - break - for row in rows: - envelope = parse_packet_envelope(bytes(row["data"])) - if envelope is None: - continue - if envelope.hash_size == 1: - single_byte += 1 - elif envelope.hash_size == 2: - double_byte += 1 - elif envelope.hash_size == 3: - triple_byte += 1 + for row in rows: + envelope = parse_packet_envelope(bytes(row["data"])) + if envelope is None: + continue + if envelope.hash_size == 1: + single_byte += 1 + elif envelope.hash_size == 2: + double_byte += 1 + elif envelope.hash_size == 3: + triple_byte += 1 total = single_byte + double_byte + triple_byte if total == 0: diff --git a/app/repository/messages.py b/app/repository/messages.py index 65c4799..2562034 100644 --- a/app/repository/messages.py +++ b/app/repository/messages.py @@ -868,7 +868,8 @@ class MessageRepository: """, (conversation_key, t_24h), ) - path_hash_width_24h = await bucket_path_hash_widths(cursor3) + rows3 = await cursor3.fetchall() + path_hash_width_24h = bucket_path_hash_widths(rows3) return { "message_counts": message_counts, diff --git a/app/repository/raw_packets.py b/app/repository/raw_packets.py index d14911d..3aded63 100644 --- a/app/repository/raw_packets.py +++ b/app/repository/raw_packets.py @@ -74,41 +74,52 @@ class RawPacketRepository: async def stream_all_undecrypted( batch_size: int = UNDECRYPTED_PACKET_BATCH_SIZE, ) -> AsyncIterator[tuple[int, bytes, int]]: - """Yield all undecrypted packets as (id, data, timestamp) in bounded batches.""" - cursor = await db.conn.execute( - "SELECT id, data, timestamp FROM raw_packets WHERE message_id IS NULL ORDER BY timestamp ASC" - ) - try: - while True: - rows = await cursor.fetchmany(batch_size) - if not rows: - break - for row in rows: - yield (row["id"], bytes(row["data"]), row["timestamp"]) - finally: + """Yield all undecrypted packets as (id, data, timestamp) in bounded batches. + + Uses keyset pagination so each batch is a fresh query with a fully + consumed cursor — no open statement held across yield boundaries. + """ + last_id = -1 + while True: + cursor = await db.conn.execute( + "SELECT id, data, timestamp FROM raw_packets " + "WHERE message_id IS NULL AND id > ? ORDER BY id ASC LIMIT ?", + (last_id, batch_size), + ) + rows = await cursor.fetchall() await cursor.close() + if not rows: + break + for row in rows: + last_id = row["id"] + yield (row["id"], bytes(row["data"]), row["timestamp"]) @staticmethod async def stream_undecrypted_text_messages( batch_size: int = UNDECRYPTED_PACKET_BATCH_SIZE, ) -> AsyncIterator[tuple[int, bytes, int]]: - """Yield undecrypted TEXT_MESSAGE packets in bounded-size batches.""" - cursor = await db.conn.execute( - "SELECT id, data, timestamp FROM raw_packets WHERE message_id IS NULL ORDER BY timestamp ASC" - ) - try: - while True: - rows = await cursor.fetchmany(batch_size) - if not rows: - break + """Yield undecrypted TEXT_MESSAGE packets in bounded-size batches. - for row in rows: - data = bytes(row["data"]) - payload_type = get_packet_payload_type(data) - if payload_type == PayloadType.TEXT_MESSAGE: - yield (row["id"], data, row["timestamp"]) - finally: + Uses keyset pagination so each batch is a fresh query with a fully + consumed cursor — no open statement held across yield boundaries. + """ + last_id = -1 + while True: + cursor = await db.conn.execute( + "SELECT id, data, timestamp FROM raw_packets " + "WHERE message_id IS NULL AND id > ? ORDER BY id ASC LIMIT ?", + (last_id, batch_size), + ) + rows = await cursor.fetchall() await cursor.close() + if not rows: + break + for row in rows: + last_id = row["id"] + data = bytes(row["data"]) + payload_type = get_packet_payload_type(data) + if payload_type == PayloadType.TEXT_MESSAGE: + yield (row["id"], data, row["timestamp"]) @staticmethod async def count_undecrypted_text_messages( diff --git a/app/repository/settings.py b/app/repository/settings.py index c5dda0b..e8077ce 100644 --- a/app/repository/settings.py +++ b/app/repository/settings.py @@ -13,7 +13,6 @@ SECONDS_1H = 3600 SECONDS_24H = 86400 SECONDS_72H = 259200 SECONDS_7D = 604800 -RAW_PACKET_STATS_BATCH_SIZE = 500 class AppSettingsRepository: @@ -302,7 +301,8 @@ class StatisticsRepository: "SELECT data FROM raw_packets WHERE timestamp >= ?", (now - SECONDS_24H,), ) - return await bucket_path_hash_widths(cursor, batch_size=RAW_PACKET_STATS_BATCH_SIZE) + rows = await cursor.fetchall() + return bucket_path_hash_widths(rows) @staticmethod async def get_all() -> dict: diff --git a/tests/e2e/global-setup.ts b/tests/e2e/global-setup.ts index e207882..a2677b7 100644 --- a/tests/e2e/global-setup.ts +++ b/tests/e2e/global-setup.ts @@ -1,8 +1,10 @@ import type { FullConfig } from '@playwright/test'; const BASE_URL = 'http://localhost:8001'; -const MAX_RETRIES = 10; -const RETRY_DELAY_MS = 2000; +// Post-connect sync (contact offload, channel sync, key export) can take +// 30-60s on a radio with many contacts, so allow generous polling here. +const MAX_RETRIES = 60; +const RETRY_DELAY_MS = 3000; interface HealthStatus { radio_connected: boolean; diff --git a/tests/e2e/playwright.config.ts b/tests/e2e/playwright.config.ts index 8870701..58d0840 100644 --- a/tests/e2e/playwright.config.ts +++ b/tests/e2e/playwright.config.ts @@ -63,7 +63,6 @@ export default defineConfig({ timeout: 180_000, env: { MESHCORE_DATABASE_PATH: path.join(tmpDir, 'e2e-test.db'), - MESHCORE_SKIP_POST_CONNECT_SYNC: 'true', // Pass through the serial port from the environment ...(process.env.MESHCORE_SERIAL_PORT ? { MESHCORE_SERIAL_PORT: process.env.MESHCORE_SERIAL_PORT } diff --git a/tests/test_packets_router.py b/tests/test_packets_router.py index d9d1243..7876017 100644 --- a/tests/test_packets_router.py +++ b/tests/test_packets_router.py @@ -5,7 +5,7 @@ undecrypted count endpoint, and the maintenance endpoint. """ import time -from unittest.mock import AsyncMock, patch +from unittest.mock import patch import pytest @@ -307,38 +307,37 @@ class TestDecryptHistoricalPackets: class TestUndecryptedTextPacketStreaming: @pytest.mark.asyncio - async def test_count_undecrypted_text_messages_uses_batched_streaming(self, test_db): - """Counting undecrypted DM packets should stream batches and filter by payload type.""" + async def test_count_undecrypted_text_messages_uses_keyset_pagination(self, test_db): + """Counting undecrypted DM packets should use keyset pagination and filter by payload type.""" - class FakeCursor: - def __init__(self): - self._batches = [ - [ - {"id": 1, "data": b"\x09\x00dm", "timestamp": 1000}, - {"id": 2, "data": b"\x15\x00chan", "timestamp": 1001}, - ], - [{"id": 3, "data": b"\x09\x00dm2", "timestamp": 1002}], - [], - ] - self.fetchall_called = False + # Simulate keyset pagination: each execute() call returns a cursor + # whose fetchall() yields one batch. The generator stops when a + # batch is empty. + batches = [ + [ + {"id": 1, "data": b"\x09\x00dm", "timestamp": 1000}, + {"id": 2, "data": b"\x15\x00chan", "timestamp": 1001}, + ], + [{"id": 3, "data": b"\x09\x00dm2", "timestamp": 1002}], + [], + ] - async def fetchmany(self, size): - assert size > 0 - return self._batches.pop(0) + async def fake_execute(*_args, **_kwargs): + batch = batches.pop(0) - async def close(self): - return None + class FakeCursor: + async def fetchall(self): + return batch - async def fetchall(self): - self.fetchall_called = True - raise AssertionError("fetchall() should not be used") + async def close(self): + pass - fake_cursor = FakeCursor() + return FakeCursor() - with patch.object(test_db.conn, "execute", new=AsyncMock(return_value=fake_cursor)): + with patch.object(test_db.conn, "execute", side_effect=fake_execute): count = await RawPacketRepository.count_undecrypted_text_messages(batch_size=2) - assert fake_cursor.fetchall_called is False + # header byte 0x09 -> payload type 2 (TEXT_MESSAGE); 0x15 -> type 5 (not TEXT_MESSAGE) assert count == 2 diff --git a/tests/test_statistics.py b/tests/test_statistics.py index d788684..31910e3 100644 --- a/tests/test_statistics.py +++ b/tests/test_statistics.py @@ -352,27 +352,14 @@ class TestPathHashWidthStats: assert breakdown["triple_byte_pct"] == pytest.approx(100 / 3, rel=1e-3) @pytest.mark.asyncio - async def test_path_hash_width_scan_uses_batched_fetchmany(self, test_db): - """Hash-width stats should stream batches instead of calling fetchall().""" + async def test_path_hash_width_scan_fetches_all_then_buckets(self, test_db): + """Hash-width stats should fetchall() then bucket synchronously.""" + + fake_rows = [{"data": b"a"}, {"data": b"b"}, {"data": b"c"}] class FakeCursor: - def __init__(self): - self._batches = [ - [{"data": b"a"}, {"data": b"b"}], - [{"data": b"c"}], - [], - ] - self.fetchall_called = False - - async def fetchmany(self, size): - assert size > 0 - return self._batches.pop(0) - async def fetchall(self): - self.fetchall_called = True - raise AssertionError("fetchall() should not be used") - - fake_cursor = FakeCursor() + return fake_rows def fake_parse(raw_packet: bytes): hash_sizes = { @@ -386,12 +373,11 @@ class TestPathHashWidthStats: return SimpleNamespace(hash_size=hash_size) with ( - patch.object(test_db.conn, "execute", new=AsyncMock(return_value=fake_cursor)), + patch.object(test_db.conn, "execute", new=AsyncMock(return_value=FakeCursor())), patch("app.path_utils.parse_packet_envelope", side_effect=fake_parse), ): breakdown = await StatisticsRepository._path_hash_width_24h() - assert fake_cursor.fetchall_called is False assert breakdown["total_packets"] == 3 assert breakdown["single_byte"] == 1 assert breakdown["double_byte"] == 1