From 8cc542ce23f38b02af7e94a1fb151a73d021d9ed Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Fri, 10 Apr 2026 15:36:53 -0700 Subject: [PATCH] Fix same-second same-message collision in room servers with per-sender disambiguation at DB level --- app/database.py | 2 +- app/migrations.py | 44 ++++++++ .../src/test/useConversationMessages.test.ts | 74 ++++++++++++ frontend/src/utils/messageIdentity.ts | 8 +- tests/test_echo_dedup.py | 106 ++++++++++++++++++ tests/test_migrations.py | 45 ++++---- 6 files changed, 257 insertions(+), 22 deletions(-) diff --git a/app/database.py b/app/database.py index 6fa85ad..bf43ec7 100644 --- a/app/database.py +++ b/app/database.py @@ -136,7 +136,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_dedup_null_safe ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0)) WHERE type = 'CHAN'; CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_incoming_priv_dedup - ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0)) + ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0), COALESCE(sender_key, '')) WHERE type = 'PRIV' AND outgoing = 0; CREATE INDEX IF NOT EXISTS idx_messages_sender_key ON messages(sender_key); CREATE INDEX IF NOT EXISTS idx_messages_pagination diff --git a/app/migrations.py b/app/migrations.py index 871eb78..14639df 100644 --- a/app/migrations.py +++ b/app/migrations.py @@ -419,6 +419,12 @@ async def run_migrations(conn: aiosqlite.Connection) -> int: await set_version(conn, 55) applied += 1 + if version < 56: + logger.info("Applying migration 56: add sender_key to incoming PRIV dedup index") + await _migrate_056_priv_dedup_include_sender_key(conn) + await set_version(conn, 56) + applied += 1 + if applied > 0: logger.info( "Applied %d migration(s), schema now at version %d", applied, await get_version(conn) @@ -3307,3 +3313,41 @@ async def _migrate_055_favorites_to_columns(conn: aiosqlite.Connection) -> None: await conn.commit() else: raise + + +async def _migrate_056_priv_dedup_include_sender_key(conn: aiosqlite.Connection) -> None: + """Add sender_key to the incoming PRIV dedup index. + + Room-server posts are stored as PRIV messages sharing one conversation_key + (the room contact). Without sender_key in the uniqueness constraint, two + different room participants sending identical text in the same clock second + collide and the second message is silently dropped. + + Adding COALESCE(sender_key, '') is strictly more permissive — no existing + rows can conflict — so the migration only needs to rebuild the index. + """ + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='messages'" + ) + if await cursor.fetchone() is None: + await conn.commit() + return + + # The index references type, conversation_key, sender_timestamp, outgoing, + # and sender_key. Some migration tests create minimal messages tables that + # lack these columns. Skip gracefully when the schema is too old. + col_cursor = await conn.execute("PRAGMA table_info(messages)") + columns = {row[1] for row in await col_cursor.fetchall()} + required = {"type", "conversation_key", "sender_timestamp", "outgoing", "sender_key"} + if not required.issubset(columns): + await conn.commit() + return + + await conn.execute("DROP INDEX IF EXISTS idx_messages_incoming_priv_dedup") + await conn.execute( + """CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_incoming_priv_dedup + ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0), + COALESCE(sender_key, '')) + WHERE type = 'PRIV' AND outgoing = 0""" + ) + await conn.commit() diff --git a/frontend/src/test/useConversationMessages.test.ts b/frontend/src/test/useConversationMessages.test.ts index 683d1e4..937ff12 100644 --- a/frontend/src/test/useConversationMessages.test.ts +++ b/frontend/src/test/useConversationMessages.test.ts @@ -161,6 +161,80 @@ describe('getMessageContentKey', () => { expect(getMessageContentKey(msg1)).toBe(getMessageContentKey(msg2)); }); + + it('PRIV messages with different sender_key produce different keys (room dedup)', () => { + const msg1 = createMessage({ + type: 'PRIV', + conversation_key: 'room_pubkey', + text: 'ok', + sender_timestamp: 1700000000, + sender_key: 'alice_key', + }); + const msg2 = createMessage({ + type: 'PRIV', + conversation_key: 'room_pubkey', + text: 'ok', + sender_timestamp: 1700000000, + sender_key: 'bob_key', + }); + + expect(getMessageContentKey(msg1)).not.toBe(getMessageContentKey(msg2)); + }); + + it('PRIV messages with same sender_key still dedup (true room echo)', () => { + const msg1 = createMessage({ + type: 'PRIV', + conversation_key: 'room_pubkey', + text: 'ok', + sender_timestamp: 1700000000, + sender_key: 'alice_key', + }); + const msg2 = createMessage({ + type: 'PRIV', + conversation_key: 'room_pubkey', + text: 'ok', + sender_timestamp: 1700000000, + sender_key: 'alice_key', + }); + + expect(getMessageContentKey(msg1)).toBe(getMessageContentKey(msg2)); + }); + + it('CHAN messages ignore sender_key (channel dedup unchanged)', () => { + const msg1 = createMessage({ + type: 'CHAN', + text: 'hello', + sender_timestamp: 1700000000, + sender_key: 'alice_key', + }); + const msg2 = createMessage({ + type: 'CHAN', + text: 'hello', + sender_timestamp: 1700000000, + sender_key: 'bob_key', + }); + + expect(getMessageContentKey(msg1)).toBe(getMessageContentKey(msg2)); + }); + + it('PRIV messages with null sender_key still dedup normally', () => { + const msg1 = createMessage({ + type: 'PRIV', + conversation_key: 'contact_key', + text: 'hi', + sender_timestamp: 1700000000, + sender_key: null, + }); + const msg2 = createMessage({ + type: 'PRIV', + conversation_key: 'contact_key', + text: 'hi', + sender_timestamp: 1700000000, + sender_key: null, + }); + + expect(getMessageContentKey(msg1)).toBe(getMessageContentKey(msg2)); + }); }); describe('mergePendingAck', () => { diff --git a/frontend/src/utils/messageIdentity.ts b/frontend/src/utils/messageIdentity.ts index 359bc52..0fb94a9 100644 --- a/frontend/src/utils/messageIdentity.ts +++ b/frontend/src/utils/messageIdentity.ts @@ -1,10 +1,14 @@ import type { Message } from '../types'; -// Content identity matches the frontend's message-level dedup contract. +// Content identity matches the backend's message-level dedup indexes. export function getMessageContentKey(msg: Message): string { // When sender_timestamp exists, dedup by content (catches radio-path duplicates with different IDs). // When null, include msg.id so each message gets a unique key — avoids silently dropping // different messages that share the same text and received_at second. const ts = msg.sender_timestamp ?? `r${msg.received_at}-${msg.id}`; - return `${msg.type}-${msg.conversation_key}-${msg.text}-${ts}`; + // For incoming PRIV messages (room-server posts), include sender_key so that + // two different room participants sending identical text in the same second + // are not collapsed. Mirrors idx_messages_incoming_priv_dedup. + const senderSuffix = msg.type === 'PRIV' && msg.sender_key ? `-${msg.sender_key}` : ''; + return `${msg.type}-${msg.conversation_key}-${msg.text}-${ts}${senderSuffix}`; } diff --git a/tests/test_echo_dedup.py b/tests/test_echo_dedup.py index 69f08a6..f0a23f3 100644 --- a/tests/test_echo_dedup.py +++ b/tests/test_echo_dedup.py @@ -1286,3 +1286,109 @@ class TestMessageAckedBroadcastShape: assert isinstance(payload["ack_count"], int) assert payload["ack_count"] == 0 # Outgoing DM duplicates no longer count as delivery assert payload["packet_id"] == pkt1 + + +class TestRoomServerMessageDedup: + """Test that room-server posts from different authors are not collapsed. + + Room messages are PRIV type sharing one conversation_key (the room contact's + pubkey). The dedup index includes sender_key so that two different room + participants sending identical text in the same clock second are stored as + separate messages. + """ + + ROOM_PUB = "bb" * 32 # Room contact public key + SENDER_A_KEY = "aa" * 32 + SENDER_B_KEY = "cc" * 32 + + @pytest.mark.asyncio + async def test_distinct_room_authors_same_text_same_second_stored_separately(self, test_db): + """Two room users sending identical text in the same second produce two rows.""" + msg_id_a = await MessageRepository.create( + msg_type="PRIV", + text="ok", + conversation_key=self.ROOM_PUB, + sender_timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP, + outgoing=False, + sender_key=self.SENDER_A_KEY, + sender_name="Alice", + ) + assert msg_id_a is not None + + msg_id_b = await MessageRepository.create( + msg_type="PRIV", + text="ok", + conversation_key=self.ROOM_PUB, + sender_timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP + 1, + outgoing=False, + sender_key=self.SENDER_B_KEY, + sender_name="Bob", + ) + assert msg_id_b is not None, ( + "Second room post with different sender_key should not be deduped" + ) + assert msg_id_a != msg_id_b + + messages = await MessageRepository.get_all( + msg_type="PRIV", conversation_key=self.ROOM_PUB, limit=10 + ) + assert len(messages) == 2 + + @pytest.mark.asyncio + async def test_same_room_author_same_text_same_second_still_deduped(self, test_db): + """True echo from the same room author is still collapsed (same sender_key).""" + msg_id_1 = await MessageRepository.create( + msg_type="PRIV", + text="ok", + conversation_key=self.ROOM_PUB, + sender_timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP, + outgoing=False, + sender_key=self.SENDER_A_KEY, + sender_name="Alice", + ) + assert msg_id_1 is not None + + msg_id_2 = await MessageRepository.create( + msg_type="PRIV", + text="ok", + conversation_key=self.ROOM_PUB, + sender_timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP + 1, + outgoing=False, + sender_key=self.SENDER_A_KEY, + sender_name="Alice", + ) + assert msg_id_2 is None, "Same sender_key should still be deduped" + + messages = await MessageRepository.get_all( + msg_type="PRIV", conversation_key=self.ROOM_PUB, limit=10 + ) + assert len(messages) == 1 + + @pytest.mark.asyncio + async def test_null_sender_key_still_dedupes_normally(self, test_db): + """Non-room incoming DMs (sender_key=None) still dedupe on content.""" + msg_id_1 = await MessageRepository.create( + msg_type="PRIV", + text="hello", + conversation_key=CONTACT_PUB.lower(), + sender_timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP, + outgoing=False, + sender_key=None, + ) + assert msg_id_1 is not None + + msg_id_2 = await MessageRepository.create( + msg_type="PRIV", + text="hello", + conversation_key=CONTACT_PUB.lower(), + sender_timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP + 1, + outgoing=False, + sender_key=None, + ) + assert msg_id_2 is None, "Both NULL sender_key should still collide" diff --git a/tests/test_migrations.py b/tests/test_migrations.py index a7ac19a..c545f3b 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -7,6 +7,12 @@ import pytest from app.migrations import get_version, run_migrations, set_version +# Updated automatically when a new migration is added. Migration tests that +# run ``run_migrations`` to completion assert ``get_version == LATEST`` and +# ``applied == LATEST - starting_version`` so only this constant needs to +# change, not every individual assertion. +LATEST_SCHEMA_VERSION = 56 + class TestMigration001: """Test migration 001: add last_read_at columns.""" @@ -833,9 +839,9 @@ class TestMigration044: assert [row["message_id"] for row in await cursor.fetchall()] == [1, 1] cursor = await conn.execute( - "INSERT OR IGNORE INTO messages (type, conversation_key, text, sender_timestamp, received_at, outgoing) " - "VALUES (?, ?, ?, ?, ?, ?)", - ("PRIV", "abc123", "hello", 0, 9999, 0), + "INSERT OR IGNORE INTO messages (type, conversation_key, text, sender_timestamp, received_at, outgoing, sender_key) " + "VALUES (?, ?, ?, ?, ?, ?, ?)", + ("PRIV", "abc123", "hello", 0, 9999, 0, "abc123"), ) assert cursor.rowcount == 0 @@ -844,6 +850,7 @@ class TestMigration044: ) index_sql = (await cursor.fetchone())["sql"] assert "WHERE type = 'PRIV' AND outgoing = 0" in index_sql + assert "sender_key" in index_sql finally: await conn.close() @@ -1224,8 +1231,8 @@ class TestMigration039: applied = await run_migrations(conn) - assert applied == 17 - assert await get_version(conn) == 55 + assert applied == LATEST_SCHEMA_VERSION - 38 + assert await get_version(conn) == LATEST_SCHEMA_VERSION cursor = await conn.execute( """ @@ -1296,8 +1303,8 @@ class TestMigration039: applied = await run_migrations(conn) - assert applied == 17 - assert await get_version(conn) == 55 + assert applied == LATEST_SCHEMA_VERSION - 38 + assert await get_version(conn) == LATEST_SCHEMA_VERSION cursor = await conn.execute( """ @@ -1363,8 +1370,8 @@ class TestMigration039: applied = await run_migrations(conn) - assert applied == 11 - assert await get_version(conn) == 55 + assert applied == LATEST_SCHEMA_VERSION - 44 + assert await get_version(conn) == LATEST_SCHEMA_VERSION cursor = await conn.execute( """ @@ -1416,8 +1423,8 @@ class TestMigration040: applied = await run_migrations(conn) - assert applied == 16 - assert await get_version(conn) == 55 + assert applied == LATEST_SCHEMA_VERSION - 39 + assert await get_version(conn) == LATEST_SCHEMA_VERSION await conn.execute( """ @@ -1478,8 +1485,8 @@ class TestMigration041: applied = await run_migrations(conn) - assert applied == 15 - assert await get_version(conn) == 55 + assert applied == LATEST_SCHEMA_VERSION - 40 + assert await get_version(conn) == LATEST_SCHEMA_VERSION await conn.execute( """ @@ -1531,8 +1538,8 @@ class TestMigration042: applied = await run_migrations(conn) - assert applied == 14 - assert await get_version(conn) == 55 + assert applied == LATEST_SCHEMA_VERSION - 41 + assert await get_version(conn) == LATEST_SCHEMA_VERSION await conn.execute( """ @@ -1671,8 +1678,8 @@ class TestMigration046: applied = await run_migrations(conn) - assert applied == 10 - assert await get_version(conn) == 55 + assert applied == LATEST_SCHEMA_VERSION - 45 + assert await get_version(conn) == LATEST_SCHEMA_VERSION cursor = await conn.execute( """ @@ -1765,8 +1772,8 @@ class TestMigration047: applied = await run_migrations(conn) - assert applied == 9 - assert await get_version(conn) == 55 + assert applied == LATEST_SCHEMA_VERSION - 46 + assert await get_version(conn) == LATEST_SCHEMA_VERSION cursor = await conn.execute( """