"""Tests for database migrations."""

import json

import aiosqlite
import pytest

from app.migrations import get_version, run_migrations, set_version

# Schema version these tests expect once every migration has been applied.
_LATEST_VERSION = 15

# Version the migration-013 tests start from (just before migration 13).
_PRE_BOT_MIGRATION_VERSION = 12

# Raw packets table with old schema (for migrations 2 and 3).
_RAW_PACKETS_DDL = """
    CREATE TABLE raw_packets (
        id INTEGER PRIMARY KEY,
        timestamp INTEGER NOT NULL,
        data BLOB NOT NULL,
        decrypted INTEGER DEFAULT 0,
        message_id INTEGER,
        decrypt_attempts INTEGER DEFAULT 0,
        last_attempt INTEGER
    )
"""

_RAW_PACKETS_INDEX_DDL = (
    "CREATE INDEX idx_raw_packets_decrypted ON raw_packets(decrypted)"
)

# Messages table with old schema (for migrations 6 and 7).
_MESSAGES_DDL = """
    CREATE TABLE messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        type TEXT NOT NULL,
        conversation_key TEXT NOT NULL,
        text TEXT NOT NULL,
        sender_timestamp INTEGER,
        received_at INTEGER NOT NULL,
        path_len INTEGER,
        txt_type INTEGER DEFAULT 0,
        signature TEXT,
        outgoing INTEGER DEFAULT 0,
        acked INTEGER DEFAULT 0,
        UNIQUE(type, conversation_key, text, sender_timestamp)
    )
"""

# app_settings table with the old single-bot columns (bot_enabled/bot_code).
_APP_SETTINGS_OLD_BOT_DDL = """
    CREATE TABLE app_settings (
        id INTEGER PRIMARY KEY,
        max_radio_contacts INTEGER DEFAULT 50,
        favorites TEXT DEFAULT '[]',
        auto_decrypt_dm_on_advert INTEGER DEFAULT 0,
        sidebar_sort_order TEXT DEFAULT 'recent',
        last_message_times TEXT DEFAULT '{}',
        preferences_migrated INTEGER DEFAULT 0,
        advert_interval INTEGER DEFAULT 0,
        last_advert_time INTEGER DEFAULT 0,
        bot_enabled INTEGER DEFAULT 0,
        bot_code TEXT DEFAULT ''
    )
"""


async def _create_old_packet_and_message_tables(conn: aiosqlite.Connection) -> None:
    """Create the old-schema raw_packets table (plus its index) and messages table.

    Does not commit; the caller commits once its whole pre-migration
    schema (and any seed rows) is in place.
    """
    await conn.execute(_RAW_PACKETS_DDL)
    await conn.execute(_RAW_PACKETS_INDEX_DDL)
    await conn.execute(_MESSAGES_DDL)


async def _seed_old_bot_settings(
    conn: aiosqlite.Connection, *, bot_enabled: int, bot_code: str
) -> None:
    """Prepare a version-12 database holding one old-format app_settings row.

    Sets the version to 12, creates app_settings with the old bot columns,
    inserts row id=1 with the given bot values, and commits.
    """
    await set_version(conn, _PRE_BOT_MIGRATION_VERSION)
    await conn.execute(_APP_SETTINGS_OLD_BOT_DDL)
    # Bound parameters store the same values as an inline literal without
    # needing to escape quotes inside the bot code.
    await conn.execute(
        "INSERT INTO app_settings (id, bot_enabled, bot_code) VALUES (1, ?, ?)",
        (bot_enabled, bot_code),
    )
    await conn.commit()


async def _load_bots(conn: aiosqlite.Connection) -> list:
    """Return the decoded JSON ``bots`` value from app_settings row id=1.

    Requires ``conn.row_factory`` to be ``aiosqlite.Row`` because the column
    is read by name.
    """
    cursor = await conn.execute("SELECT bots FROM app_settings WHERE id = 1")
    row = await cursor.fetchone()
    return json.loads(row["bots"])


class TestMigrationSystem:
    """Test the migration version tracking system."""

    @pytest.mark.asyncio
    async def test_get_version_returns_zero_for_new_db(self):
        """New database has user_version=0."""
        async with aiosqlite.connect(":memory:") as conn:
            version = await get_version(conn)
            assert version == 0

    @pytest.mark.asyncio
    async def test_set_version_updates_pragma(self):
        """Setting version updates the user_version pragma."""
        async with aiosqlite.connect(":memory:") as conn:
            await set_version(conn, 5)
            version = await get_version(conn)
            assert version == 5


class TestMigration001:
    """Test migration 001: add last_read_at columns."""

    @pytest.mark.asyncio
    async def test_migration_adds_last_read_at_to_contacts(self):
        """Migration adds last_read_at column to contacts table."""
        async with aiosqlite.connect(":memory:") as conn:
            conn.row_factory = aiosqlite.Row

            # Create schema without last_read_at (simulating pre-migration state)
            await conn.execute("""
                CREATE TABLE contacts (
                    public_key TEXT PRIMARY KEY,
                    name TEXT,
                    type INTEGER DEFAULT 0,
                    flags INTEGER DEFAULT 0,
                    last_path TEXT,
                    last_path_len INTEGER DEFAULT -1,
                    last_advert INTEGER,
                    lat REAL,
                    lon REAL,
                    last_seen INTEGER,
                    on_radio INTEGER DEFAULT 0,
                    last_contacted INTEGER
                )
            """)
            await conn.execute("""
                CREATE TABLE channels (
                    key TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    is_hashtag INTEGER DEFAULT 0,
                    on_radio INTEGER DEFAULT 0
                )
            """)
            await _create_old_packet_and_message_tables(conn)
            await conn.commit()

            # Run migrations
            applied = await run_migrations(conn)

            assert applied == _LATEST_VERSION  # All migrations run
            assert await get_version(conn) == _LATEST_VERSION

            # Verify columns exist by inserting and selecting
            await conn.execute(
                "INSERT INTO contacts (public_key, name, last_read_at) VALUES (?, ?, ?)",
                ("abc123", "Test", 12345),
            )
            await conn.execute(
                "INSERT INTO channels (key, name, last_read_at) VALUES (?, ?, ?)",
                ("KEY123", "#test", 67890),
            )
            await conn.commit()

            cursor = await conn.execute(
                "SELECT last_read_at FROM contacts WHERE public_key = ?", ("abc123",)
            )
            row = await cursor.fetchone()
            assert row["last_read_at"] == 12345

            cursor = await conn.execute(
                "SELECT last_read_at FROM channels WHERE key = ?", ("KEY123",)
            )
            row = await cursor.fetchone()
            assert row["last_read_at"] == 67890

    @pytest.mark.asyncio
    async def test_migration_is_idempotent(self):
        """Running migration multiple times is safe."""
        async with aiosqlite.connect(":memory:") as conn:
            conn.row_factory = aiosqlite.Row

            # Create schema without last_read_at
            await conn.execute("""
                CREATE TABLE contacts (
                    public_key TEXT PRIMARY KEY,
                    name TEXT
                )
            """)
            await conn.execute("""
                CREATE TABLE channels (
                    key TEXT PRIMARY KEY,
                    name TEXT NOT NULL
                )
            """)
            await _create_old_packet_and_message_tables(conn)
            await conn.commit()

            # Run migrations twice
            applied1 = await run_migrations(conn)
            applied2 = await run_migrations(conn)

            assert applied1 == _LATEST_VERSION  # All migrations run
            assert applied2 == 0  # No migrations on second run
            assert await get_version(conn) == _LATEST_VERSION

    @pytest.mark.asyncio
    async def test_migration_handles_column_already_exists(self):
        """Migration handles case where column already exists."""
        async with aiosqlite.connect(":memory:") as conn:
            conn.row_factory = aiosqlite.Row

            # Create schema with last_read_at already present
            await conn.execute("""
                CREATE TABLE contacts (
                    public_key TEXT PRIMARY KEY,
                    name TEXT,
                    last_read_at INTEGER
                )
            """)
            await conn.execute("""
                CREATE TABLE channels (
                    key TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    last_read_at INTEGER
                )
            """)
            await _create_old_packet_and_message_tables(conn)
            await conn.commit()

            # Run migrations - should not fail
            applied = await run_migrations(conn)

            # All migrations applied (version incremented) but no error
            assert applied == _LATEST_VERSION
            assert await get_version(conn) == _LATEST_VERSION

    @pytest.mark.asyncio
    async def test_existing_data_preserved_after_migration(self):
        """Migration preserves existing contact and channel data."""
        async with aiosqlite.connect(":memory:") as conn:
            conn.row_factory = aiosqlite.Row

            # Create schema and insert data before migration
            await conn.execute("""
                CREATE TABLE contacts (
                    public_key TEXT PRIMARY KEY,
                    name TEXT,
                    type INTEGER DEFAULT 0
                )
            """)
            await conn.execute("""
                CREATE TABLE channels (
                    key TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    is_hashtag INTEGER DEFAULT 0
                )
            """)
            await _create_old_packet_and_message_tables(conn)
            await conn.execute(
                "INSERT INTO contacts (public_key, name, type) VALUES (?, ?, ?)",
                ("existingkey", "ExistingContact", 1),
            )
            await conn.execute(
                "INSERT INTO channels (key, name, is_hashtag) VALUES (?, ?, ?)",
                ("EXISTINGCHAN", "#existing", 1),
            )
            await conn.commit()

            # Run migrations
            await run_migrations(conn)

            # Verify data is preserved
            cursor = await conn.execute(
                "SELECT public_key, name, type, last_read_at FROM contacts WHERE public_key = ?",
                ("existingkey",),
            )
            row = await cursor.fetchone()
            assert row["public_key"] == "existingkey"
            assert row["name"] == "ExistingContact"
            assert row["type"] == 1
            assert row["last_read_at"] is None  # New column defaults to NULL

            cursor = await conn.execute(
                "SELECT key, name, is_hashtag, last_read_at FROM channels WHERE key = ?",
                ("EXISTINGCHAN",),
            )
            row = await cursor.fetchone()
            assert row["key"] == "EXISTINGCHAN"
            assert row["name"] == "#existing"
            assert row["is_hashtag"] == 1
            assert row["last_read_at"] is None


class TestMigration013:
    """Test migration 013: convert bot_enabled/bot_code to multi-bot format."""

    @pytest.mark.asyncio
    async def test_migration_converts_existing_bot_to_array(self):
        """Migration converts existing bot_enabled/bot_code to bots array."""
        async with aiosqlite.connect(":memory:") as conn:
            conn.row_factory = aiosqlite.Row

            # Version 12 database with an enabled bot in the old columns
            await _seed_old_bot_settings(
                conn, bot_enabled=1, bot_code='def bot(): return "hello"'
            )

            # Run migration 13 (plus the later migrations, which also run)
            applied = await run_migrations(conn)

            assert applied == _LATEST_VERSION - _PRE_BOT_MIGRATION_VERSION
            assert await get_version(conn) == _LATEST_VERSION

            # Verify bots array was created with migrated data
            bots = await _load_bots(conn)

            assert len(bots) == 1
            assert bots[0]["name"] == "Bot 1"
            assert bots[0]["enabled"] is True
            assert bots[0]["code"] == 'def bot(): return "hello"'
            assert "id" in bots[0]  # Should have a UUID

    @pytest.mark.asyncio
    async def test_migration_creates_empty_array_when_no_bot(self):
        """Migration creates empty bots array when no existing bot data."""
        async with aiosqlite.connect(":memory:") as conn:
            conn.row_factory = aiosqlite.Row

            # Version 12 database with the bot disabled and no code
            await _seed_old_bot_settings(conn, bot_enabled=0, bot_code="")

            await run_migrations(conn)

            bots = await _load_bots(conn)
            assert bots == []