diff --git a/app/packet_processor.py b/app/packet_processor.py index a87e4cb..36ab6d0 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -39,6 +39,7 @@ from app.repository import ( ChannelRepository, ContactAdvertPathRepository, ContactRepository, + MessageRepository, RawPacketRepository, ) from app.services.contact_reconciliation import ( @@ -645,10 +646,30 @@ async def _process_direct_message( ) if result is not None: - # Successfully decrypted! + # In the ambiguous direction case (both first bytes match), we + # defaulted to incoming. Check if a matching outgoing message + # already exists — if so, this is actually our own outgoing echo + # and should be treated as such instead of creating a duplicate + # incoming row. + effective_outgoing = is_outgoing + if not is_outgoing and dest_hash == src_hash: + existing_outgoing = await MessageRepository.get_by_content( + msg_type="PRIV", + conversation_key=contact.public_key.lower(), + text=result.message, + sender_timestamp=result.timestamp, + outgoing=True, + ) + if existing_outgoing is not None: + effective_outgoing = True + logger.debug( + "Ambiguous DM resolved as outgoing echo (matched existing sent msg %d)", + existing_outgoing.id, + ) + logger.debug( "Decrypted DM %s contact %s: %s", - "to" if is_outgoing else "from", + "to" if effective_outgoing else "from", contact.name or contact.public_key[:12], result.message[:50] if result.message else "", ) @@ -664,7 +685,7 @@ async def _process_direct_message( path_len=packet_info.path_length if packet_info else None, rssi=rssi, snr=snr, - outgoing=is_outgoing, + outgoing=effective_outgoing, ) return { diff --git a/app/services/message_send.py b/app/services/message_send.py index 299f5a3..9425b9d 100644 --- a/app/services/message_send.py +++ b/app/services/message_send.py @@ -264,38 +264,43 @@ async def send_channel_message_with_effective_scope( return send_result finally: if override_scope and override_scope != baseline_scope: - try: - restore_result = await mc.commands.set_flood_scope( - baseline_scope if baseline_scope else "" - ) - if restore_result is not None and restore_result.type == EventType.ERROR: - logger.error( - "Failed to restore baseline flood_scope after sending to %s: %s", + restored = False + for attempt in range(3): + try: + restore_result = await mc.commands.set_flood_scope( + baseline_scope if baseline_scope else "" + ) + if restore_result is not None and restore_result.type == EventType.ERROR: + logger.warning( + "Attempt %d/3: failed to restore flood_scope after sending to %s: %s", + attempt + 1, + channel.name, + restore_result.payload, + ) + else: + logger.debug( + "Restored baseline flood_scope after channel send: %r", + baseline_scope or "(disabled)", + ) + restored = True + break + except Exception: + logger.exception( + "Attempt %d/3: exception restoring flood_scope after sending to %s", + attempt + 1, channel.name, - restore_result.payload, ) - error_broadcast_fn( - "Regional override restore failed", - ( - f"Sent to {channel.name}, but restoring flood scope failed. " - "The radio may still be region-scoped. Consider rebooting the radio." - ), - ) - else: - logger.debug( - "Restored baseline flood_scope after channel send: %r", - baseline_scope or "(disabled)", - ) - except Exception: - logger.exception( - "Failed to restore baseline flood_scope after sending to %s", + if not restored: + logger.error( + "All 3 attempts to restore flood_scope failed for %s", channel.name, ) error_broadcast_fn( "Regional override restore failed", ( - f"Sent to {channel.name}, but restoring flood scope failed. " - "The radio may still be region-scoped. Consider rebooting the radio." + f"Sent to {channel.name}, but restoring flood scope failed " + f"after 3 attempts. The radio may still be region-scoped. " + f"Consider rebooting the radio." ), ) diff --git a/tests/test_echo_dedup.py b/tests/test_echo_dedup.py index f0a23f3..909a40d 100644 --- a/tests/test_echo_dedup.py +++ b/tests/test_echo_dedup.py @@ -987,6 +987,130 @@ class TestDirectMessageDirectionDetection: assert len(messages) == 1 assert messages[0].outgoing is False # Defaults to incoming + @pytest.mark.asyncio + async def test_ambiguous_direction_resolves_outgoing_echo(self, test_db, captured_broadcasts): + """Ambiguous direction resolves to outgoing when a matching sent message exists. + + Uses real colliding keys where both public keys start with 0xAA. + Without the fix, the echo would be stored as a second (incoming) row. + """ + from app.packet_processor import _process_direct_message + + our_pub = "AAAA09479CF6FD6733CF052769E7C229CB86CA7F81E82439F9E4EB832CA7F8DC" + contact_pub = "AAAA2A563964F9B66E25E81FE6931B0E72AF585AEF79F43C1364DB4F6F882F07" + our_pub_bytes = bytes.fromhex(our_pub) + first_byte = "aa" + + await ContactRepository.upsert( + {"public_key": contact_pub, "name": "CollidingContact", "type": 1} + ) + + # The send endpoint already stored the outgoing message + outgoing_id = await MessageRepository.create( + msg_type="PRIV", + text="Echo collision test", + conversation_key=contact_pub.lower(), + sender_timestamp=SENDER_TIMESTAMP, + received_at=SENDER_TIMESTAMP, + outgoing=True, + ) + assert outgoing_id is not None + + packet_info = MagicMock() + packet_info.payload = bytes([0xAA, 0xAA, 0x00, 0x00]) + b"\x00" * 20 + packet_info.path = b"\xbb" + packet_info.path_length = 1 + + decrypted = DecryptedDirectMessage( + timestamp=SENDER_TIMESTAMP, + flags=0, + message="Echo collision test", + dest_hash=first_byte, + src_hash=first_byte, + ) + + pkt_id, _ = await RawPacketRepository.create(b"ambig_echo", SENDER_TIMESTAMP + 1) + broadcasts, mock_broadcast = captured_broadcasts + + with ( + patch("app.packet_processor.has_private_key", return_value=True), + patch("app.packet_processor.get_private_key", return_value=b"\x00" * 32), + patch("app.packet_processor.get_public_key", return_value=our_pub_bytes), + patch("app.packet_processor.try_decrypt_dm", return_value=decrypted), + patch("app.packet_processor.broadcast_event", mock_broadcast), + ): + result = await _process_direct_message( + b"\x00" * 40, pkt_id, SENDER_TIMESTAMP + 1, packet_info + ) + + assert result is not None + + # Should have exactly one message — the original outgoing, not a ghost incoming + messages = await MessageRepository.get_all( + msg_type="PRIV", conversation_key=contact_pub.lower(), limit=10 + ) + assert len(messages) == 1 + assert messages[0].outgoing is True + assert messages[0].id == outgoing_id + + # Path from the echo should have been added to the outgoing message + ack_broadcasts = [b for b in broadcasts if b["type"] == "message_acked"] + assert len(ack_broadcasts) == 1 + assert ack_broadcasts[0]["data"]["message_id"] == outgoing_id + assert any(p["path"] == "bb" for p in ack_broadcasts[0]["data"]["paths"]) + + @pytest.mark.asyncio + async def test_ambiguous_direction_genuine_incoming_still_stored( + self, test_db, captured_broadcasts + ): + """Ambiguous direction with no matching outgoing message stores as incoming.""" + from app.packet_processor import _process_direct_message + + our_pub = "AAAA09479CF6FD6733CF052769E7C229CB86CA7F81E82439F9E4EB832CA7F8DC" + contact_pub = "AAAA2A563964F9B66E25E81FE6931B0E72AF585AEF79F43C1364DB4F6F882F07" + our_pub_bytes = bytes.fromhex(our_pub) + first_byte = "aa" + + await ContactRepository.upsert( + {"public_key": contact_pub, "name": "CollidingContact", "type": 1} + ) + + # No outgoing message exists — this is a genuine incoming DM + packet_info = MagicMock() + packet_info.payload = bytes([0xAA, 0xAA, 0x00, 0x00]) + b"\x00" * 20 + packet_info.path = b"" + packet_info.path_length = 0 + + decrypted = DecryptedDirectMessage( + timestamp=SENDER_TIMESTAMP, + flags=0, + message="Genuine incoming", + dest_hash=first_byte, + src_hash=first_byte, + ) + + pkt_id, _ = await RawPacketRepository.create(b"ambig_genuine", SENDER_TIMESTAMP) + broadcasts, mock_broadcast = captured_broadcasts + + with ( + patch("app.packet_processor.has_private_key", return_value=True), + patch("app.packet_processor.get_private_key", return_value=b"\x00" * 32), + patch("app.packet_processor.get_public_key", return_value=our_pub_bytes), + patch("app.packet_processor.try_decrypt_dm", return_value=decrypted), + patch("app.packet_processor.broadcast_event", mock_broadcast), + ): + result = await _process_direct_message( + b"\x00" * 40, pkt_id, SENDER_TIMESTAMP, packet_info + ) + + assert result is not None + + messages = await MessageRepository.get_all( + msg_type="PRIV", conversation_key=contact_pub.lower(), limit=10 + ) + assert len(messages) == 1 + assert messages[0].outgoing is False # Still incoming when no outgoing match + @pytest.mark.asyncio async def test_neither_hash_matches_returns_none(self, test_db, captured_broadcasts): """Neither hash byte matches us → not our message → returns None."""