diff --git a/app/database.py b/app/database.py index 42436a3..0c25ce4 100644 --- a/app/database.py +++ b/app/database.py @@ -58,7 +58,7 @@ CREATE TABLE IF NOT EXISTS raw_packets ( timestamp INTEGER NOT NULL, data BLOB NOT NULL, message_id INTEGER, - payload_hash TEXT, + payload_hash BLOB, FOREIGN KEY (message_id) REFERENCES messages(id) ); diff --git a/app/migrations.py b/app/migrations.py index 6fa0240..27b08e8 100644 --- a/app/migrations.py +++ b/app/migrations.py @@ -226,6 +226,13 @@ async def run_migrations(conn: aiosqlite.Connection) -> int: await set_version(conn, 27) applied += 1 + # Migration 28: Convert payload_hash from 64-char hex TEXT to 32-byte BLOB + if version < 28: + logger.info("Applying migration 28: convert payload_hash from TEXT to BLOB") + await _migrate_028_payload_hash_text_to_blob(conn) + await set_version(conn, 28) + applied += 1 + if applied > 0: logger.info( "Applied %d migration(s), schema now at version %d", applied, await get_version(conn) @@ -1682,3 +1689,104 @@ async def _migrate_027_backfill_first_seen_from_advert_paths(conn: aiosqlite.Con await conn.commit() logger.debug("Backfilled first_seen from contact_advert_paths") + + +async def _migrate_028_payload_hash_text_to_blob(conn: aiosqlite.Connection) -> None: + """ + Convert payload_hash from 64-char hex TEXT to 32-byte BLOB. + + Halves storage for both the column data and its UNIQUE index. + Uses Python bytes.fromhex() for the conversion since SQLite's unhex() + requires 3.41.0+ which may not be available on all deployments. + """ + # Guard: skip if raw_packets table doesn't exist + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='raw_packets'" + ) + if not await cursor.fetchone(): + logger.debug("raw_packets table does not exist, skipping payload_hash conversion") + await conn.commit() + return + + # Check column types — skip if payload_hash doesn't exist or is already BLOB + cursor = await conn.execute("PRAGMA table_info(raw_packets)") + cols = {row[1]: row[2] for row in await cursor.fetchall()} + if "payload_hash" not in cols: + logger.debug("payload_hash column does not exist, skipping conversion") + await conn.commit() + return + if cols["payload_hash"].upper() == "BLOB": + logger.debug("payload_hash is already BLOB, skipping conversion") + await conn.commit() + return + + logger.info("Rebuilding raw_packets to convert payload_hash TEXT → BLOB...") + + # Create new table with BLOB type + await conn.execute(""" + CREATE TABLE raw_packets_new ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + data BLOB NOT NULL, + message_id INTEGER, + payload_hash BLOB, + FOREIGN KEY (message_id) REFERENCES messages(id) + ) + """) + + # Batch-convert rows: read TEXT hashes, convert to bytes, insert into new table + batch_size = 5000 + cursor = await conn.execute( + "SELECT id, timestamp, data, message_id, payload_hash FROM raw_packets ORDER BY id" + ) + + total = 0 + while True: + rows = await cursor.fetchmany(batch_size) + if not rows: + break + + batch: list[tuple[int, int, bytes, int | None, bytes | None]] = [] + for row in rows: + rid, ts, data, mid, ph = row[0], row[1], row[2], row[3], row[4] + if ph is not None and isinstance(ph, str): + try: + ph = bytes.fromhex(ph) + except ValueError: + # Not a valid hex string — hash the value to produce a valid BLOB + ph = sha256(ph.encode()).digest() + batch.append((rid, ts, data, mid, ph)) + + await conn.executemany( + "INSERT INTO raw_packets_new (id, timestamp, data, message_id, payload_hash) " + "VALUES (?, ?, ?, ?, ?)", + batch, + ) + total += len(batch) + + if total % 50000 == 0: + logger.info("Converted %d rows...", total) + + # Preserve autoincrement sequence + cursor = await conn.execute("SELECT seq FROM sqlite_sequence WHERE name = 'raw_packets'") + seq_row = await cursor.fetchone() + if seq_row is not None: + await conn.execute( + "INSERT OR REPLACE INTO sqlite_sequence (name, seq) VALUES ('raw_packets_new', ?)", + (seq_row[0],), + ) + + await conn.execute("DROP TABLE raw_packets") + await conn.execute("ALTER TABLE raw_packets_new RENAME TO raw_packets") + + # Clean up the sqlite_sequence entry for the old temp name + await conn.execute("DELETE FROM sqlite_sequence WHERE name = 'raw_packets_new'") + + # Recreate indexes + await conn.execute( + "CREATE UNIQUE INDEX idx_raw_packets_payload_hash ON raw_packets(payload_hash)" + ) + await conn.execute("CREATE INDEX idx_raw_packets_message_id ON raw_packets(message_id)") + + await conn.commit() + logger.info("Converted %d payload_hash values from TEXT to BLOB", total) diff --git a/app/repository.py b/app/repository.py index e4511b4..170c5dd 100644 --- a/app/repository.py +++ b/app/repository.py @@ -935,10 +935,10 @@ class RawPacketRepository: # Compute payload hash for deduplication payload = extract_payload(data) if payload: - payload_hash = sha256(payload).hexdigest() + payload_hash = sha256(payload).digest() else: # For malformed packets, hash the full data - payload_hash = sha256(data).hexdigest() + payload_hash = sha256(data).digest() # Check if this payload already exists cursor = await db.conn.execute( @@ -950,7 +950,7 @@ class RawPacketRepository: # Duplicate - return existing packet ID logger.debug( "Duplicate payload detected (hash=%s..., existing_id=%d)", - payload_hash[:12], + payload_hash.hex()[:12], existing["id"], ) return (existing["id"], False) @@ -970,7 +970,7 @@ class RawPacketRepository: # close together. Query again to get the existing ID. logger.debug( "Duplicate packet detected via race condition (payload_hash=%s), dropping", - payload_hash[:16], + payload_hash.hex()[:16], ) cursor = await db.conn.execute( "SELECT id FROM raw_packets WHERE payload_hash = ?", (payload_hash,) diff --git a/tests/test_migrations.py b/tests/test_migrations.py index 7b1cd9a..95c9fa1 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -100,8 +100,8 @@ class TestMigration001: # Run migrations applied = await run_migrations(conn) - assert applied == 27 # All migrations run - assert await get_version(conn) == 27 + assert applied == 28 # All migrations run + assert await get_version(conn) == 28 # Verify columns exist by inserting and selecting await conn.execute( @@ -183,9 +183,9 @@ class TestMigration001: applied1 = await run_migrations(conn) applied2 = await run_migrations(conn) - assert applied1 == 27 # All migrations run + assert applied1 == 28 # All migrations run assert applied2 == 0 # No migrations on second run - assert await get_version(conn) == 27 + assert await get_version(conn) == 28 finally: await conn.close() @@ -246,8 +246,8 @@ class TestMigration001: applied = await run_migrations(conn) # All migrations applied (version incremented) but no error - assert applied == 27 - assert await get_version(conn) == 27 + assert applied == 28 + assert await get_version(conn) == 28 finally: await conn.close() @@ -376,8 +376,8 @@ class TestMigration013: # Run migration 13 (plus 14-27 which also run) applied = await run_migrations(conn) - assert applied == 15 - assert await get_version(conn) == 27 + assert applied == 16 + assert await get_version(conn) == 28 # Verify bots array was created with migrated data cursor = await conn.execute("SELECT bots FROM app_settings WHERE id = 1") @@ -497,7 +497,7 @@ class TestMigration018: assert await cursor.fetchone() is not None await run_migrations(conn) - assert await get_version(conn) == 27 + assert await get_version(conn) == 28 # Verify autoindex is gone cursor = await conn.execute( @@ -516,7 +516,11 @@ class TestMigration018: assert rows[0]["timestamp"] == 1000 assert bytes(rows[0]["data"]) == b"\x01\x02\x03" assert rows[0]["message_id"] is None - assert rows[0]["payload_hash"] == "hash_a" + # payload_hash was converted from TEXT to BLOB by migration 28; + # "hash_a" is not valid hex so gets sha256-hashed + from hashlib import sha256 + + assert bytes(rows[0]["payload_hash"]) == sha256(b"hash_a").digest() assert rows[1]["message_id"] == 42 # Verify payload_hash unique index still works @@ -571,8 +575,8 @@ class TestMigration018: await conn.commit() applied = await run_migrations(conn) - assert applied == 10 # Migrations 18-27 run (18+19 skip internally) - assert await get_version(conn) == 27 + assert applied == 11 # Migrations 18-28 run (18+19 skip internally) + assert await get_version(conn) == 28 finally: await conn.close() @@ -644,7 +648,7 @@ class TestMigration019: assert await cursor.fetchone() is not None await run_migrations(conn) - assert await get_version(conn) == 27 + assert await get_version(conn) == 28 # Verify autoindex is gone cursor = await conn.execute( @@ -710,8 +714,8 @@ class TestMigration020: assert (await cursor.fetchone())[0] == "delete" applied = await run_migrations(conn) - assert applied == 8 # Migrations 20-27 - assert await get_version(conn) == 27 + assert applied == 9 # Migrations 20-28 + assert await get_version(conn) == 28 # Verify WAL mode cursor = await conn.execute("PRAGMA journal_mode") @@ -741,7 +745,7 @@ class TestMigration020: await set_version(conn, 20) applied = await run_migrations(conn) - assert applied == 7 # Migrations 21-27 still run + assert applied == 8 # Migrations 21-28 still run # Still WAL + INCREMENTAL cursor = await conn.execute("PRAGMA journal_mode") @@ -750,3 +754,131 @@ class TestMigration020: assert (await cursor.fetchone())[0] == 2 finally: await conn.close() + + +class TestMigration028: + """Test migration 028: convert payload_hash from TEXT to BLOB.""" + + @pytest.mark.asyncio + async def test_migration_converts_hex_text_to_blob(self): + """Migration converts 64-char hex TEXT payload_hash values to 32-byte BLOBs.""" + from hashlib import sha256 + + conn = await aiosqlite.connect(":memory:") + conn.row_factory = aiosqlite.Row + try: + await set_version(conn, 27) + + # Create raw_packets with TEXT payload_hash (pre-migration schema) + await conn.execute(""" + CREATE TABLE raw_packets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + data BLOB NOT NULL, + message_id INTEGER, + payload_hash TEXT + ) + """) + await conn.execute( + "CREATE UNIQUE INDEX idx_raw_packets_payload_hash ON raw_packets(payload_hash)" + ) + await conn.execute("CREATE INDEX idx_raw_packets_message_id ON raw_packets(message_id)") + + # Insert rows with hex TEXT hashes (as produced by .hexdigest()) + hash_a = sha256(b"packet_a").hexdigest() + hash_b = sha256(b"packet_b").hexdigest() + await conn.execute( + "INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)", + (1000, b"\x01\x02", hash_a), + ) + await conn.execute( + "INSERT INTO raw_packets (timestamp, data, message_id, payload_hash) VALUES (?, ?, ?, ?)", + (2000, b"\x03\x04", 42, hash_b), + ) + # Row with NULL payload_hash + await conn.execute( + "INSERT INTO raw_packets (timestamp, data) VALUES (?, ?)", + (3000, b"\x05\x06"), + ) + await conn.commit() + + applied = await run_migrations(conn) + assert applied == 1 + assert await get_version(conn) == 28 + + # Verify payload_hash column is now BLOB + cursor = await conn.execute("PRAGMA table_info(raw_packets)") + cols = {row[1]: row[2] for row in await cursor.fetchall()} + assert cols["payload_hash"] == "BLOB" + + # Verify data is preserved and converted correctly + cursor = await conn.execute( + "SELECT id, timestamp, data, message_id, payload_hash FROM raw_packets ORDER BY id" + ) + rows = await cursor.fetchall() + assert len(rows) == 3 + + assert rows[0]["timestamp"] == 1000 + assert bytes(rows[0]["data"]) == b"\x01\x02" + assert bytes(rows[0]["payload_hash"]) == sha256(b"packet_a").digest() + assert rows[0]["message_id"] is None + + assert rows[1]["timestamp"] == 2000 + assert bytes(rows[1]["payload_hash"]) == sha256(b"packet_b").digest() + assert rows[1]["message_id"] == 42 + + assert rows[2]["payload_hash"] is None + + # Verify unique index works + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE name='idx_raw_packets_payload_hash'" + ) + assert await cursor.fetchone() is not None + + # Verify message_id index exists + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE name='idx_raw_packets_message_id'" + ) + assert await cursor.fetchone() is not None + finally: + await conn.close() + + @pytest.mark.asyncio + async def test_migration_skips_when_already_blob(self): + """Migration is a no-op when payload_hash is already BLOB (fresh install).""" + conn = await aiosqlite.connect(":memory:") + conn.row_factory = aiosqlite.Row + try: + await set_version(conn, 27) + + # Create raw_packets with BLOB payload_hash (new schema) + await conn.execute(""" + CREATE TABLE raw_packets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + data BLOB NOT NULL, + message_id INTEGER, + payload_hash BLOB + ) + """) + await conn.execute( + "CREATE UNIQUE INDEX idx_raw_packets_payload_hash ON raw_packets(payload_hash)" + ) + + # Insert a row with a BLOB hash + await conn.execute( + "INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)", + (1000, b"\x01", b"\xab" * 32), + ) + await conn.commit() + + applied = await run_migrations(conn) + assert applied == 1 # Version still bumped + assert await get_version(conn) == 28 + + # Verify data unchanged + cursor = await conn.execute("SELECT payload_hash FROM raw_packets") + row = await cursor.fetchone() + assert bytes(row["payload_hash"]) == b"\xab" * 32 + finally: + await conn.close() diff --git a/tests/test_statistics.py b/tests/test_statistics.py index 243f108..cdb2ba1 100644 --- a/tests/test_statistics.py +++ b/tests/test_statistics.py @@ -137,15 +137,15 @@ class TestStatisticsCounts: # 2 decrypted packets (linked to message), 1 undecrypted await conn.execute( "INSERT INTO raw_packets (timestamp, data, message_id, payload_hash) VALUES (?, ?, ?, ?)", - (now, b"\x01", msg_id, "hash1"), + (now, b"\x01", msg_id, b"\x01" * 32), ) await conn.execute( "INSERT INTO raw_packets (timestamp, data, message_id, payload_hash) VALUES (?, ?, ?, ?)", - (now, b"\x02", msg_id, "hash2"), + (now, b"\x02", msg_id, b"\x02" * 32), ) await conn.execute( "INSERT INTO raw_packets (timestamp, data, payload_hash) VALUES (?, ?, ?)", - (now, b"\x03", "hash3"), + (now, b"\x03", b"\x03" * 32), ) await conn.commit()