Yield radio lock on build repeater ops and use INSERT OR IGNORE instead of check-then-act on packet ops

This commit is contained in:
Jack Kingsman
2026-04-02 18:53:34 -07:00
parent 5c93d8487e
commit c7d5d3887d
2 changed files with 30 additions and 48 deletions

View File

@@ -1,5 +1,4 @@
import logging
import sqlite3
import time
from collections.abc import AsyncIterator
from hashlib import sha256
@@ -35,46 +34,23 @@ class RawPacketRepository:
# For malformed packets, hash the full data
payload_hash = sha256(data).digest()
# Check if this payload already exists
cursor = await db.conn.execute(
"INSERT OR IGNORE INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
(ts, data, payload_hash),
)
await db.conn.commit()
if cursor.rowcount > 0:
assert cursor.lastrowid is not None
return (cursor.lastrowid, True)
# Duplicate payload — look up the existing row.
cursor = await db.conn.execute(
"SELECT id FROM raw_packets WHERE payload_hash = ?", (payload_hash,)
)
existing = await cursor.fetchone()
if existing:
# Duplicate - return existing packet ID
logger.debug(
"Duplicate payload detected (hash=%s..., existing_id=%d)",
payload_hash.hex()[:12],
existing["id"],
)
return (existing["id"], False)
# New packet - insert with hash
try:
cursor = await db.conn.execute(
"INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
(ts, data, payload_hash),
)
await db.conn.commit()
assert cursor.lastrowid is not None # INSERT always returns a row ID
return (cursor.lastrowid, True)
except sqlite3.IntegrityError:
# Race condition: another insert with same payload_hash happened between
# our SELECT and INSERT. This is expected for duplicate packets arriving
# close together. Query again to get the existing ID.
logger.debug(
"Duplicate packet detected via race condition (payload_hash=%s), dropping",
payload_hash.hex()[:16],
)
cursor = await db.conn.execute(
"SELECT id FROM raw_packets WHERE payload_hash = ?", (payload_hash,)
)
existing = await cursor.fetchone()
if existing:
return (existing["id"], False)
# This shouldn't happen, but if it does, re-raise
raise
assert existing is not None
return (existing["id"], False)
@staticmethod
async def get_undecrypted_count() -> int:

View File

@@ -230,20 +230,26 @@ async def batch_cli_fetch(
operation_name: str,
commands: list[tuple[str, str]],
) -> dict[str, str | None]:
"""Send a batch of CLI commands to a server-capable contact and collect responses."""
"""Send a batch of CLI commands to a server-capable contact and collect responses.
Each command acquires and releases the radio lock independently so that
other operations (sends, syncs) can slip in between commands.
"""
results: dict[str, str | None] = {field: None for _, field in commands}
async with radio_manager.radio_operation(
operation_name,
pause_polling=True,
suspend_auto_fetch=True,
) as mc:
await _ensure_on_radio(mc, contact)
await asyncio.sleep(1.0)
for index, (cmd, field) in enumerate(commands):
if index > 0:
# Yield briefly so queued operations can acquire the lock.
await asyncio.sleep(0.25)
for index, (cmd, field) in enumerate(commands):
if index > 0:
await asyncio.sleep(1.0)
async with radio_manager.radio_operation(
operation_name,
pause_polling=True,
suspend_auto_fetch=True,
) as mc:
# Re-ensure contact is loaded each iteration; another operation
# may have evicted it while we didn't hold the lock.
await _ensure_on_radio(mc, contact)
send_result = await mc.commands.send_cmd(contact.public_key, cmd)
if send_result.type == EventType.ERROR: