Fix same-second same-message collision in room servers with per-sender disambiguation at DB level

This commit is contained in:
Jack Kingsman
2026-04-10 15:36:53 -07:00
parent a7258c120e
commit 8cc542ce23
6 changed files with 257 additions and 22 deletions

View File

@@ -136,7 +136,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_dedup_null_safe
ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0))
WHERE type = 'CHAN';
CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_incoming_priv_dedup
ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0))
ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0), COALESCE(sender_key, ''))
WHERE type = 'PRIV' AND outgoing = 0;
CREATE INDEX IF NOT EXISTS idx_messages_sender_key ON messages(sender_key);
CREATE INDEX IF NOT EXISTS idx_messages_pagination

View File

@@ -419,6 +419,12 @@ async def run_migrations(conn: aiosqlite.Connection) -> int:
await set_version(conn, 55)
applied += 1
if version < 56:
logger.info("Applying migration 56: add sender_key to incoming PRIV dedup index")
await _migrate_056_priv_dedup_include_sender_key(conn)
await set_version(conn, 56)
applied += 1
if applied > 0:
logger.info(
"Applied %d migration(s), schema now at version %d", applied, await get_version(conn)
@@ -3307,3 +3313,41 @@ async def _migrate_055_favorites_to_columns(conn: aiosqlite.Connection) -> None:
await conn.commit()
else:
raise
async def _migrate_056_priv_dedup_include_sender_key(conn: aiosqlite.Connection) -> None:
    """Add sender_key to the incoming PRIV dedup index.

    Room-server posts are stored as PRIV messages sharing one conversation_key
    (the room contact). Without sender_key in the uniqueness constraint, two
    different room participants sending identical text in the same clock second
    collide and the second message is silently dropped.

    Adding COALESCE(sender_key, '') is strictly more permissive — no existing
    rows can conflict — so the migration only needs to rebuild the index.

    The migration does nothing except commit when the ``messages`` table is
    missing or lacks any column the rebuilt index references.

    Args:
        conn: Open database connection. It is committed on every return path.
    """
    cursor = await conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='messages'"
    )
    if await cursor.fetchone() is None:
        await conn.commit()
        return

    # The index references type, conversation_key, text, sender_timestamp,
    # outgoing, and sender_key. Some migration tests create minimal messages
    # tables that lack these columns. Skip gracefully when the schema is too
    # old; every indexed column (including text) must be listed here,
    # otherwise CREATE INDEX below fails instead of being skipped.
    col_cursor = await conn.execute("PRAGMA table_info(messages)")
    # Row index 1 of PRAGMA table_info is the column name.
    columns = {row[1] for row in await col_cursor.fetchall()}
    required = {
        "type",
        "conversation_key",
        "text",
        "sender_timestamp",
        "outgoing",
        "sender_key",
    }
    if not required.issubset(columns):
        await conn.commit()
        return

    # Drop first: the IF NOT EXISTS on CREATE would otherwise keep the old
    # (narrower) definition under the same index name.
    await conn.execute("DROP INDEX IF EXISTS idx_messages_incoming_priv_dedup")
    await conn.execute(
        """CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_incoming_priv_dedup
           ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0),
                       COALESCE(sender_key, ''))
           WHERE type = 'PRIV' AND outgoing = 0"""
    )
    await conn.commit()

View File

@@ -161,6 +161,80 @@ describe('getMessageContentKey', () => {
expect(getMessageContentKey(msg1)).toBe(getMessageContentKey(msg2));
});
it('PRIV messages with different sender_key produce different keys (room dedup)', () => {
  // Same room, same text, same second — only the author differs.
  const postFrom = (sender_key: string) =>
    createMessage({
      type: 'PRIV',
      conversation_key: 'room_pubkey',
      text: 'ok',
      sender_timestamp: 1700000000,
      sender_key,
    });

  const fromAlice = postFrom('alice_key');
  const fromBob = postFrom('bob_key');

  expect(getMessageContentKey(fromAlice)).not.toBe(getMessageContentKey(fromBob));
});
it('PRIV messages with same sender_key still dedup (true room echo)', () => {
  // Build the identical room post twice to simulate an echo of one message.
  const echoFromAlice = () =>
    createMessage({
      type: 'PRIV',
      conversation_key: 'room_pubkey',
      text: 'ok',
      sender_timestamp: 1700000000,
      sender_key: 'alice_key',
    });

  const first = echoFromAlice();
  const second = echoFromAlice();

  expect(getMessageContentKey(first)).toBe(getMessageContentKey(second));
});
it('CHAN messages ignore sender_key (channel dedup unchanged)', () => {
  // Channel messages differing only in sender_key must share a content key.
  const channelPostFrom = (sender_key: string) =>
    createMessage({
      type: 'CHAN',
      text: 'hello',
      sender_timestamp: 1700000000,
      sender_key,
    });

  const fromAlice = channelPostFrom('alice_key');
  const fromBob = channelPostFrom('bob_key');

  expect(getMessageContentKey(fromAlice)).toBe(getMessageContentKey(fromBob));
});
it('PRIV messages with null sender_key still dedup normally', () => {
  // With no sender_key on either copy, the key falls back to content only.
  const anonymousDm = () =>
    createMessage({
      type: 'PRIV',
      conversation_key: 'contact_key',
      text: 'hi',
      sender_timestamp: 1700000000,
      sender_key: null,
    });

  const first = anonymousDm();
  const second = anonymousDm();

  expect(getMessageContentKey(first)).toBe(getMessageContentKey(second));
});
});
describe('mergePendingAck', () => {

View File

@@ -1,10 +1,14 @@
import type { Message } from '../types';
/**
 * Build the content-identity key used to detect duplicate messages.
 *
 * Content identity matches the backend's message-level dedup indexes.
 *
 * @param msg - The message to derive a key for.
 * @returns A string that is equal for two messages considered the same
 *   content, and different otherwise.
 */
export function getMessageContentKey(msg: Message): string {
  // When sender_timestamp exists, dedup by content (catches radio-path duplicates with different IDs).
  // When null, include msg.id so each message gets a unique key — avoids silently dropping
  // different messages that share the same text and received_at second.
  const ts = msg.sender_timestamp ?? `r${msg.received_at}-${msg.id}`;
  // For PRIV messages that carry a sender_key (room-server posts), include it so that
  // two different room participants sending identical text in the same second
  // are not collapsed. Mirrors idx_messages_incoming_priv_dedup.
  // Truthiness is deliberate: null and '' both mean "no sender" and add no suffix.
  const senderSuffix = msg.type === 'PRIV' && msg.sender_key ? `-${msg.sender_key}` : '';
  return `${msg.type}-${msg.conversation_key}-${msg.text}-${ts}${senderSuffix}`;
}

View File

@@ -1286,3 +1286,109 @@ class TestMessageAckedBroadcastShape:
assert isinstance(payload["ack_count"], int)
assert payload["ack_count"] == 0 # Outgoing DM duplicates no longer count as delivery
assert payload["packet_id"] == pkt1
class TestRoomServerMessageDedup:
    """Verify that room-server dedup keys on the author as well as the content.

    Room messages are PRIV type sharing one conversation_key (the room contact's
    pubkey). The dedup index includes sender_key so that two different room
    participants sending identical text in the same clock second are stored as
    separate messages.
    """

    ROOM_PUB = "bb" * 32  # Room contact public key
    SENDER_A_KEY = "aa" * 32
    SENDER_B_KEY = "cc" * 32

    @pytest.mark.asyncio
    async def test_distinct_room_authors_same_text_same_second_stored_separately(self, test_db):
        """Two room users sending identical text in the same second produce two rows."""
        # Fields shared by both posts; only author and arrival time differ.
        room_post = dict(
            msg_type="PRIV",
            text="ok",
            conversation_key=self.ROOM_PUB,
            sender_timestamp=SENDER_TIMESTAMP,
            outgoing=False,
        )

        alice_id = await MessageRepository.create(
            received_at=SENDER_TIMESTAMP,
            sender_key=self.SENDER_A_KEY,
            sender_name="Alice",
            **room_post,
        )
        assert alice_id is not None

        bob_id = await MessageRepository.create(
            received_at=SENDER_TIMESTAMP + 1,
            sender_key=self.SENDER_B_KEY,
            sender_name="Bob",
            **room_post,
        )
        assert bob_id is not None, (
            "Second room post with different sender_key should not be deduped"
        )
        assert alice_id != bob_id

        stored = await MessageRepository.get_all(
            msg_type="PRIV", conversation_key=self.ROOM_PUB, limit=10
        )
        assert len(stored) == 2

    @pytest.mark.asyncio
    async def test_same_room_author_same_text_same_second_still_deduped(self, test_db):
        """True echo from the same room author is still collapsed (same sender_key)."""
        # Same author and content both times; only the arrival time differs.
        alice_post = dict(
            msg_type="PRIV",
            text="ok",
            conversation_key=self.ROOM_PUB,
            sender_timestamp=SENDER_TIMESTAMP,
            outgoing=False,
            sender_key=self.SENDER_A_KEY,
            sender_name="Alice",
        )

        first_id = await MessageRepository.create(received_at=SENDER_TIMESTAMP, **alice_post)
        assert first_id is not None

        echo_id = await MessageRepository.create(received_at=SENDER_TIMESTAMP + 1, **alice_post)
        assert echo_id is None, "Same sender_key should still be deduped"

        stored = await MessageRepository.get_all(
            msg_type="PRIV", conversation_key=self.ROOM_PUB, limit=10
        )
        assert len(stored) == 1

    @pytest.mark.asyncio
    async def test_null_sender_key_still_dedupes_normally(self, test_db):
        """Non-room incoming DMs (sender_key=None) still dedupe on content."""
        # No sender_key on either insert, so dedup rests on content alone.
        plain_dm = dict(
            msg_type="PRIV",
            text="hello",
            conversation_key=CONTACT_PUB.lower(),
            sender_timestamp=SENDER_TIMESTAMP,
            outgoing=False,
            sender_key=None,
        )

        first_id = await MessageRepository.create(received_at=SENDER_TIMESTAMP, **plain_dm)
        assert first_id is not None

        repeat_id = await MessageRepository.create(received_at=SENDER_TIMESTAMP + 1, **plain_dm)
        assert repeat_id is None, "Both NULL sender_key should still collide"

View File

@@ -7,6 +7,12 @@ import pytest
from app.migrations import get_version, run_migrations, set_version
# Updated automatically when a new migration is added. Migration tests that
# run ``run_migrations`` to completion assert ``get_version == LATEST`` and
# ``applied == LATEST - starting_version`` so only this constant needs to
# change, not every individual assertion.
LATEST_SCHEMA_VERSION = 56
class TestMigration001:
"""Test migration 001: add last_read_at columns."""
@@ -833,9 +839,9 @@ class TestMigration044:
assert [row["message_id"] for row in await cursor.fetchall()] == [1, 1]
cursor = await conn.execute(
"INSERT OR IGNORE INTO messages (type, conversation_key, text, sender_timestamp, received_at, outgoing) "
"VALUES (?, ?, ?, ?, ?, ?)",
("PRIV", "abc123", "hello", 0, 9999, 0),
"INSERT OR IGNORE INTO messages (type, conversation_key, text, sender_timestamp, received_at, outgoing, sender_key) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
("PRIV", "abc123", "hello", 0, 9999, 0, "abc123"),
)
assert cursor.rowcount == 0
@@ -844,6 +850,7 @@ class TestMigration044:
)
index_sql = (await cursor.fetchone())["sql"]
assert "WHERE type = 'PRIV' AND outgoing = 0" in index_sql
assert "sender_key" in index_sql
finally:
await conn.close()
@@ -1224,8 +1231,8 @@ class TestMigration039:
applied = await run_migrations(conn)
assert applied == 17
assert await get_version(conn) == 55
assert applied == LATEST_SCHEMA_VERSION - 38
assert await get_version(conn) == LATEST_SCHEMA_VERSION
cursor = await conn.execute(
"""
@@ -1296,8 +1303,8 @@ class TestMigration039:
applied = await run_migrations(conn)
assert applied == 17
assert await get_version(conn) == 55
assert applied == LATEST_SCHEMA_VERSION - 38
assert await get_version(conn) == LATEST_SCHEMA_VERSION
cursor = await conn.execute(
"""
@@ -1363,8 +1370,8 @@ class TestMigration039:
applied = await run_migrations(conn)
assert applied == 11
assert await get_version(conn) == 55
assert applied == LATEST_SCHEMA_VERSION - 44
assert await get_version(conn) == LATEST_SCHEMA_VERSION
cursor = await conn.execute(
"""
@@ -1416,8 +1423,8 @@ class TestMigration040:
applied = await run_migrations(conn)
assert applied == 16
assert await get_version(conn) == 55
assert applied == LATEST_SCHEMA_VERSION - 39
assert await get_version(conn) == LATEST_SCHEMA_VERSION
await conn.execute(
"""
@@ -1478,8 +1485,8 @@ class TestMigration041:
applied = await run_migrations(conn)
assert applied == 15
assert await get_version(conn) == 55
assert applied == LATEST_SCHEMA_VERSION - 40
assert await get_version(conn) == LATEST_SCHEMA_VERSION
await conn.execute(
"""
@@ -1531,8 +1538,8 @@ class TestMigration042:
applied = await run_migrations(conn)
assert applied == 14
assert await get_version(conn) == 55
assert applied == LATEST_SCHEMA_VERSION - 41
assert await get_version(conn) == LATEST_SCHEMA_VERSION
await conn.execute(
"""
@@ -1671,8 +1678,8 @@ class TestMigration046:
applied = await run_migrations(conn)
assert applied == 10
assert await get_version(conn) == 55
assert applied == LATEST_SCHEMA_VERSION - 45
assert await get_version(conn) == LATEST_SCHEMA_VERSION
cursor = await conn.execute(
"""
@@ -1765,8 +1772,8 @@ class TestMigration047:
applied = await run_migrations(conn)
assert applied == 9
assert await get_version(conn) == 55
assert applied == LATEST_SCHEMA_VERSION - 46
assert await get_version(conn) == LATEST_SCHEMA_VERSION
cursor = await conn.execute(
"""