From 350c85ca6d29865088b88e7687f1c280da5df525 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Wed, 18 Mar 2026 17:16:34 -0700 Subject: [PATCH] Behave better around DM dedupe/storage. Closes #77. --- app/AGENTS.md | 4 +- app/database.py | 8 +- app/migrations.py | 159 ++++++++++++++++++++++++++++++++ app/repository/messages.py | 21 +++-- app/services/dm_ingest.py | 2 + app/services/messages.py | 4 + tests/test_echo_dedup.py | 76 +++++++++++++++ tests/test_key_normalization.py | 7 +- tests/test_migrations.py | 137 +++++++++++++++++++++++++-- tests/test_packet_pipeline.py | 11 +-- tests/test_repository.py | 41 ++++++++ 11 files changed, 436 insertions(+), 34 deletions(-) diff --git a/app/AGENTS.md b/app/AGENTS.md index c1305e4..276bd97 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -126,7 +126,7 @@ app/ ### Echo/repeat dedup - Message uniqueness: `(type, conversation_key, text, sender_timestamp)`. -- Duplicate insert is treated as an echo/repeat: the new path (if any) is appended, and the ACK count is incremented only for outgoing channel messages. Incoming repeats and direct-message duplicates may still add path data, but DM delivery state advances only from real ACK events. +- Duplicate insert is treated as an echo/repeat: the new path (if any) is appended, and the ACK count is incremented only for outgoing channel messages. Incoming direct messages with the same conversation/text/sender timestamp also collapse onto one stored row, with later observations merging path data instead of creating a second DM. ### Raw packet dedup policy @@ -364,7 +364,7 @@ tests/ The MeshCore radio protocol encodes `sender_timestamp` as a 4-byte little-endian integer (Unix seconds). This is a firmware-level wire format — the radio, the Python library (`commands/messaging.py`), and the decoder (`decoder.py`) all read/write exactly 4 bytes. Millisecond Unix timestamps would overflow 4 bytes, so higher resolution is not possible without a firmware change. -**Consequence:** Channel-message dedup still operates at 1-second granularity because the radio protocol only provides second-resolution `sender_timestamp`. Do not attempt to fix this by switching to millisecond timestamps — it will break echo dedup (the echo's 4-byte timestamp won't match the stored value) and overflow `to_bytes(4, "little")`. Direct messages no longer share that channel dedup index; they are deduplicated by raw-packet identity instead so legitimate same-text same-second DMs can coexist. +**Consequence:** Message dedup still operates at 1-second granularity because the radio protocol only provides second-resolution `sender_timestamp`. Do not attempt to fix this by switching to millisecond timestamps — it will break echo dedup (the echo's 4-byte timestamp won't match the stored value) and overflow `to_bytes(4, "little")`. Incoming DMs now share the same second-resolution content identity tradeoff as channel echoes: same-contact same-text same-second observations collapse onto one stored row. ### Outgoing DM echoes remain undecrypted diff --git a/app/database.py b/app/database.py index 838ba67..dbc7719 100644 --- a/app/database.py +++ b/app/database.py @@ -50,10 +50,9 @@ CREATE TABLE IF NOT EXISTS messages ( acked INTEGER DEFAULT 0, sender_name TEXT, sender_key TEXT - -- Deduplication: channel echoes/repeats use a channel-only unique index on - -- identical conversation/text/timestamp. Direct messages are deduplicated - -- separately via raw-packet linkage so legitimate same-text same-second DMs - -- can coexist. + -- Deduplication: channel echoes/repeats use a content/time unique index so + -- duplicate observations reconcile onto a single stored row. Legacy + -- databases may also gain an incoming-DM content index via migration 44. -- Enforced via idx_messages_dedup_null_safe (unique index) rather than a table constraint -- to avoid the storage overhead of SQLite's autoindex duplicating every message text. ); @@ -97,6 +96,7 @@ CREATE INDEX IF NOT EXISTS idx_raw_packets_message_id ON raw_packets(message_id) CREATE UNIQUE INDEX IF NOT EXISTS idx_raw_packets_payload_hash ON raw_packets(payload_hash); CREATE INDEX IF NOT EXISTS idx_contacts_on_radio ON contacts(on_radio); -- idx_messages_sender_key is created by migration 25 (after adding the sender_key column) +-- idx_messages_incoming_priv_dedup is created by migration 44 after legacy rows are reconciled CREATE INDEX IF NOT EXISTS idx_contact_advert_paths_recent ON contact_advert_paths(public_key, last_seen DESC); CREATE INDEX IF NOT EXISTS idx_contact_name_history_key diff --git a/app/migrations.py b/app/migrations.py index 31611a2..ef81561 100644 --- a/app/migrations.py +++ b/app/migrations.py @@ -8,6 +8,7 @@ This approach is safe for existing users - their databases have user_version=0, so all migrations run in order on first startup after upgrade. """ +import json import logging from hashlib import sha256 @@ -338,6 +339,13 @@ async def run_migrations(conn: aiosqlite.Connection) -> int: await set_version(conn, 43) applied += 1 + # Migration 44: Deduplicate incoming direct messages by content/timestamp + if version < 44: + logger.info("Applying migration 44: dedupe incoming direct messages") + await _migrate_044_dedupe_incoming_direct_messages(conn) + await set_version(conn, 44) + applied += 1 + if applied > 0: logger.info( "Applied %d migration(s), schema now at version %d", applied, await get_version(conn) @@ -2476,3 +2484,154 @@ async def _migrate_043_split_message_dedup_by_type(conn: aiosqlite.Connection) - WHERE type = 'CHAN'""" ) await conn.commit() + + +def _merge_message_paths(paths_json_values: list[str | None]) -> str | None: + """Merge multiple message path arrays into one exact-observation list.""" + merged: list[dict[str, object]] = [] + seen: set[tuple[object | None, object | None, object | None]] = set() + + for paths_json in paths_json_values: + if not paths_json: + continue + try: + parsed = json.loads(paths_json) + except (TypeError, json.JSONDecodeError): + continue + if not isinstance(parsed, list): + continue + for entry in parsed: + if not isinstance(entry, dict): + continue + key = ( + entry.get("path"), + entry.get("received_at"), + entry.get("path_len"), + ) + if key in seen: + continue + seen.add(key) + merged.append(entry) + + return json.dumps(merged) if merged else None + + +async def _migrate_044_dedupe_incoming_direct_messages(conn: aiosqlite.Connection) -> None: + """Collapse same-contact same-text same-second incoming DMs into one row.""" + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='messages'" + ) + if await cursor.fetchone() is None: + await conn.commit() + return + + cursor = await conn.execute("PRAGMA table_info(messages)") + columns = {row[1] for row in await cursor.fetchall()} + required_columns = { + "id", + "type", + "conversation_key", + "text", + "sender_timestamp", + "received_at", + "paths", + "txt_type", + "signature", + "outgoing", + "acked", + "sender_name", + "sender_key", + } + if not required_columns.issubset(columns): + logger.debug("messages table missing incoming-DM dedup columns, skipping migration 44") + await conn.commit() + return + + raw_packets_cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='raw_packets'" + ) + raw_packets_exists = await raw_packets_cursor.fetchone() is not None + + duplicate_groups_cursor = await conn.execute( + """ + SELECT conversation_key, text, + COALESCE(sender_timestamp, 0) AS normalized_sender_timestamp, + COUNT(*) AS duplicate_count + FROM messages + WHERE type = 'PRIV' AND outgoing = 0 + GROUP BY conversation_key, text, COALESCE(sender_timestamp, 0) + HAVING COUNT(*) > 1 + """ + ) + duplicate_groups = await duplicate_groups_cursor.fetchall() + + for group in duplicate_groups: + normalized_sender_timestamp = group["normalized_sender_timestamp"] + rows_cursor = await conn.execute( + """ + SELECT * + FROM messages + WHERE type = 'PRIV' AND outgoing = 0 + AND conversation_key = ? AND text = ? + AND COALESCE(sender_timestamp, 0) = ? + ORDER BY id ASC + """, + ( + group["conversation_key"], + group["text"], + normalized_sender_timestamp, + ), + ) + rows = list(await rows_cursor.fetchall()) + if len(rows) < 2: + continue + + keeper = rows[0] + duplicate_ids = [row["id"] for row in rows[1:]] + merged_paths = _merge_message_paths([row["paths"] for row in rows]) + merged_received_at = min(row["received_at"] for row in rows) + merged_txt_type = next((row["txt_type"] for row in rows if row["txt_type"] != 0), 0) + merged_signature = next((row["signature"] for row in rows if row["signature"]), None) + merged_sender_name = next((row["sender_name"] for row in rows if row["sender_name"]), None) + merged_sender_key = next((row["sender_key"] for row in rows if row["sender_key"]), None) + merged_acked = max(int(row["acked"] or 0) for row in rows) + + await conn.execute( + """ + UPDATE messages + SET received_at = ?, paths = ?, txt_type = ?, signature = ?, + acked = ?, sender_name = ?, sender_key = ? + WHERE id = ? + """, + ( + merged_received_at, + merged_paths, + merged_txt_type, + merged_signature, + merged_acked, + merged_sender_name, + merged_sender_key, + keeper["id"], + ), + ) + + if raw_packets_exists: + for duplicate_id in duplicate_ids: + await conn.execute( + "UPDATE raw_packets SET message_id = ? WHERE message_id = ?", + (keeper["id"], duplicate_id), + ) + + placeholders = ",".join("?" for _ in duplicate_ids) + await conn.execute( + f"DELETE FROM messages WHERE id IN ({placeholders})", + duplicate_ids, + ) + + await conn.execute("DROP INDEX IF EXISTS idx_messages_incoming_priv_dedup") + await conn.execute( + """CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_incoming_priv_dedup + ON messages(type, conversation_key, text, COALESCE(sender_timestamp, 0)) + WHERE type = 'PRIV' AND outgoing = 0""" + ) + await conn.commit() diff --git a/app/repository/messages.py b/app/repository/messages.py index ab6c160..8b73cf6 100644 --- a/app/repository/messages.py +++ b/app/repository/messages.py @@ -66,9 +66,10 @@ class MessageRepository: ) -> int | None: """Create a message, returning the ID or None if duplicate. - Uses INSERT OR IGNORE to handle the UNIQUE constraint on - (type, conversation_key, text, sender_timestamp). This prevents - duplicate messages when the same message arrives via multiple RF paths. + Uses INSERT OR IGNORE to handle the message dedup indexes: + - channel messages dedupe by content/timestamp for echo reconciliation + - incoming direct messages dedupe by conversation/text/timestamp so + raw-packet and fallback observations merge onto one row The path parameter is converted to the paths JSON array format. """ @@ -559,16 +560,20 @@ class MessageRepository: conversation_key: str, text: str, sender_timestamp: int | None, + outgoing: bool | None = None, ) -> "Message | None": """Look up a message by its unique content fields.""" - cursor = await db.conn.execute( - """ + query = """ SELECT * FROM messages WHERE type = ? AND conversation_key = ? AND text = ? AND (sender_timestamp = ? OR (sender_timestamp IS NULL AND ? IS NULL)) - """, - (msg_type, conversation_key, text, sender_timestamp, sender_timestamp), - ) + """ + params: list[Any] = [msg_type, conversation_key, text, sender_timestamp, sender_timestamp] + if outgoing is not None: + query += " AND outgoing = ?" + params.append(1 if outgoing else 0) + query += " ORDER BY id ASC" + cursor = await db.conn.execute(query, params) row = await cursor.fetchone() if not row: return None diff --git a/app/services/dm_ingest.py b/app/services/dm_ingest.py index c0963c7..240f748 100644 --- a/app/services/dm_ingest.py +++ b/app/services/dm_ingest.py @@ -152,6 +152,7 @@ async def _store_direct_message( conversation_key=conversation_key, text=text, sender_timestamp=sender_timestamp, + outgoing=outgoing, ) if existing_msg is not None: await reconcile_duplicate_message( @@ -185,6 +186,7 @@ async def _store_direct_message( conversation_key=conversation_key, text=text, sender_timestamp=sender_timestamp, + outgoing=outgoing, path=path, received_at=received_at, path_len=path_len, diff --git a/app/services/messages.py b/app/services/messages.py index 1b6a711..da67882 100644 --- a/app/services/messages.py +++ b/app/services/messages.py @@ -171,6 +171,7 @@ async def handle_duplicate_message( conversation_key: str, text: str, sender_timestamp: int, + outgoing: bool | None = None, path: str | None, received_at: int, path_len: int | None = None, @@ -182,6 +183,7 @@ async def handle_duplicate_message( conversation_key=conversation_key, text=text, sender_timestamp=sender_timestamp, + outgoing=outgoing, ) if not existing_msg: label = "message" if msg_type == "CHAN" else "DM" @@ -246,6 +248,7 @@ async def create_message_from_decrypted( conversation_key=channel_key_normalized, text=text, sender_timestamp=timestamp, + outgoing=None, path=path, received_at=received, path_len=path_len, @@ -355,6 +358,7 @@ async def create_fallback_channel_message( conversation_key=conversation_key_normalized, text=text, sender_timestamp=sender_timestamp, + outgoing=None, path=path, received_at=received_at, path_len=path_len, diff --git a/tests/test_echo_dedup.py b/tests/test_echo_dedup.py index a418fe3..65aba2d 100644 --- a/tests/test_echo_dedup.py +++ b/tests/test_echo_dedup.py @@ -614,6 +614,82 @@ class TestDualPathDedup: assert msg.paths is not None assert any(p.path == "bbcc" for p in msg.paths) + @pytest.mark.asyncio + async def test_incoming_duplicate_does_not_reconcile_onto_matching_outgoing_dm( + self, test_db, captured_broadcasts + ): + """Incoming DM duplicates must merge onto the incoming row, not a sent row.""" + from app.event_handlers import on_contact_message + from app.packet_processor import create_dm_message_from_decrypted + + await ContactRepository.upsert( + { + "public_key": CONTACT_PUB.lower(), + "name": "TestContact", + "type": 1, + "last_seen": SENDER_TIMESTAMP, + "last_contacted": SENDER_TIMESTAMP, + "first_seen": SENDER_TIMESTAMP, + "on_radio": False, + "out_path_hash_mode": 0, + } + ) + + outgoing_id = await MessageRepository.create( + msg_type="PRIV", + text="Mirror text", + conversation_key=CONTACT_PUB.lower(), + sender_timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP - 1, + outgoing=True, + ) + assert outgoing_id is not None + + pkt_id, _ = await RawPacketRepository.create(b"incoming_primary", SENDER_TIMESTAMP) + decrypted = DecryptedDirectMessage( + timestamp=SENDER_TIMESTAMP, + flags=0, + message="Mirror text", + dest_hash="fa", + src_hash="a1", + ) + + broadcasts, mock_broadcast = captured_broadcasts + + with patch("app.packet_processor.broadcast_event", mock_broadcast): + incoming_id = await create_dm_message_from_decrypted( + packet_id=pkt_id, + decrypted=decrypted, + their_public_key=CONTACT_PUB, + our_public_key=OUR_PUB, + received_at=SENDER_TIMESTAMP, + outgoing=False, + ) + + assert incoming_id is not None + broadcasts.clear() + + mock_event = MagicMock() + mock_event.payload = { + "public_key": CONTACT_PUB, + "text": "Mirror text", + "txt_type": 0, + "sender_timestamp": SENDER_TIMESTAMP, + "path": "bbcc", + "path_len": 2, + } + + with patch("app.event_handlers.broadcast_event", mock_broadcast): + await on_contact_message(mock_event) + + incoming_msg = await MessageRepository.get_by_id(incoming_id) + outgoing_msg = await MessageRepository.get_by_id(outgoing_id) + assert incoming_msg is not None + assert outgoing_msg is not None + assert incoming_msg.paths is not None + assert any(p.path == "bbcc" for p in incoming_msg.paths) + assert outgoing_msg.paths is None + @pytest.mark.asyncio async def test_fallback_path_duplicate_reconciles_path_without_new_row( self, test_db, captured_broadcasts diff --git a/tests/test_key_normalization.py b/tests/test_key_normalization.py index b558476..1490888 100644 --- a/tests/test_key_normalization.py +++ b/tests/test_key_normalization.py @@ -78,8 +78,8 @@ async def test_null_sender_timestamp_defaults_to_received_at(test_db): @pytest.mark.asyncio -async def test_direct_messages_with_same_text_and_timestamp_are_allowed(test_db): - """Direct messages no longer share the channel echo dedup index.""" +async def test_incoming_direct_messages_with_same_text_and_timestamp_dedup(test_db): + """Incoming direct messages now collapse onto one content-identity row.""" received_at = 600 msg_id1 = await MessageRepository.create( msg_type="PRIV", @@ -97,8 +97,7 @@ async def test_direct_messages_with_same_text_and_timestamp_are_allowed(test_db) sender_timestamp=received_at, received_at=received_at, ) - assert msg_id2 is not None - assert msg_id2 != msg_id1 + assert msg_id2 is None @pytest.mark.asyncio diff --git a/tests/test_migrations.py b/tests/test_migrations.py index 18a78d0..88b1b30 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -1,5 +1,7 @@ """Tests for database migrations.""" +import json + import aiosqlite import pytest @@ -754,6 +756,121 @@ class TestMigration020: await conn.close() +class TestMigration044: + """Test migration 044: dedupe incoming direct messages.""" + + @pytest.mark.asyncio + async def test_migration_merges_incoming_dm_duplicates_and_adds_index(self): + """Migration 44 collapses duplicate incoming DMs and re-links raw packets.""" + conn = await aiosqlite.connect(":memory:") + conn.row_factory = aiosqlite.Row + try: + await set_version(conn, 43) + + await conn.execute( + """ + CREATE TABLE messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + type TEXT NOT NULL, + conversation_key TEXT NOT NULL, + text TEXT NOT NULL, + sender_timestamp INTEGER, + received_at INTEGER NOT NULL, + paths TEXT, + txt_type INTEGER DEFAULT 0, + signature TEXT, + outgoing INTEGER DEFAULT 0, + acked INTEGER DEFAULT 0, + sender_name TEXT, + sender_key TEXT + ) + """ + ) + await conn.execute( + """ + CREATE TABLE raw_packets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp INTEGER NOT NULL, + data BLOB NOT NULL, + message_id INTEGER + ) + """ + ) + await conn.execute( + """ + INSERT INTO messages + (id, type, conversation_key, text, sender_timestamp, received_at, paths, + txt_type, signature, outgoing, acked, sender_name, sender_key) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + (1, "PRIV", "abc123", "hello", 0, 1001, None, 0, None, 0, 0, None, "abc123"), + ) + await conn.execute( + """ + INSERT INTO messages + (id, type, conversation_key, text, sender_timestamp, received_at, paths, + txt_type, signature, outgoing, acked, sender_name, sender_key) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + 2, + "PRIV", + "abc123", + "hello", + None, + 1002, + json.dumps([{"path": "", "received_at": 1002, "path_len": 0}]), + 2, + "abcd", + 0, + 0, + "Alice", + "abc123", + ), + ) + await conn.execute( + "INSERT INTO raw_packets (timestamp, data, message_id) VALUES (?, ?, ?)", + (1001, b"pkt1", 1), + ) + await conn.execute( + "INSERT INTO raw_packets (timestamp, data, message_id) VALUES (?, ?, ?)", + (1002, b"pkt2", 2), + ) + await conn.commit() + + await run_migrations(conn) + + cursor = await conn.execute("SELECT * FROM messages") + rows = await cursor.fetchall() + assert len(rows) == 1 + assert rows[0]["id"] == 1 + assert rows[0]["received_at"] == 1001 + assert rows[0]["signature"] == "abcd" + assert rows[0]["txt_type"] == 2 + assert rows[0]["sender_name"] == "Alice" + assert json.loads(rows[0]["paths"]) == [ + {"path": "", "received_at": 1002, "path_len": 0} + ] + + cursor = await conn.execute("SELECT message_id FROM raw_packets ORDER BY id") + assert [row["message_id"] for row in await cursor.fetchall()] == [1, 1] + + cursor = await conn.execute( + "INSERT OR IGNORE INTO messages (type, conversation_key, text, sender_timestamp, received_at, outgoing) " + "VALUES (?, ?, ?, ?, ?, ?)", + ("PRIV", "abc123", "hello", 0, 9999, 0), + ) + assert cursor.rowcount == 0 + + cursor = await conn.execute( + "SELECT sql FROM sqlite_master WHERE name='idx_messages_incoming_priv_dedup'" + ) + index_sql = (await cursor.fetchone())["sql"] + assert "WHERE type = 'PRIV' AND outgoing = 0" in index_sql + finally: + await conn.close() + + class TestMigration028: """Test migration 028: convert payload_hash from TEXT to BLOB.""" @@ -1130,8 +1247,8 @@ class TestMigration039: applied = await run_migrations(conn) - assert applied == 5 - assert await get_version(conn) == 43 + assert applied == 6 + assert await get_version(conn) == 44 cursor = await conn.execute( """ @@ -1200,8 +1317,8 @@ class TestMigration039: applied = await run_migrations(conn) - assert applied == 5 - assert await get_version(conn) == 43 + assert applied == 6 + assert await get_version(conn) == 44 cursor = await conn.execute( """ @@ -1254,8 +1371,8 @@ class TestMigration040: applied = await run_migrations(conn) - assert applied == 4 - assert await get_version(conn) == 43 + assert applied == 5 + assert await get_version(conn) == 44 await conn.execute( """ @@ -1316,8 +1433,8 @@ class TestMigration041: applied = await run_migrations(conn) - assert applied == 3 - assert await get_version(conn) == 43 + assert applied == 4 + assert await get_version(conn) == 44 await conn.execute( """ @@ -1369,8 +1486,8 @@ class TestMigration042: applied = await run_migrations(conn) - assert applied == 2 - assert await get_version(conn) == 43 + assert applied == 3 + assert await get_version(conn) == 44 await conn.execute( """ diff --git a/tests/test_packet_pipeline.py b/tests/test_packet_pipeline.py index 5da2cc9..4e9360f 100644 --- a/tests/test_packet_pipeline.py +++ b/tests/test_packet_pipeline.py @@ -944,10 +944,10 @@ class TestCreateDMMessageFromDecrypted: assert len(message_broadcasts) == 1 @pytest.mark.asyncio - async def test_allows_same_text_same_second_dms_from_distinct_packets( + async def test_dedupes_same_text_same_second_incoming_dms_from_distinct_packets( self, test_db, captured_broadcasts ): - """Distinct DM packets with the same text/timestamp both store.""" + """Distinct incoming DM observations with the same text/timestamp merge.""" from app.decoder import DecryptedDirectMessage from app.packet_processor import create_dm_message_from_decrypted @@ -983,16 +983,15 @@ class TestCreateDMMessageFromDecrypted: ) assert msg_id_1 is not None - assert msg_id_2 is not None - assert msg_id_1 != msg_id_2 + assert msg_id_2 is None messages = await MessageRepository.get_all( msg_type="PRIV", conversation_key=self.A1B2C3_PUB.lower(), limit=10 ) - assert len(messages) == 2 + assert len(messages) == 1 message_broadcasts = [b for b in broadcasts if b["type"] == "message"] - assert len(message_broadcasts) == 2 + assert len(message_broadcasts) == 1 @pytest.mark.asyncio async def test_links_raw_packet_to_dm_message(self, test_db, captured_broadcasts): diff --git a/tests/test_repository.py b/tests/test_repository.py index 0e787ab..178de93 100644 --- a/tests/test_repository.py +++ b/tests/test_repository.py @@ -171,6 +171,47 @@ class TestMessageRepositoryGetByContent: assert result.sender_timestamp is None assert result.outgoing is True + @pytest.mark.asyncio + async def test_get_by_content_can_filter_incoming_vs_outgoing(self, test_db): + """Outgoing filter keeps incoming duplicate reconciliation on the right row.""" + conversation_key = "abc123abc123abc123abc123abc12300" + incoming_id = await _create_message( + test_db, + msg_type="PRIV", + conversation_key=conversation_key, + text="Same text", + sender_timestamp=1700000000, + outgoing=False, + ) + outgoing_id = await _create_message( + test_db, + msg_type="PRIV", + conversation_key=conversation_key, + text="Same text", + sender_timestamp=1700000000, + outgoing=True, + ) + + incoming = await MessageRepository.get_by_content( + msg_type="PRIV", + conversation_key=conversation_key, + text="Same text", + sender_timestamp=1700000000, + outgoing=False, + ) + outgoing = await MessageRepository.get_by_content( + msg_type="PRIV", + conversation_key=conversation_key, + text="Same text", + sender_timestamp=1700000000, + outgoing=True, + ) + + assert incoming is not None + assert outgoing is not None + assert incoming.id == incoming_id + assert outgoing.id == outgoing_id + @pytest.mark.asyncio async def test_get_by_content_distinguishes_by_timestamp(self, test_db): """Different sender_timestamps are distinguished correctly."""