Clean up orphaned contact child rows and add foreign key enforcement

This commit is contained in:
Jack Kingsman
2026-03-19 20:56:36 -07:00
parent 9d129260fd
commit fdd82e1f77
4 changed files with 336 additions and 12 deletions

View File

@@ -353,6 +353,13 @@ async def run_migrations(conn: aiosqlite.Connection) -> int:
await set_version(conn, 45)
applied += 1
# Migration 46: Clean orphaned contact child rows left by old prefix promotion
if version < 46:
logger.info("Applying migration 46: clean orphaned contact child rows")
await _migrate_046_cleanup_orphaned_contact_child_rows(conn)
await set_version(conn, 46)
applied += 1
if applied > 0:
logger.info(
"Applied %d migration(s), schema now at version %d", applied, await get_version(conn)
@@ -2773,3 +2780,91 @@ async def _migrate_045_rebuild_contacts_direct_route_columns(conn: aiosqlite.Con
await conn.execute("DROP TABLE contacts")
await conn.execute("ALTER TABLE contacts_new RENAME TO contacts")
await conn.commit()
async def _migrate_046_cleanup_orphaned_contact_child_rows(conn: aiosqlite.Connection) -> None:
"""Move uniquely resolvable orphan contact child rows onto full contacts, drop the rest."""
existing_tables_cursor = await conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
existing_tables = {row[0] for row in await existing_tables_cursor.fetchall()}
if "contacts" not in existing_tables:
await conn.commit()
return
child_tables = [
table
for table in ("contact_name_history", "contact_advert_paths")
if table in existing_tables
]
if not child_tables:
await conn.commit()
return
orphan_keys: set[str] = set()
for table in child_tables:
cursor = await conn.execute(
f"""
SELECT DISTINCT child.public_key
FROM {table} child
LEFT JOIN contacts c ON c.public_key = child.public_key
WHERE c.public_key IS NULL
"""
)
orphan_keys.update(row[0] for row in await cursor.fetchall())
for orphan_key in sorted(orphan_keys, key=len, reverse=True):
match_cursor = await conn.execute(
"""
SELECT public_key
FROM contacts
WHERE length(public_key) = 64
AND public_key LIKE ? || '%'
ORDER BY public_key
""",
(orphan_key.lower(),),
)
matches = [row[0] for row in await match_cursor.fetchall()]
resolved_key = matches[0] if len(matches) == 1 else None
if resolved_key is not None:
if "contact_name_history" in child_tables:
await conn.execute(
"""
INSERT INTO contact_name_history (public_key, name, first_seen, last_seen)
SELECT ?, name, first_seen, last_seen
FROM contact_name_history
WHERE public_key = ?
ON CONFLICT(public_key, name) DO UPDATE SET
first_seen = MIN(contact_name_history.first_seen, excluded.first_seen),
last_seen = MAX(contact_name_history.last_seen, excluded.last_seen)
""",
(resolved_key, orphan_key),
)
if "contact_advert_paths" in child_tables:
await conn.execute(
"""
INSERT INTO contact_advert_paths
(public_key, path_hex, path_len, first_seen, last_seen, heard_count)
SELECT ?, path_hex, path_len, first_seen, last_seen, heard_count
FROM contact_advert_paths
WHERE public_key = ?
ON CONFLICT(public_key, path_hex, path_len) DO UPDATE SET
first_seen = MIN(contact_advert_paths.first_seen, excluded.first_seen),
last_seen = MAX(contact_advert_paths.last_seen, excluded.last_seen),
heard_count = contact_advert_paths.heard_count + excluded.heard_count
""",
(resolved_key, orphan_key),
)
if "contact_name_history" in child_tables:
await conn.execute(
"DELETE FROM contact_name_history WHERE public_key = ?",
(orphan_key,),
)
if "contact_advert_paths" in child_tables:
await conn.execute(
"DELETE FROM contact_advert_paths WHERE public_key = ?",
(orphan_key,),
)
await conn.commit()

View File

@@ -431,6 +431,43 @@ class ContactRepository:
Returns the placeholder public keys that were merged into the full key.
"""
async def migrate_child_rows(old_key: str, new_key: str) -> None:
await db.conn.execute(
"""
INSERT INTO contact_name_history (public_key, name, first_seen, last_seen)
SELECT ?, name, first_seen, last_seen
FROM contact_name_history
WHERE public_key = ?
ON CONFLICT(public_key, name) DO UPDATE SET
first_seen = MIN(contact_name_history.first_seen, excluded.first_seen),
last_seen = MAX(contact_name_history.last_seen, excluded.last_seen)
""",
(new_key, old_key),
)
await db.conn.execute(
"""
INSERT INTO contact_advert_paths
(public_key, path_hex, path_len, first_seen, last_seen, heard_count)
SELECT ?, path_hex, path_len, first_seen, last_seen, heard_count
FROM contact_advert_paths
WHERE public_key = ?
ON CONFLICT(public_key, path_hex, path_len) DO UPDATE SET
first_seen = MIN(contact_advert_paths.first_seen, excluded.first_seen),
last_seen = MAX(contact_advert_paths.last_seen, excluded.last_seen),
heard_count = contact_advert_paths.heard_count + excluded.heard_count
""",
(new_key, old_key),
)
await db.conn.execute(
"DELETE FROM contact_name_history WHERE public_key = ?",
(old_key,),
)
await db.conn.execute(
"DELETE FROM contact_advert_paths WHERE public_key = ?",
(old_key,),
)
normalized_full_key = full_key.lower()
cursor = await db.conn.execute(
"""
@@ -467,6 +504,8 @@ class ContactRepository:
if (match_row["match_count"] if match_row is not None else 0) != 1:
continue
await migrate_child_rows(old_key, normalized_full_key)
if full_exists:
await db.conn.execute(
"""

View File

@@ -18,6 +18,8 @@ from app.event_handlers import (
track_pending_ack,
)
from app.repository import (
ContactAdvertPathRepository,
ContactNameHistoryRepository,
ContactRepository,
MessageRepository,
)
@@ -618,6 +620,8 @@ class TestContactMessageCLIFiltering:
sender_timestamp=1700000000,
received_at=1700000000,
)
await ContactNameHistoryRepository.record_name(prefix, "Prefix Sender", 1699999990)
await ContactAdvertPathRepository.record_observation(prefix, "1122", 1699999995)
with patch("app.event_handlers.broadcast_event") as mock_broadcast:
@@ -646,6 +650,19 @@ class TestContactMessageCLIFiltering:
assert len(messages) == 1
assert messages[0].conversation_key == full_key
assert await ContactNameHistoryRepository.get_history(prefix) == []
assert await ContactAdvertPathRepository.get_recent_for_contact(prefix) == []
resolved_history = await ContactNameHistoryRepository.get_history(full_key)
assert {entry.name for entry in resolved_history} == {
"Prefix Sender",
"Resolved Sender",
}
resolved_paths = await ContactAdvertPathRepository.get_recent_for_contact(full_key)
assert len(resolved_paths) == 1
assert resolved_paths[0].path == "1122"
event_types = [call.args[0] for call in mock_broadcast.call_args_list]
assert "contact" in event_types
assert "contact_resolved" in event_types

View File

@@ -1247,8 +1247,8 @@ class TestMigration039:
applied = await run_migrations(conn)
assert applied == 7
assert await get_version(conn) == 45
assert applied == 8
assert await get_version(conn) == 46
cursor = await conn.execute(
"""
@@ -1319,8 +1319,8 @@ class TestMigration039:
applied = await run_migrations(conn)
assert applied == 7
assert await get_version(conn) == 45
assert applied == 8
assert await get_version(conn) == 46
cursor = await conn.execute(
"""
@@ -1386,8 +1386,8 @@ class TestMigration039:
applied = await run_migrations(conn)
assert applied == 1
assert await get_version(conn) == 45
assert applied == 2
assert await get_version(conn) == 46
cursor = await conn.execute(
"""
@@ -1439,8 +1439,8 @@ class TestMigration040:
applied = await run_migrations(conn)
assert applied == 6
assert await get_version(conn) == 45
assert applied == 7
assert await get_version(conn) == 46
await conn.execute(
"""
@@ -1501,8 +1501,8 @@ class TestMigration041:
applied = await run_migrations(conn)
assert applied == 5
assert await get_version(conn) == 45
assert applied == 6
assert await get_version(conn) == 46
await conn.execute(
"""
@@ -1554,8 +1554,8 @@ class TestMigration042:
applied = await run_migrations(conn)
assert applied == 4
assert await get_version(conn) == 45
assert applied == 5
assert await get_version(conn) == 46
await conn.execute(
"""
@@ -1577,6 +1577,179 @@ class TestMigration042:
await conn.close()
class TestMigration046:
"""Test migration 046: clean orphaned contact child rows."""
@pytest.mark.asyncio
async def test_merges_uniquely_resolvable_orphans_and_drops_unresolved_ones(self):
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
await set_version(conn, 45)
await conn.execute("""
CREATE TABLE contacts (
public_key TEXT PRIMARY KEY,
name TEXT
)
""")
await conn.execute("""
CREATE TABLE contact_name_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
public_key TEXT NOT NULL,
name TEXT NOT NULL,
first_seen INTEGER NOT NULL,
last_seen INTEGER NOT NULL,
UNIQUE(public_key, name)
)
""")
await conn.execute("""
CREATE TABLE contact_advert_paths (
id INTEGER PRIMARY KEY AUTOINCREMENT,
public_key TEXT NOT NULL,
path_hex TEXT NOT NULL,
path_len INTEGER NOT NULL,
first_seen INTEGER NOT NULL,
last_seen INTEGER NOT NULL,
heard_count INTEGER NOT NULL DEFAULT 1,
UNIQUE(public_key, path_hex, path_len)
)
""")
resolved_prefix = "abc123"
resolved_key = resolved_prefix + ("00" * 29)
ambiguous_prefix = "deadbe"
ambiguous_key_a = ambiguous_prefix + ("11" * 29)
ambiguous_key_b = ambiguous_prefix + ("22" * 29)
dead_prefix = "ffffaa"
await conn.execute(
"INSERT INTO contacts (public_key, name) VALUES (?, ?), (?, ?), (?, ?)",
(
resolved_key,
"Resolved Sender",
ambiguous_key_a,
"Ambiguous A",
ambiguous_key_b,
"Ambiguous B",
),
)
await conn.execute(
"""
INSERT INTO contact_name_history (public_key, name, first_seen, last_seen)
VALUES (?, ?, ?, ?), (?, ?, ?, ?), (?, ?, ?, ?)
""",
(
resolved_key,
"Resolved Sender",
900,
905,
resolved_prefix,
"Prefix Sender",
1000,
1010,
ambiguous_prefix,
"Ambiguous Prefix",
1100,
1110,
),
)
await conn.execute(
"""
INSERT INTO contact_advert_paths
(public_key, path_hex, path_len, first_seen, last_seen, heard_count)
VALUES
(?, ?, ?, ?, ?, ?),
(?, ?, ?, ?, ?, ?),
(?, ?, ?, ?, ?, ?),
(?, ?, ?, ?, ?, ?)
""",
(
resolved_key,
"1122",
1,
950,
960,
2,
resolved_prefix,
"1122",
1,
1001,
1002,
3,
ambiguous_prefix,
"3344",
2,
1200,
1201,
1,
dead_prefix,
"5566",
1,
1300,
1301,
1,
),
)
await conn.commit()
applied = await run_migrations(conn)
assert applied == 1
assert await get_version(conn) == 46
cursor = await conn.execute(
"""
SELECT name, first_seen, last_seen
FROM contact_name_history
WHERE public_key = ?
ORDER BY name
""",
(resolved_key,),
)
rows = await cursor.fetchall()
assert [(row["name"], row["first_seen"], row["last_seen"]) for row in rows] == [
("Prefix Sender", 1000, 1010),
("Resolved Sender", 900, 905),
]
cursor = await conn.execute(
"""
SELECT path_hex, path_len, first_seen, last_seen, heard_count
FROM contact_advert_paths
WHERE public_key = ?
ORDER BY path_hex, path_len
""",
(resolved_key,),
)
rows = await cursor.fetchall()
assert [
(
row["path_hex"],
row["path_len"],
row["first_seen"],
row["last_seen"],
row["heard_count"],
)
for row in rows
] == [
("1122", 1, 950, 1002, 5),
]
for orphan_key in (resolved_prefix, ambiguous_prefix, dead_prefix):
cursor = await conn.execute(
"SELECT COUNT(*) FROM contact_name_history WHERE public_key = ?",
(orphan_key,),
)
assert (await cursor.fetchone())[0] == 0
cursor = await conn.execute(
"SELECT COUNT(*) FROM contact_advert_paths WHERE public_key = ?",
(orphan_key,),
)
assert (await cursor.fetchone())[0] == 0
finally:
await conn.close()
class TestMigrationPacketHelpers:
"""Test migration-local packet helpers against canonical path validation."""