Store payload hashes as BLOBs instead of hex TEXT

This commit is contained in:
Jack Kingsman
2026-02-27 15:46:16 -08:00
parent 6a3510ce2e
commit 194852ed16
5 changed files with 264 additions and 24 deletions

View File

@@ -58,7 +58,7 @@ CREATE TABLE IF NOT EXISTS raw_packets (
timestamp INTEGER NOT NULL,
data BLOB NOT NULL,
message_id INTEGER,
payload_hash TEXT,
payload_hash BLOB,
FOREIGN KEY (message_id) REFERENCES messages(id)
);

View File

@@ -226,6 +226,13 @@ async def run_migrations(conn: aiosqlite.Connection) -> int:
await set_version(conn, 27)
applied += 1
# Migration 28: Convert payload_hash from 64-char hex TEXT to 32-byte BLOB
if version < 28:
logger.info("Applying migration 28: convert payload_hash from TEXT to BLOB")
await _migrate_028_payload_hash_text_to_blob(conn)
await set_version(conn, 28)
applied += 1
if applied > 0:
logger.info(
"Applied %d migration(s), schema now at version %d", applied, await get_version(conn)
@@ -1682,3 +1689,104 @@ async def _migrate_027_backfill_first_seen_from_advert_paths(conn: aiosqlite.Con
await conn.commit()
logger.debug("Backfilled first_seen from contact_advert_paths")
async def _migrate_028_payload_hash_text_to_blob(conn: aiosqlite.Connection) -> None:
    """
    Convert raw_packets.payload_hash from 64-char hex TEXT to 32-byte BLOB.

    Halves storage for both the column data and its UNIQUE index.
    Uses Python bytes.fromhex() for the conversion since SQLite's unhex()
    requires 3.41.0+ which may not be available on all deployments.

    Idempotent: commits and returns early when the table/column is missing
    or the column is already BLOB (fresh installs).
    """
    # Guard: skip if raw_packets table doesn't exist (partially initialized DB).
    cursor = await conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='raw_packets'"
    )
    if not await cursor.fetchone():
        logger.debug("raw_packets table does not exist, skipping payload_hash conversion")
        await conn.commit()
        return

    # Check column types — skip if payload_hash doesn't exist or is already BLOB.
    cursor = await conn.execute("PRAGMA table_info(raw_packets)")
    cols = {row[1]: row[2] for row in await cursor.fetchall()}
    if "payload_hash" not in cols:
        logger.debug("payload_hash column does not exist, skipping conversion")
        await conn.commit()
        return
    if cols["payload_hash"].upper() == "BLOB":
        logger.debug("payload_hash is already BLOB, skipping conversion")
        await conn.commit()
        return

    logger.info("Rebuilding raw_packets to convert payload_hash TEXT → BLOB...")

    # Create new table with BLOB type (SQLite cannot ALTER a column's type).
    await conn.execute("""
        CREATE TABLE raw_packets_new (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp INTEGER NOT NULL,
            data BLOB NOT NULL,
            message_id INTEGER,
            payload_hash BLOB,
            FOREIGN KEY (message_id) REFERENCES messages(id)
        )
    """)

    # Batch-convert rows: read TEXT hashes, convert to bytes, insert into new table.
    batch_size = 5000
    cursor = await conn.execute(
        "SELECT id, timestamp, data, message_id, payload_hash FROM raw_packets ORDER BY id"
    )
    total = 0
    while True:
        rows = await cursor.fetchmany(batch_size)
        if not rows:
            break
        batch: list[tuple[int, int, bytes, int | None, bytes | None]] = []
        for row in rows:
            rid, ts, data, mid, ph = row[0], row[1], row[2], row[3], row[4]
            if ph is not None and isinstance(ph, str):
                try:
                    ph = bytes.fromhex(ph)
                except ValueError:
                    # Not a valid hex string — hash the value to produce a valid BLOB
                    ph = sha256(ph.encode()).digest()
            batch.append((rid, ts, data, mid, ph))
        await conn.executemany(
            "INSERT INTO raw_packets_new (id, timestamp, data, message_id, payload_hash) "
            "VALUES (?, ?, ?, ?, ?)",
            batch,
        )
        total += len(batch)
        # batch_size divides 50000, so this fires every 50k rows exactly.
        if total % 50000 == 0:
            logger.info("Converted %d rows...", total)

    # Preserve the AUTOINCREMENT high-water mark. The explicit-id inserts above
    # already auto-created a sqlite_sequence row for raw_packets_new (seq = max
    # inserted id), but the old table's seq may be higher if high-id rows were
    # deleted — and AUTOINCREMENT promises never to reuse those ids.
    # NOTE: sqlite_sequence has no UNIQUE constraint, so "INSERT OR REPLACE"
    # never replaces — it would append a duplicate row that survives the rename
    # below. Use UPDATE-then-INSERT instead.
    cursor = await conn.execute("SELECT seq FROM sqlite_sequence WHERE name = 'raw_packets'")
    seq_row = await cursor.fetchone()
    if seq_row is not None:
        update_cursor = await conn.execute(
            "UPDATE sqlite_sequence SET seq = max(seq, ?) WHERE name = 'raw_packets_new'",
            (seq_row[0],),
        )
        if update_cursor.rowcount == 0:
            # Empty source table: no auto-created row exists yet, so insert one.
            await conn.execute(
                "INSERT INTO sqlite_sequence (name, seq) VALUES ('raw_packets_new', ?)",
                (seq_row[0],),
            )

    await conn.execute("DROP TABLE raw_packets")
    await conn.execute("ALTER TABLE raw_packets_new RENAME TO raw_packets")
    # Clean up any stale sqlite_sequence entry for the temp name (modern SQLite
    # renames the entry along with the table, making this a harmless no-op).
    await conn.execute("DELETE FROM sqlite_sequence WHERE name = 'raw_packets_new'")

    # Recreate indexes (dropped with the old table).
    await conn.execute(
        "CREATE UNIQUE INDEX idx_raw_packets_payload_hash ON raw_packets(payload_hash)"
    )
    await conn.execute("CREATE INDEX idx_raw_packets_message_id ON raw_packets(message_id)")

    await conn.commit()
    logger.info("Converted %d payload_hash values from TEXT to BLOB", total)

View File

@@ -935,10 +935,10 @@ class RawPacketRepository:
# Compute payload hash for deduplication
payload = extract_payload(data)
if payload:
payload_hash = sha256(payload).hexdigest()
payload_hash = sha256(payload).digest()
else:
# For malformed packets, hash the full data
payload_hash = sha256(data).hexdigest()
payload_hash = sha256(data).digest()
# Check if this payload already exists
cursor = await db.conn.execute(
@@ -950,7 +950,7 @@ class RawPacketRepository:
# Duplicate - return existing packet ID
logger.debug(
"Duplicate payload detected (hash=%s..., existing_id=%d)",
payload_hash[:12],
payload_hash.hex()[:12],
existing["id"],
)
return (existing["id"], False)
@@ -970,7 +970,7 @@ class RawPacketRepository:
# close together. Query again to get the existing ID.
logger.debug(
"Duplicate packet detected via race condition (payload_hash=%s), dropping",
payload_hash[:16],
payload_hash.hex()[:16],
)
cursor = await db.conn.execute(
"SELECT id FROM raw_packets WHERE payload_hash = ?", (payload_hash,)

View File

@@ -100,8 +100,8 @@ class TestMigration001:
# Run migrations
applied = await run_migrations(conn)
assert applied == 27 # All migrations run
assert await get_version(conn) == 27
assert applied == 28 # All migrations run
assert await get_version(conn) == 28
# Verify columns exist by inserting and selecting
await conn.execute(
@@ -183,9 +183,9 @@ class TestMigration001:
applied1 = await run_migrations(conn)
applied2 = await run_migrations(conn)
assert applied1 == 27 # All migrations run
assert applied1 == 28 # All migrations run
assert applied2 == 0 # No migrations on second run
assert await get_version(conn) == 27
assert await get_version(conn) == 28
finally:
await conn.close()
@@ -246,8 +246,8 @@ class TestMigration001:
applied = await run_migrations(conn)
# All migrations applied (version incremented) but no error
assert applied == 27
assert await get_version(conn) == 27
assert applied == 28
assert await get_version(conn) == 28
finally:
await conn.close()
@@ -376,8 +376,8 @@ class TestMigration013:
# Run migration 13 (plus 14-27 which also run)
applied = await run_migrations(conn)
assert applied == 15
assert await get_version(conn) == 27
assert applied == 16
assert await get_version(conn) == 28
# Verify bots array was created with migrated data
cursor = await conn.execute("SELECT bots FROM app_settings WHERE id = 1")
@@ -497,7 +497,7 @@ class TestMigration018:
assert await cursor.fetchone() is not None
await run_migrations(conn)
assert await get_version(conn) == 27
assert await get_version(conn) == 28
# Verify autoindex is gone
cursor = await conn.execute(
@@ -516,7 +516,11 @@ class TestMigration018:
assert rows[0]["timestamp"] == 1000
assert bytes(rows[0]["data"]) == b"\x01\x02\x03"
assert rows[0]["message_id"] is None
assert rows[0]["payload_hash"] == "hash_a"
# payload_hash was converted from TEXT to BLOB by migration 28;
# "hash_a" is not valid hex so gets sha256-hashed
from hashlib import sha256
assert bytes(rows[0]["payload_hash"]) == sha256(b"hash_a").digest()
assert rows[1]["message_id"] == 42
# Verify payload_hash unique index still works
@@ -571,8 +575,8 @@ class TestMigration018:
await conn.commit()
applied = await run_migrations(conn)
assert applied == 10 # Migrations 18-27 run (18+19 skip internally)
assert await get_version(conn) == 27
assert applied == 11 # Migrations 18-28 run (18+19 skip internally)
assert await get_version(conn) == 28
finally:
await conn.close()
@@ -644,7 +648,7 @@ class TestMigration019:
assert await cursor.fetchone() is not None
await run_migrations(conn)
assert await get_version(conn) == 27
assert await get_version(conn) == 28
# Verify autoindex is gone
cursor = await conn.execute(
@@ -710,8 +714,8 @@ class TestMigration020:
assert (await cursor.fetchone())[0] == "delete"
applied = await run_migrations(conn)
assert applied == 8 # Migrations 20-27
assert await get_version(conn) == 27
assert applied == 9 # Migrations 20-28
assert await get_version(conn) == 28
# Verify WAL mode
cursor = await conn.execute("PRAGMA journal_mode")
@@ -741,7 +745,7 @@ class TestMigration020:
await set_version(conn, 20)
applied = await run_migrations(conn)
assert applied == 7 # Migrations 21-27 still run
assert applied == 8 # Migrations 21-28 still run
# Still WAL + INCREMENTAL
cursor = await conn.execute("PRAGMA journal_mode")
@@ -750,3 +754,131 @@ class TestMigration020:
assert (await cursor.fetchone())[0] == 2
finally:
await conn.close()
class TestMigration028:
    """Test migration 028: convert payload_hash from TEXT to BLOB."""

    @pytest.mark.asyncio
    async def test_migration_converts_hex_text_to_blob(self):
        """Migration converts 64-char hex TEXT payload_hash values to 32-byte BLOBs."""
        from hashlib import sha256

        db = await aiosqlite.connect(":memory:")
        db.row_factory = aiosqlite.Row
        try:
            await set_version(db, 27)

            # Build the pre-migration schema: payload_hash stored as TEXT.
            await db.execute("""
                CREATE TABLE raw_packets (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp INTEGER NOT NULL,
                    data BLOB NOT NULL,
                    message_id INTEGER,
                    payload_hash TEXT
                )
            """)
            await db.execute(
                "CREATE UNIQUE INDEX idx_raw_packets_payload_hash ON raw_packets(payload_hash)"
            )
            await db.execute("CREATE INDEX idx_raw_packets_message_id ON raw_packets(message_id)")

            # Seed hex TEXT hashes exactly as .hexdigest() would have written them.
            hex_a = sha256(b"packet_a").hexdigest()
            hex_b = sha256(b"packet_b").hexdigest()
            await db.execute(
                "INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
                (1000, b"\x01\x02", hex_a),
            )
            await db.execute(
                "INSERT INTO raw_packets (timestamp, data, message_id, payload_hash) VALUES (?, ?, ?, ?)",
                (2000, b"\x03\x04", 42, hex_b),
            )
            # One row with no hash at all — must pass through as NULL.
            await db.execute(
                "INSERT INTO raw_packets (timestamp, data) VALUES (?, ?)",
                (3000, b"\x05\x06"),
            )
            await db.commit()

            migrated = await run_migrations(db)
            assert migrated == 1
            assert await get_version(db) == 28

            # Column type must now be BLOB.
            cur = await db.execute("PRAGMA table_info(raw_packets)")
            col_types = {info[1]: info[2] for info in await cur.fetchall()}
            assert col_types["payload_hash"] == "BLOB"

            # Row contents survive and hashes come back as raw digests.
            cur = await db.execute(
                "SELECT id, timestamp, data, message_id, payload_hash FROM raw_packets ORDER BY id"
            )
            fetched = await cur.fetchall()
            assert len(fetched) == 3
            assert fetched[0]["timestamp"] == 1000
            assert bytes(fetched[0]["data"]) == b"\x01\x02"
            assert bytes(fetched[0]["payload_hash"]) == sha256(b"packet_a").digest()
            assert fetched[0]["message_id"] is None
            assert fetched[1]["timestamp"] == 2000
            assert bytes(fetched[1]["payload_hash"]) == sha256(b"packet_b").digest()
            assert fetched[1]["message_id"] == 42
            assert fetched[2]["payload_hash"] is None

            # Both indexes were recreated on the rebuilt table.
            cur = await db.execute(
                "SELECT name FROM sqlite_master WHERE name='idx_raw_packets_payload_hash'"
            )
            assert await cur.fetchone() is not None
            cur = await db.execute(
                "SELECT name FROM sqlite_master WHERE name='idx_raw_packets_message_id'"
            )
            assert await cur.fetchone() is not None
        finally:
            await db.close()

    @pytest.mark.asyncio
    async def test_migration_skips_when_already_blob(self):
        """Migration is a no-op when payload_hash is already BLOB (fresh install)."""
        db = await aiosqlite.connect(":memory:")
        db.row_factory = aiosqlite.Row
        try:
            await set_version(db, 27)

            # Post-migration schema: payload_hash already declared BLOB.
            await db.execute("""
                CREATE TABLE raw_packets (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp INTEGER NOT NULL,
                    data BLOB NOT NULL,
                    message_id INTEGER,
                    payload_hash BLOB
                )
            """)
            await db.execute(
                "CREATE UNIQUE INDEX idx_raw_packets_payload_hash ON raw_packets(payload_hash)"
            )
            await db.execute(
                "INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
                (1000, b"\x01", b"\xab" * 32),
            )
            await db.commit()

            migrated = await run_migrations(db)
            assert migrated == 1  # Version still bumped
            assert await get_version(db) == 28

            # The stored BLOB must be untouched.
            cur = await db.execute("SELECT payload_hash FROM raw_packets")
            only_row = await cur.fetchone()
            assert bytes(only_row["payload_hash"]) == b"\xab" * 32
        finally:
            await db.close()

View File

@@ -137,15 +137,15 @@ class TestStatisticsCounts:
# 2 decrypted packets (linked to message), 1 undecrypted
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, message_id, payload_hash) VALUES (?, ?, ?, ?)",
(now, b"\x01", msg_id, "hash1"),
(now, b"\x01", msg_id, b"\x01" * 32),
)
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, message_id, payload_hash) VALUES (?, ?, ?, ?)",
(now, b"\x02", msg_id, "hash2"),
(now, b"\x02", msg_id, b"\x02" * 32),
)
await conn.execute(
"INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)",
(now, b"\x03", "hash3"),
(now, b"\x03", b"\x03" * 32),
)
await conn.commit()