From c7d5d3887d8c0ebb35ec0c5a7d12f15a9422e421 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Thu, 2 Apr 2026 18:53:34 -0700 Subject: [PATCH] Yield radio lock on build repeater ops and use INSERT OR IGNORE instead of check-then-act on packet ops --- app/repository/raw_packets.py | 50 +++++++++-------------------------- app/routers/server_control.py | 28 ++++++++++++-------- 2 files changed, 30 insertions(+), 48 deletions(-) diff --git a/app/repository/raw_packets.py b/app/repository/raw_packets.py index 9214eef..17d53dd 100644 --- a/app/repository/raw_packets.py +++ b/app/repository/raw_packets.py @@ -1,5 +1,4 @@ import logging -import sqlite3 import time from collections.abc import AsyncIterator from hashlib import sha256 @@ -35,46 +34,23 @@ class RawPacketRepository: # For malformed packets, hash the full data payload_hash = sha256(data).digest() - # Check if this payload already exists + cursor = await db.conn.execute( + "INSERT OR IGNORE INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)", + (ts, data, payload_hash), + ) + await db.conn.commit() + + if cursor.rowcount > 0: + assert cursor.lastrowid is not None + return (cursor.lastrowid, True) + + # Duplicate payload — look up the existing row. cursor = await db.conn.execute( "SELECT id FROM raw_packets WHERE payload_hash = ?", (payload_hash,) ) existing = await cursor.fetchone() - - if existing: - # Duplicate - return existing packet ID - logger.debug( - "Duplicate payload detected (hash=%s..., existing_id=%d)", - payload_hash.hex()[:12], - existing["id"], - ) - return (existing["id"], False) - - # New packet - insert with hash - try: - cursor = await db.conn.execute( - "INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)", - (ts, data, payload_hash), - ) - await db.conn.commit() - assert cursor.lastrowid is not None # INSERT always returns a row ID - return (cursor.lastrowid, True) - except sqlite3.IntegrityError: - # Race condition: another insert with same payload_hash happened between - # our SELECT and INSERT. This is expected for duplicate packets arriving - # close together. Query again to get the existing ID. - logger.debug( - "Duplicate packet detected via race condition (payload_hash=%s), dropping", - payload_hash.hex()[:16], - ) - cursor = await db.conn.execute( - "SELECT id FROM raw_packets WHERE payload_hash = ?", (payload_hash,) - ) - existing = await cursor.fetchone() - if existing: - return (existing["id"], False) - # This shouldn't happen, but if it does, re-raise - raise + assert existing is not None + return (existing["id"], False) @staticmethod async def get_undecrypted_count() -> int: diff --git a/app/routers/server_control.py b/app/routers/server_control.py index a13ffca..425978a 100644 --- a/app/routers/server_control.py +++ b/app/routers/server_control.py @@ -230,20 +230,26 @@ async def batch_cli_fetch( operation_name: str, commands: list[tuple[str, str]], ) -> dict[str, str | None]: - """Send a batch of CLI commands to a server-capable contact and collect responses.""" + """Send a batch of CLI commands to a server-capable contact and collect responses. + + Each command acquires and releases the radio lock independently so that + other operations (sends, syncs) can slip in between commands. + """ results: dict[str, str | None] = {field: None for _, field in commands} - async with radio_manager.radio_operation( - operation_name, - pause_polling=True, - suspend_auto_fetch=True, - ) as mc: - await _ensure_on_radio(mc, contact) - await asyncio.sleep(1.0) + for index, (cmd, field) in enumerate(commands): + if index > 0: + # Yield briefly so queued operations can acquire the lock. + await asyncio.sleep(0.25) - for index, (cmd, field) in enumerate(commands): - if index > 0: - await asyncio.sleep(1.0) + async with radio_manager.radio_operation( + operation_name, + pause_polling=True, + suspend_auto_fetch=True, + ) as mc: + # Re-ensure contact is loaded each iteration; another operation + # may have evicted it while we didn't hold the lock. + await _ensure_on_radio(mc, contact) send_result = await mc.commands.send_cmd(contact.public_key, cmd) if send_result.type == EventType.ERROR: