From fdd82e1f77a56e438af4339681a0ba54184a9c2a Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Thu, 19 Mar 2026 20:56:36 -0700 Subject: [PATCH] Clean up orphaned contact child rows and add foreign key enforcement --- app/migrations.py | 95 +++++++++++++++++ app/repository/contacts.py | 39 +++++++ tests/test_event_handlers.py | 17 +++ tests/test_migrations.py | 197 ++++++++++++++++++++++++++++++++--- 4 files changed, 336 insertions(+), 12 deletions(-) diff --git a/app/migrations.py b/app/migrations.py index 8b03844..64c7826 100644 --- a/app/migrations.py +++ b/app/migrations.py @@ -353,6 +353,13 @@ async def run_migrations(conn: aiosqlite.Connection) -> int: await set_version(conn, 45) applied += 1 + # Migration 46: Clean orphaned contact child rows left by old prefix promotion + if version < 46: + logger.info("Applying migration 46: clean orphaned contact child rows") + await _migrate_046_cleanup_orphaned_contact_child_rows(conn) + await set_version(conn, 46) + applied += 1 + if applied > 0: logger.info( "Applied %d migration(s), schema now at version %d", applied, await get_version(conn) @@ -2773,3 +2780,91 @@ async def _migrate_045_rebuild_contacts_direct_route_columns(conn: aiosqlite.Con await conn.execute("DROP TABLE contacts") await conn.execute("ALTER TABLE contacts_new RENAME TO contacts") await conn.commit() + + +async def _migrate_046_cleanup_orphaned_contact_child_rows(conn: aiosqlite.Connection) -> None: + """Move uniquely resolvable orphan contact child rows onto full contacts, drop the rest.""" + existing_tables_cursor = await conn.execute("SELECT name FROM sqlite_master WHERE type='table'") + existing_tables = {row[0] for row in await existing_tables_cursor.fetchall()} + if "contacts" not in existing_tables: + await conn.commit() + return + + child_tables = [ + table + for table in ("contact_name_history", "contact_advert_paths") + if table in existing_tables + ] + if not child_tables: + await conn.commit() + return + + orphan_keys: set[str] = set() + + for table in child_tables: + cursor = await conn.execute( + f""" + SELECT DISTINCT child.public_key + FROM {table} child + LEFT JOIN contacts c ON c.public_key = child.public_key + WHERE c.public_key IS NULL + """ + ) + orphan_keys.update(row[0] for row in await cursor.fetchall()) + + for orphan_key in sorted(orphan_keys, key=len, reverse=True): + match_cursor = await conn.execute( + """ + SELECT public_key + FROM contacts + WHERE length(public_key) = 64 + AND public_key LIKE ? || '%' + ORDER BY public_key + """, + (orphan_key.lower(),), + ) + matches = [row[0] for row in await match_cursor.fetchall()] + resolved_key = matches[0] if len(matches) == 1 else None + + if resolved_key is not None: + if "contact_name_history" in child_tables: + await conn.execute( + """ + INSERT INTO contact_name_history (public_key, name, first_seen, last_seen) + SELECT ?, name, first_seen, last_seen + FROM contact_name_history + WHERE public_key = ? + ON CONFLICT(public_key, name) DO UPDATE SET + first_seen = MIN(contact_name_history.first_seen, excluded.first_seen), + last_seen = MAX(contact_name_history.last_seen, excluded.last_seen) + """, + (resolved_key, orphan_key), + ) + if "contact_advert_paths" in child_tables: + await conn.execute( + """ + INSERT INTO contact_advert_paths + (public_key, path_hex, path_len, first_seen, last_seen, heard_count) + SELECT ?, path_hex, path_len, first_seen, last_seen, heard_count + FROM contact_advert_paths + WHERE public_key = ? + ON CONFLICT(public_key, path_hex, path_len) DO UPDATE SET + first_seen = MIN(contact_advert_paths.first_seen, excluded.first_seen), + last_seen = MAX(contact_advert_paths.last_seen, excluded.last_seen), + heard_count = contact_advert_paths.heard_count + excluded.heard_count + """, + (resolved_key, orphan_key), + ) + + if "contact_name_history" in child_tables: + await conn.execute( + "DELETE FROM contact_name_history WHERE public_key = ?", + (orphan_key,), + ) + if "contact_advert_paths" in child_tables: + await conn.execute( + "DELETE FROM contact_advert_paths WHERE public_key = ?", + (orphan_key,), + ) + + await conn.commit() diff --git a/app/repository/contacts.py b/app/repository/contacts.py index c57830c..7b5e00e 100644 --- a/app/repository/contacts.py +++ b/app/repository/contacts.py @@ -431,6 +431,43 @@ class ContactRepository: Returns the placeholder public keys that were merged into the full key. """ + + async def migrate_child_rows(old_key: str, new_key: str) -> None: + await db.conn.execute( + """ + INSERT INTO contact_name_history (public_key, name, first_seen, last_seen) + SELECT ?, name, first_seen, last_seen + FROM contact_name_history + WHERE public_key = ? + ON CONFLICT(public_key, name) DO UPDATE SET + first_seen = MIN(contact_name_history.first_seen, excluded.first_seen), + last_seen = MAX(contact_name_history.last_seen, excluded.last_seen) + """, + (new_key, old_key), + ) + await db.conn.execute( + """ + INSERT INTO contact_advert_paths + (public_key, path_hex, path_len, first_seen, last_seen, heard_count) + SELECT ?, path_hex, path_len, first_seen, last_seen, heard_count + FROM contact_advert_paths + WHERE public_key = ? + ON CONFLICT(public_key, path_hex, path_len) DO UPDATE SET + first_seen = MIN(contact_advert_paths.first_seen, excluded.first_seen), + last_seen = MAX(contact_advert_paths.last_seen, excluded.last_seen), + heard_count = contact_advert_paths.heard_count + excluded.heard_count + """, + (new_key, old_key), + ) + await db.conn.execute( + "DELETE FROM contact_name_history WHERE public_key = ?", + (old_key,), + ) + await db.conn.execute( + "DELETE FROM contact_advert_paths WHERE public_key = ?", + (old_key,), + ) + normalized_full_key = full_key.lower() cursor = await db.conn.execute( """ @@ -467,6 +504,8 @@ class ContactRepository: if (match_row["match_count"] if match_row is not None else 0) != 1: continue + await migrate_child_rows(old_key, normalized_full_key) + if full_exists: await db.conn.execute( """ diff --git a/tests/test_event_handlers.py b/tests/test_event_handlers.py index 5c810c0..8433d37 100644 --- a/tests/test_event_handlers.py +++ b/tests/test_event_handlers.py @@ -18,6 +18,8 @@ from app.event_handlers import ( track_pending_ack, ) from app.repository import ( + ContactAdvertPathRepository, + ContactNameHistoryRepository, ContactRepository, MessageRepository, ) @@ -618,6 +620,8 @@ class TestContactMessageCLIFiltering: sender_timestamp=1700000000, received_at=1700000000, ) + await ContactNameHistoryRepository.record_name(prefix, "Prefix Sender", 1699999990) + await ContactAdvertPathRepository.record_observation(prefix, "1122", 1699999995) with patch("app.event_handlers.broadcast_event") as mock_broadcast: @@ -646,6 +650,19 @@ class TestContactMessageCLIFiltering: assert len(messages) == 1 assert messages[0].conversation_key == full_key + assert await ContactNameHistoryRepository.get_history(prefix) == [] + assert await ContactAdvertPathRepository.get_recent_for_contact(prefix) == [] + + resolved_history = await ContactNameHistoryRepository.get_history(full_key) + assert {entry.name for entry in resolved_history} == { + "Prefix Sender", + "Resolved Sender", + } + + resolved_paths = await ContactAdvertPathRepository.get_recent_for_contact(full_key) + assert len(resolved_paths) == 1 + assert resolved_paths[0].path == "1122" + event_types = [call.args[0] for call in mock_broadcast.call_args_list] assert "contact" in event_types assert "contact_resolved" in event_types diff --git a/tests/test_migrations.py b/tests/test_migrations.py index b193ce8..7d21c92 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -1247,8 +1247,8 @@ class TestMigration039: applied = await run_migrations(conn) - assert applied == 7 - assert await get_version(conn) == 45 + assert applied == 8 + assert await get_version(conn) == 46 cursor = await conn.execute( """ @@ -1319,8 +1319,8 @@ class TestMigration039: applied = await run_migrations(conn) - assert applied == 7 - assert await get_version(conn) == 45 + assert applied == 8 + assert await get_version(conn) == 46 cursor = await conn.execute( """ @@ -1386,8 +1386,8 @@ class TestMigration039: applied = await run_migrations(conn) - assert applied == 1 - assert await get_version(conn) == 45 + assert applied == 2 + assert await get_version(conn) == 46 cursor = await conn.execute( """ @@ -1439,8 +1439,8 @@ class TestMigration040: applied = await run_migrations(conn) - assert applied == 6 - assert await get_version(conn) == 45 + assert applied == 7 + assert await get_version(conn) == 46 await conn.execute( """ @@ -1501,8 +1501,8 @@ class TestMigration041: applied = await run_migrations(conn) - assert applied == 5 - assert await get_version(conn) == 45 + assert applied == 6 + assert await get_version(conn) == 46 await conn.execute( """ @@ -1554,8 +1554,8 @@ class TestMigration042: applied = await run_migrations(conn) - assert applied == 4 - assert await get_version(conn) == 45 + assert applied == 5 + assert await get_version(conn) == 46 await conn.execute( """ @@ -1577,6 +1577,179 @@ class TestMigration042: await conn.close() +class TestMigration046: + """Test migration 046: clean orphaned contact child rows.""" + + @pytest.mark.asyncio + async def test_merges_uniquely_resolvable_orphans_and_drops_unresolved_ones(self): + conn = await aiosqlite.connect(":memory:") + conn.row_factory = aiosqlite.Row + try: + await set_version(conn, 45) + await conn.execute(""" + CREATE TABLE contacts ( + public_key TEXT PRIMARY KEY, + name TEXT + ) + """) + await conn.execute(""" + CREATE TABLE contact_name_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + public_key TEXT NOT NULL, + name TEXT NOT NULL, + first_seen INTEGER NOT NULL, + last_seen INTEGER NOT NULL, + UNIQUE(public_key, name) + ) + """) + await conn.execute(""" + CREATE TABLE contact_advert_paths ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + public_key TEXT NOT NULL, + path_hex TEXT NOT NULL, + path_len INTEGER NOT NULL, + first_seen INTEGER NOT NULL, + last_seen INTEGER NOT NULL, + heard_count INTEGER NOT NULL DEFAULT 1, + UNIQUE(public_key, path_hex, path_len) + ) + """) + + resolved_prefix = "abc123" + resolved_key = resolved_prefix + ("00" * 29) + ambiguous_prefix = "deadbe" + ambiguous_key_a = ambiguous_prefix + ("11" * 29) + ambiguous_key_b = ambiguous_prefix + ("22" * 29) + dead_prefix = "ffffaa" + + await conn.execute( + "INSERT INTO contacts (public_key, name) VALUES (?, ?), (?, ?), (?, ?)", + ( + resolved_key, + "Resolved Sender", + ambiguous_key_a, + "Ambiguous A", + ambiguous_key_b, + "Ambiguous B", + ), + ) + await conn.execute( + """ + INSERT INTO contact_name_history (public_key, name, first_seen, last_seen) + VALUES (?, ?, ?, ?), (?, ?, ?, ?), (?, ?, ?, ?) + """, + ( + resolved_key, + "Resolved Sender", + 900, + 905, + resolved_prefix, + "Prefix Sender", + 1000, + 1010, + ambiguous_prefix, + "Ambiguous Prefix", + 1100, + 1110, + ), + ) + await conn.execute( + """ + INSERT INTO contact_advert_paths + (public_key, path_hex, path_len, first_seen, last_seen, heard_count) + VALUES + (?, ?, ?, ?, ?, ?), + (?, ?, ?, ?, ?, ?), + (?, ?, ?, ?, ?, ?), + (?, ?, ?, ?, ?, ?) + """, + ( + resolved_key, + "1122", + 1, + 950, + 960, + 2, + resolved_prefix, + "1122", + 1, + 1001, + 1002, + 3, + ambiguous_prefix, + "3344", + 2, + 1200, + 1201, + 1, + dead_prefix, + "5566", + 1, + 1300, + 1301, + 1, + ), + ) + await conn.commit() + + applied = await run_migrations(conn) + + assert applied == 1 + assert await get_version(conn) == 46 + + cursor = await conn.execute( + """ + SELECT name, first_seen, last_seen + FROM contact_name_history + WHERE public_key = ? + ORDER BY name + """, + (resolved_key,), + ) + rows = await cursor.fetchall() + assert [(row["name"], row["first_seen"], row["last_seen"]) for row in rows] == [ + ("Prefix Sender", 1000, 1010), + ("Resolved Sender", 900, 905), + ] + + cursor = await conn.execute( + """ + SELECT path_hex, path_len, first_seen, last_seen, heard_count + FROM contact_advert_paths + WHERE public_key = ? + ORDER BY path_hex, path_len + """, + (resolved_key,), + ) + rows = await cursor.fetchall() + assert [ + ( + row["path_hex"], + row["path_len"], + row["first_seen"], + row["last_seen"], + row["heard_count"], + ) + for row in rows + ] == [ + ("1122", 1, 950, 1002, 5), + ] + + for orphan_key in (resolved_prefix, ambiguous_prefix, dead_prefix): + cursor = await conn.execute( + "SELECT COUNT(*) FROM contact_name_history WHERE public_key = ?", + (orphan_key,), + ) + assert (await cursor.fetchone())[0] == 0 + cursor = await conn.execute( + "SELECT COUNT(*) FROM contact_advert_paths WHERE public_key = ?", + (orphan_key,), + ) + assert (await cursor.fetchone())[0] == 0 + finally: + await conn.close() + + class TestMigrationPacketHelpers: """Test migration-local packet helpers against canonical path validation."""