Fix async db handling. Closes #179.

This commit is contained in:
Jack Kingsman
2026-04-12 11:57:37 -07:00
parent 53a4d8186a
commit cde4d1744e
8 changed files with 89 additions and 94 deletions

View File

@@ -9,6 +9,7 @@ The path_len wire byte is packed as [hash_mode:2][hop_count:6]:
Mode 3 (hash_size=4) is reserved and rejected.
"""
from collections.abc import Iterable
from dataclasses import dataclass
MAX_PATH_SIZE = 64
@@ -246,30 +247,26 @@ def parse_explicit_hop_route(route_text: str) -> tuple[str, int, int]:
return "".join(hops), len(hops), hash_size - 1
async def bucket_path_hash_widths(cursor, *, batch_size: int = 500) -> dict[str, int | float]:
def bucket_path_hash_widths(rows: Iterable) -> dict[str, int | float]:
"""Bucket raw packet rows by hop hash width and return counts + percentages.
*cursor* must be an already-executed async cursor whose rows have a ``data``
*rows* must be an already-fetched list whose elements have a ``data``
column containing raw packet bytes.
"""
single_byte = 0
double_byte = 0
triple_byte = 0
while True:
rows = await cursor.fetchmany(batch_size)
if not rows:
break
for row in rows:
envelope = parse_packet_envelope(bytes(row["data"]))
if envelope is None:
continue
if envelope.hash_size == 1:
single_byte += 1
elif envelope.hash_size == 2:
double_byte += 1
elif envelope.hash_size == 3:
triple_byte += 1
for row in rows:
envelope = parse_packet_envelope(bytes(row["data"]))
if envelope is None:
continue
if envelope.hash_size == 1:
single_byte += 1
elif envelope.hash_size == 2:
double_byte += 1
elif envelope.hash_size == 3:
triple_byte += 1
total = single_byte + double_byte + triple_byte
if total == 0:

View File

@@ -868,7 +868,8 @@ class MessageRepository:
""",
(conversation_key, t_24h),
)
path_hash_width_24h = await bucket_path_hash_widths(cursor3)
rows3 = await cursor3.fetchall()
path_hash_width_24h = bucket_path_hash_widths(rows3)
return {
"message_counts": message_counts,

View File

@@ -74,41 +74,52 @@ class RawPacketRepository:
async def stream_all_undecrypted(
    batch_size: int = UNDECRYPTED_PACKET_BATCH_SIZE,
) -> AsyncIterator[tuple[int, bytes, int]]:
    """Yield all undecrypted packets as (id, data, timestamp) in bounded batches.

    Uses keyset pagination so each batch is a fresh query with a fully
    consumed cursor — no open statement held across yield boundaries.

    Args:
        batch_size: Maximum number of rows fetched per query.

    Yields:
        ``(id, data, timestamp)`` tuples for every row in ``raw_packets``
        whose ``message_id`` is NULL, in ascending ``id`` order.
    """
    # Keyset pagination: each iteration issues a fresh query resuming
    # strictly after the last id already yielded, then fully drains and
    # closes the cursor before yielding any rows.
    last_id = -1
    while True:
        cursor = await db.conn.execute(
            "SELECT id, data, timestamp FROM raw_packets "
            "WHERE message_id IS NULL AND id > ? ORDER BY id ASC LIMIT ?",
            (last_id, batch_size),
        )
        rows = await cursor.fetchall()
        await cursor.close()
        if not rows:
            # Empty batch means the keyset walk has passed the last row.
            break
        for row in rows:
            last_id = row["id"]
            yield (row["id"], bytes(row["data"]), row["timestamp"])
@staticmethod
async def stream_undecrypted_text_messages(
    batch_size: int = UNDECRYPTED_PACKET_BATCH_SIZE,
) -> AsyncIterator[tuple[int, bytes, int]]:
    """Yield undecrypted TEXT_MESSAGE packets in bounded-size batches.

    Uses keyset pagination so each batch is a fresh query with a fully
    consumed cursor — no open statement held across yield boundaries.

    Args:
        batch_size: Maximum number of rows fetched per query.

    Yields:
        ``(id, data, timestamp)`` tuples for undecrypted rows whose payload
        type is ``PayloadType.TEXT_MESSAGE``; rows of other payload types
        are fetched but filtered out in Python.
    """
    # Same keyset walk as stream_all_undecrypted: resume strictly after
    # the last id seen so no statement stays open across a yield.
    last_id = -1
    while True:
        cursor = await db.conn.execute(
            "SELECT id, data, timestamp FROM raw_packets "
            "WHERE message_id IS NULL AND id > ? ORDER BY id ASC LIMIT ?",
            (last_id, batch_size),
        )
        rows = await cursor.fetchall()
        await cursor.close()
        if not rows:
            break
        for row in rows:
            # Advance the keyset on every row, even filtered-out ones,
            # so pagination still makes progress past non-text packets.
            last_id = row["id"]
            data = bytes(row["data"])
            payload_type = get_packet_payload_type(data)
            if payload_type == PayloadType.TEXT_MESSAGE:
                yield (row["id"], data, row["timestamp"])
@staticmethod
async def count_undecrypted_text_messages(

View File

@@ -13,7 +13,6 @@ SECONDS_1H = 3600
SECONDS_24H = 86400
SECONDS_72H = 259200
SECONDS_7D = 604800
RAW_PACKET_STATS_BATCH_SIZE = 500
class AppSettingsRepository:
@@ -302,7 +301,8 @@ class StatisticsRepository:
"SELECT data FROM raw_packets WHERE timestamp >= ?",
(now - SECONDS_24H,),
)
return await bucket_path_hash_widths(cursor, batch_size=RAW_PACKET_STATS_BATCH_SIZE)
rows = await cursor.fetchall()
return bucket_path_hash_widths(rows)
@staticmethod
async def get_all() -> dict:

View File

@@ -1,8 +1,10 @@
import type { FullConfig } from '@playwright/test';
// Health-poll settings for global setup: wait for the backend (and its
// radio post-connect sync) to come up before the test suite starts.
const BASE_URL = 'http://localhost:8001';

// Post-connect sync (contact offload, channel sync, key export) can take
// 30-60s on a radio with many contacts, so allow generous polling here.
const MAX_RETRIES = 60;
const RETRY_DELAY_MS = 3000;
interface HealthStatus {
radio_connected: boolean;

View File

@@ -63,7 +63,6 @@ export default defineConfig({
timeout: 180_000,
env: {
MESHCORE_DATABASE_PATH: path.join(tmpDir, 'e2e-test.db'),
MESHCORE_SKIP_POST_CONNECT_SYNC: 'true',
// Pass through the serial port from the environment
...(process.env.MESHCORE_SERIAL_PORT
? { MESHCORE_SERIAL_PORT: process.env.MESHCORE_SERIAL_PORT }

View File

@@ -5,7 +5,7 @@ undecrypted count endpoint, and the maintenance endpoint.
"""
import time
from unittest.mock import AsyncMock, patch
from unittest.mock import patch
import pytest
@@ -307,38 +307,37 @@ class TestDecryptHistoricalPackets:
class TestUndecryptedTextPacketStreaming:
@pytest.mark.asyncio
async def test_count_undecrypted_text_messages_uses_keyset_pagination(self, test_db):
    """Counting undecrypted DM packets should use keyset pagination and filter by payload type."""
    # Simulate keyset pagination: each execute() call returns a cursor
    # whose fetchall() yields one batch. The generator stops when a
    # batch is empty.
    batches = [
        [
            {"id": 1, "data": b"\x09\x00dm", "timestamp": 1000},
            {"id": 2, "data": b"\x15\x00chan", "timestamp": 1001},
        ],
        [{"id": 3, "data": b"\x09\x00dm2", "timestamp": 1002}],
        [],
    ]

    async def fake_execute(*_args, **_kwargs):
        # Each call consumes exactly one batch, mimicking successive
        # keyset queries; a fresh single-use cursor is returned per call.
        batch = batches.pop(0)

        class FakeCursor:
            async def fetchall(self):
                return batch

            async def close(self):
                pass

        return FakeCursor()

    with patch.object(test_db.conn, "execute", side_effect=fake_execute):
        count = await RawPacketRepository.count_undecrypted_text_messages(batch_size=2)

    # header byte 0x09 -> payload type 2 (TEXT_MESSAGE); 0x15 -> type 5 (not TEXT_MESSAGE)
    assert count == 2

View File

@@ -352,27 +352,14 @@ class TestPathHashWidthStats:
assert breakdown["triple_byte_pct"] == pytest.approx(100 / 3, rel=1e-3)
@pytest.mark.asyncio
async def test_path_hash_width_scan_uses_batched_fetchmany(self, test_db):
"""Hash-width stats should stream batches instead of calling fetchall()."""
async def test_path_hash_width_scan_fetches_all_then_buckets(self, test_db):
"""Hash-width stats should fetchall() then bucket synchronously."""
fake_rows = [{"data": b"a"}, {"data": b"b"}, {"data": b"c"}]
class FakeCursor:
def __init__(self):
self._batches = [
[{"data": b"a"}, {"data": b"b"}],
[{"data": b"c"}],
[],
]
self.fetchall_called = False
async def fetchmany(self, size):
assert size > 0
return self._batches.pop(0)
async def fetchall(self):
self.fetchall_called = True
raise AssertionError("fetchall() should not be used")
fake_cursor = FakeCursor()
return fake_rows
def fake_parse(raw_packet: bytes):
hash_sizes = {
@@ -386,12 +373,11 @@ class TestPathHashWidthStats:
return SimpleNamespace(hash_size=hash_size)
with (
patch.object(test_db.conn, "execute", new=AsyncMock(return_value=fake_cursor)),
patch.object(test_db.conn, "execute", new=AsyncMock(return_value=FakeCursor())),
patch("app.path_utils.parse_packet_envelope", side_effect=fake_parse),
):
breakdown = await StatisticsRepository._path_hash_width_24h()
assert fake_cursor.fetchall_called is False
assert breakdown["total_packets"] == 3
assert breakdown["single_byte"] == 1
assert breakdown["double_byte"] == 1