Fix DM collapse on same second send

This commit is contained in:
Jack Kingsman
2026-03-16 14:53:48 -07:00
parent 04733b6a02
commit 370ff115b4
39 changed files with 310 additions and 142 deletions
+11 -19
View File
@@ -331,12 +331,9 @@ class TestDMEchoDetection:
assert msg_id is not None
broadcasts.clear()
# Duplicate arrives via different path
pkt2, _ = await RawPacketRepository.create(b"dm_in_2", SENDER_TIMESTAMP + 1)
with patch("app.packet_processor.broadcast_event", mock_broadcast):
result = await create_dm_message_from_decrypted(
packet_id=pkt2,
packet_id=pkt1,
decrypted=decrypted,
their_public_key=CONTACT_PUB,
our_public_key=OUR_PUB,
@@ -388,12 +385,9 @@ class TestDMEchoDetection:
assert msg_id is not None
broadcasts.clear()
# Duplicate arrives, also with no path
pkt2, _ = await RawPacketRepository.create(b"dm_np_2", SENDER_TIMESTAMP + 1)
with patch("app.packet_processor.broadcast_event", mock_broadcast):
result = await create_dm_message_from_decrypted(
packet_id=pkt2,
packet_id=pkt1,
decrypted=decrypted,
their_public_key=CONTACT_PUB,
our_public_key=OUR_PUB,
@@ -832,21 +826,19 @@ class TestDirectMessageDirectionDetection:
class TestConcurrentDMDedup:
"""Test that concurrent DM processing deduplicates via atomic INSERT OR IGNORE.
"""Test that concurrent DM processing deduplicates by raw-packet identity.
On a mesh network, the same DM packet can arrive via two RF paths nearly
simultaneously, causing two concurrent calls to create_dm_message_from_decrypted.
SQLite's INSERT OR IGNORE ensures only one message is stored.
On a mesh network, the same DM payload can be observed twice before the first
handler finishes. Both arrivals reuse the same raw_packets row and should end
up attached to a single message.
"""
@pytest.mark.asyncio
async def test_concurrent_identical_dms_only_store_once(self, test_db, captured_broadcasts):
"""Two concurrent create_dm_message_from_decrypted calls with identical content
should result in exactly one stored message."""
async def test_concurrent_same_packet_dms_only_store_once(self, test_db, captured_broadcasts):
"""Two concurrent handlers for the same raw DM packet store one message."""
from app.packet_processor import create_dm_message_from_decrypted
pkt1, _ = await RawPacketRepository.create(b"concurrent_dm_1", SENDER_TIMESTAMP)
pkt2, _ = await RawPacketRepository.create(b"concurrent_dm_2", SENDER_TIMESTAMP + 1)
packet_id, _ = await RawPacketRepository.create(b"concurrent_dm_1", SENDER_TIMESTAMP)
decrypted = DecryptedDirectMessage(
timestamp=SENDER_TIMESTAMP,
@@ -861,7 +853,7 @@ class TestConcurrentDMDedup:
with patch("app.packet_processor.broadcast_event", mock_broadcast):
results = await asyncio.gather(
create_dm_message_from_decrypted(
packet_id=pkt1,
packet_id=packet_id,
decrypted=decrypted,
their_public_key=CONTACT_PUB,
our_public_key=OUR_PUB,
@@ -870,7 +862,7 @@ class TestConcurrentDMDedup:
outgoing=False,
),
create_dm_message_from_decrypted(
packet_id=pkt2,
packet_id=packet_id,
decrypted=decrypted,
their_public_key=CONTACT_PUB,
our_public_key=OUR_PUB,
+4 -3
View File
@@ -78,8 +78,8 @@ async def test_null_sender_timestamp_defaults_to_received_at(test_db):
@pytest.mark.asyncio
async def test_duplicate_with_same_text_and_null_timestamp_rejected(test_db):
"""Two messages with same content and sender_timestamp should be deduped."""
async def test_direct_messages_with_same_text_and_timestamp_are_allowed(test_db):
"""Direct messages no longer share the channel echo dedup index."""
received_at = 600
msg_id1 = await MessageRepository.create(
msg_type="PRIV",
@@ -97,7 +97,8 @@ async def test_duplicate_with_same_text_and_null_timestamp_rejected(test_db):
sender_timestamp=received_at,
received_at=received_at,
)
assert msg_id2 is None # duplicate rejected
assert msg_id2 is not None
assert msg_id2 != msg_id1
@pytest.mark.asyncio
+26 -12
View File
@@ -574,7 +574,7 @@ class TestMigration019:
@pytest.mark.asyncio
async def test_migration_drops_messages_unique_constraint(self):
"""Migration rebuilds messages without UNIQUE, preserving data and dedup index."""
"""Migration rebuilds messages without UNIQUE, preserving data and channel dedup index."""
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
@@ -657,7 +657,7 @@ class TestMigration019:
assert rows[1]["type"] == "PRIV"
assert rows[1]["outgoing"] == 1
# Verify dedup index still works (INSERT OR IGNORE should ignore duplicates)
# Verify channel dedup index still works (INSERT OR IGNORE should ignore duplicates)
cursor = await conn.execute(
"INSERT OR IGNORE INTO messages (type, conversation_key, text, sender_timestamp, received_at) "
"VALUES (?, ?, ?, ?, ?)",
@@ -665,11 +665,25 @@ class TestMigration019:
)
assert cursor.rowcount == 0 # Duplicate ignored
# Direct messages no longer use the shared dedup index.
cursor = await conn.execute(
"INSERT OR IGNORE INTO messages (type, conversation_key, text, sender_timestamp, received_at) "
"VALUES (?, ?, ?, ?, ?)",
("PRIV", "abc123", "dm text", 2000, 9999),
)
assert cursor.rowcount == 1
# Verify dedup index exists
cursor = await conn.execute(
"SELECT name FROM sqlite_master WHERE name='idx_messages_dedup_null_safe'"
)
assert await cursor.fetchone() is not None
cursor = await conn.execute(
"SELECT sql FROM sqlite_master WHERE name='idx_messages_dedup_null_safe'"
)
index_sql = (await cursor.fetchone())["sql"]
assert "WHERE type = 'CHAN'" in index_sql
finally:
await conn.close()
@@ -1116,8 +1130,8 @@ class TestMigration039:
applied = await run_migrations(conn)
assert applied == 4
assert await get_version(conn) == 42
assert applied == 5
assert await get_version(conn) == 43
cursor = await conn.execute(
"""
@@ -1186,8 +1200,8 @@ class TestMigration039:
applied = await run_migrations(conn)
assert applied == 4
assert await get_version(conn) == 42
assert applied == 5
assert await get_version(conn) == 43
cursor = await conn.execute(
"""
@@ -1240,8 +1254,8 @@ class TestMigration040:
applied = await run_migrations(conn)
assert applied == 3
assert await get_version(conn) == 42
assert applied == 4
assert await get_version(conn) == 43
await conn.execute(
"""
@@ -1302,8 +1316,8 @@ class TestMigration041:
applied = await run_migrations(conn)
assert applied == 2
assert await get_version(conn) == 42
assert applied == 3
assert await get_version(conn) == 43
await conn.execute(
"""
@@ -1355,8 +1369,8 @@ class TestMigration042:
applied = await run_migrations(conn)
assert applied == 1
assert await get_version(conn) == 42
assert applied == 2
assert await get_version(conn) == 43
await conn.execute(
"""
+60 -8
View File
@@ -896,8 +896,58 @@ class TestCreateDMMessageFromDecrypted:
assert message_broadcasts[0]["data"]["outgoing"] is True
@pytest.mark.asyncio
async def test_returns_none_for_duplicate_dm(self, test_db, captured_broadcasts):
"""create_dm_message_from_decrypted returns None for duplicate DM."""
async def test_returns_none_for_same_raw_packet_duplicate_dm(
self, test_db, captured_broadcasts
):
"""Reprocessing the same raw DM packet reuses the existing message."""
from app.decoder import DecryptedDirectMessage
from app.packet_processor import create_dm_message_from_decrypted
packet_id, _ = await RawPacketRepository.create(b"dm_packet_1", 1700000000)
decrypted = DecryptedDirectMessage(
timestamp=1700000000,
flags=0,
message="Duplicate DM test",
dest_hash="fa",
src_hash="a1",
)
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
# First call creates the message
msg_id_1 = await create_dm_message_from_decrypted(
packet_id=packet_id,
decrypted=decrypted,
their_public_key=self.A1B2C3_PUB,
our_public_key=self.FACE12_PUB,
received_at=1700000001,
outgoing=False,
)
# Second call for the same packet returns None and does not create a new row
msg_id_2 = await create_dm_message_from_decrypted(
packet_id=packet_id,
decrypted=decrypted,
their_public_key=self.A1B2C3_PUB,
our_public_key=self.FACE12_PUB,
received_at=1700000002,
outgoing=False,
)
assert msg_id_1 is not None
assert msg_id_2 is None # Duplicate detected
# Only one message broadcast
message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
assert len(message_broadcasts) == 1
@pytest.mark.asyncio
async def test_allows_same_text_same_second_dms_from_distinct_packets(
self, test_db, captured_broadcasts
):
"""Distinct DM packets with the same text/timestamp both store."""
from app.decoder import DecryptedDirectMessage
from app.packet_processor import create_dm_message_from_decrypted
@@ -915,7 +965,6 @@ class TestCreateDMMessageFromDecrypted:
broadcasts, mock_broadcast = captured_broadcasts
with patch("app.packet_processor.broadcast_event", mock_broadcast):
# First call creates the message
msg_id_1 = await create_dm_message_from_decrypted(
packet_id=packet_id_1,
decrypted=decrypted,
@@ -924,8 +973,6 @@ class TestCreateDMMessageFromDecrypted:
received_at=1700000001,
outgoing=False,
)
# Second call with same content returns None
msg_id_2 = await create_dm_message_from_decrypted(
packet_id=packet_id_2,
decrypted=decrypted,
@@ -936,11 +983,16 @@ class TestCreateDMMessageFromDecrypted:
)
assert msg_id_1 is not None
assert msg_id_2 is None # Duplicate detected
assert msg_id_2 is not None
assert msg_id_1 != msg_id_2
messages = await MessageRepository.get_all(
msg_type="PRIV", conversation_key=self.A1B2C3_PUB.lower(), limit=10
)
assert len(messages) == 2
# Only one message broadcast
message_broadcasts = [b for b in broadcasts if b["type"] == "message"]
assert len(message_broadcasts) == 1
assert len(message_broadcasts) == 2
@pytest.mark.asyncio
async def test_links_raw_packet_to_dm_message(self, test_db, captured_broadcasts):