diff --git a/app/database.py b/app/database.py index f0cb53d..5e90215 100644 --- a/app/database.py +++ b/app/database.py @@ -51,9 +51,8 @@ CREATE TABLE IF NOT EXISTS messages ( sender_name TEXT, sender_key TEXT -- Deduplication: identical text + timestamp in the same conversation is treated as a - -- mesh echo/repeat. Second-precision timestamps mean two intentional identical messages - -- within the same second would collide, but this is not feasible in practice — LoRa - -- transmission takes several seconds per message, and the UI clears the input on send. + -- mesh echo/repeat. Outgoing sends allocate a collision-free sender_timestamp before + -- transmit so legitimate repeat sends do not collide with this index. -- Enforced via idx_messages_dedup_null_safe (unique index) rather than a table constraint -- to avoid the storage overhead of SQLite's autoindex duplicating every message text. ); diff --git a/app/event_handlers.py b/app/event_handlers.py index 617b84e..9e84d3a 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -28,11 +28,12 @@ logger = logging.getLogger(__name__) # This prevents handler duplication after reconnects _active_subscriptions: list["Subscription"] = [] _pending_acks = dm_ack_tracker._pending_acks +_buffered_acks = dm_ack_tracker._buffered_acks -def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> None: +def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> bool: """Compatibility wrapper for pending DM ACK tracking.""" - dm_ack_tracker.track_pending_ack(expected_ack, message_id, timeout_ms) + return dm_ack_tracker.track_pending_ack(expected_ack, message_id, timeout_ms) def cleanup_expired_acks() -> None: @@ -302,6 +303,7 @@ async def on_ack(event: "Event") -> None: # preserving any previously known paths. await increment_ack_and_broadcast(message_id=message_id, broadcast_fn=broadcast_event) else: + dm_ack_tracker.buffer_unmatched_ack(ack_code) logger.debug("ACK code %s does not match any pending messages", ack_code) diff --git a/app/services/dm_ack_tracker.py b/app/services/dm_ack_tracker.py index b882073..9012657 100644 --- a/app/services/dm_ack_tracker.py +++ b/app/services/dm_ack_tracker.py @@ -6,12 +6,26 @@ import time logger = logging.getLogger(__name__) PendingAck = tuple[int, float, int] +BUFFERED_ACK_TTL_SECONDS = 30.0 _pending_acks: dict[str, PendingAck] = {} +_buffered_acks: dict[str, float] = {} -def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> None: - """Track an expected ACK code for an outgoing direct message.""" +def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> bool: + """Track an expected ACK code for an outgoing direct message. + + Returns True when the ACK was already observed and buffered before registration. + """ + buffered_at = _buffered_acks.pop(expected_ack, None) + if buffered_at is not None: + logger.debug( + "Matched buffered ACK %s immediately for message %d", + expected_ack, + message_id, + ) + return True + _pending_acks[expected_ack] = (message_id, time.time(), timeout_ms) logger.debug( "Tracking pending ACK %s for message %d (timeout %dms)", @@ -19,6 +33,13 @@ def track_pending_ack(expected_ack: str, message_id: int, timeout_ms: int) -> No message_id, timeout_ms, ) + return False + + +def buffer_unmatched_ack(ack_code: str) -> None: + """Remember an ACK that arrived before its message registration.""" + _buffered_acks[ack_code] = time.time() + logger.debug("Buffered unmatched ACK %s for late registration", ack_code) def cleanup_expired_acks() -> None: @@ -33,6 +54,15 @@ def cleanup_expired_acks() -> None: del _pending_acks[code] logger.debug("Expired pending ACK %s", code) + expired_buffered_codes = [ + code + for code, buffered_at in _buffered_acks.items() + if now - buffered_at > BUFFERED_ACK_TTL_SECONDS + ] + for code in expired_buffered_codes: + del _buffered_acks[code] + logger.debug("Expired buffered ACK %s", code) + def pop_pending_ack(ack_code: str) -> int | None: """Claim the tracked message ID for an ACK code if present.""" diff --git a/app/services/message_send.py b/app/services/message_send.py index 1aa4d6a..6d5960b 100644 --- a/app/services/message_send.py +++ b/app/services/message_send.py @@ -1,5 +1,6 @@ """Shared send/resend orchestration for outgoing messages.""" +import asyncio import logging from collections.abc import Callable from typing import Any @@ -13,13 +14,85 @@ from app.services.messages import ( build_message_model, create_outgoing_channel_message, create_outgoing_direct_message, + increment_ack_and_broadcast, ) logger = logging.getLogger(__name__) BroadcastFn = Callable[..., Any] -TrackAckFn = Callable[[str, int, int], None] +TrackAckFn = Callable[[str, int, int], bool] NowFn = Callable[[], float] +OutgoingReservationKey = tuple[str, str, str] + +_pending_outgoing_timestamp_reservations: dict[OutgoingReservationKey, set[int]] = {} +_outgoing_timestamp_reservations_lock = asyncio.Lock() + + +async def allocate_outgoing_sender_timestamp( + *, + message_repository, + msg_type: str, + conversation_key: str, + text: str, + requested_timestamp: int, +) -> int: + """Pick a sender timestamp that will not collide with an existing stored message.""" + reservation_key = (msg_type, conversation_key, text) + candidate = requested_timestamp + while True: + async with _outgoing_timestamp_reservations_lock: + reserved = _pending_outgoing_timestamp_reservations.get(reservation_key, set()) + is_reserved = candidate in reserved + + if is_reserved: + candidate += 1 + continue + + existing = await message_repository.get_by_content( + msg_type=msg_type, + conversation_key=conversation_key, + text=text, + sender_timestamp=candidate, + ) + if existing is not None: + candidate += 1 + continue + + async with _outgoing_timestamp_reservations_lock: + reserved = _pending_outgoing_timestamp_reservations.setdefault(reservation_key, set()) + if candidate in reserved: + candidate += 1 + continue + reserved.add(candidate) + break + + if candidate != requested_timestamp: + logger.info( + "Bumped outgoing %s timestamp for %s from %d to %d to avoid same-content collision", + msg_type, + conversation_key[:12], + requested_timestamp, + candidate, + ) + + return candidate + + +async def release_outgoing_sender_timestamp( + *, + msg_type: str, + conversation_key: str, + text: str, + sender_timestamp: int, +) -> None: + reservation_key = (msg_type, conversation_key, text) + async with _outgoing_timestamp_reservations_lock: + reserved = _pending_outgoing_timestamp_reservations.get(reservation_key) + if not reserved: + return + reserved.discard(sender_timestamp) + if not reserved: + _pending_outgoing_timestamp_reservations.pop(reservation_key, None) async def send_channel_message_with_effective_scope( @@ -175,49 +248,78 @@ async def send_direct_message_to_contact( ) -> Any: """Send a direct message and persist/broadcast the outgoing row.""" contact_data = contact.to_radio_dict() - async with radio_manager.radio_operation("send_direct_message") as mc: - logger.debug("Ensuring contact %s is on radio before sending", contact.public_key[:12]) - add_result = await mc.commands.add_contact(contact_data) - if add_result.type == EventType.ERROR: - logger.warning("Failed to add contact to radio: %s", add_result.payload) + sent_at: int | None = None + sender_timestamp: int | None = None + message = None + result = None + try: + async with radio_manager.radio_operation("send_direct_message") as mc: + logger.debug("Ensuring contact %s is on radio before sending", contact.public_key[:12]) + add_result = await mc.commands.add_contact(contact_data) + if add_result.type == EventType.ERROR: + logger.warning("Failed to add contact to radio: %s", add_result.payload) - cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) - if not cached_contact: - cached_contact = contact_data + cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) + if not cached_contact: + cached_contact = contact_data - logger.info("Sending direct message to %s", contact.public_key[:12]) - now = int(now_fn()) - result = await mc.commands.send_msg( - dst=cached_contact, - msg=text, - timestamp=now, + logger.info("Sending direct message to %s", contact.public_key[:12]) + sent_at = int(now_fn()) + sender_timestamp = await allocate_outgoing_sender_timestamp( + message_repository=message_repository, + msg_type="PRIV", + conversation_key=contact.public_key.lower(), + text=text, + requested_timestamp=sent_at, + ) + result = await mc.commands.send_msg( + dst=cached_contact, + msg=text, + timestamp=sender_timestamp, + ) + + if result is None or result.type == EventType.ERROR: + raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}") + + message = await create_outgoing_direct_message( + conversation_key=contact.public_key.lower(), + text=text, + sender_timestamp=sender_timestamp, + received_at=sent_at, + broadcast_fn=broadcast_fn, + message_repository=message_repository, ) + if message is None: + raise HTTPException( + status_code=500, + detail="Failed to store outgoing message - unexpected duplicate", + ) + finally: + if sender_timestamp is not None: + await release_outgoing_sender_timestamp( + msg_type="PRIV", + conversation_key=contact.public_key.lower(), + text=text, + sender_timestamp=sender_timestamp, + ) - if result.type == EventType.ERROR: - raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}") + if sent_at is None or sender_timestamp is None or message is None or result is None: + raise HTTPException(status_code=500, detail="Failed to store outgoing message") - message = await create_outgoing_direct_message( - conversation_key=contact.public_key.lower(), - text=text, - sender_timestamp=now, - received_at=now, - broadcast_fn=broadcast_fn, - message_repository=message_repository, - ) - if message is None: - raise HTTPException( - status_code=500, - detail="Failed to store outgoing message - unexpected duplicate", - ) - - await contact_repository.update_last_contacted(contact.public_key.lower(), now) + await contact_repository.update_last_contacted(contact.public_key.lower(), sent_at) expected_ack = result.payload.get("expected_ack") suggested_timeout: int = result.payload.get("suggested_timeout", 10000) if expected_ack: ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack - track_pending_ack_fn(ack_code, message.id, suggested_timeout) + matched_immediately = track_pending_ack_fn(ack_code, message.id, suggested_timeout) is True logger.debug("Tracking ACK %s for message %d", ack_code, message.id) + if matched_immediately: + ack_count = await increment_ack_and_broadcast( + message_id=message.id, + broadcast_fn=broadcast_fn, + ) + message.acked = ack_count return message @@ -236,56 +338,76 @@ async def send_channel_message_to_channel( message_repository=MessageRepository, ) -> Any: """Send a channel message and persist/broadcast the outgoing row.""" - now: int | None = None + sent_at: int | None = None + sender_timestamp: int | None = None radio_name = "" our_public_key: str | None = None text_with_sender = text + outgoing_message = None - async with radio_manager.radio_operation("send_channel_message") as mc: - radio_name = mc.self_info.get("name", "") if mc.self_info else "" - our_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None - text_with_sender = f"{radio_name}: {text}" if radio_name else text - logger.info("Sending channel message to %s: %s", channel.name, text[:50]) + try: + async with radio_manager.radio_operation("send_channel_message") as mc: + radio_name = mc.self_info.get("name", "") if mc.self_info else "" + our_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None + text_with_sender = f"{radio_name}: {text}" if radio_name else text + logger.info("Sending channel message to %s: %s", channel.name, text[:50]) - now = int(now_fn()) - timestamp_bytes = now.to_bytes(4, "little") + sent_at = int(now_fn()) + sender_timestamp = await allocate_outgoing_sender_timestamp( + message_repository=message_repository, + msg_type="CHAN", + conversation_key=channel_key_upper, + text=text_with_sender, + requested_timestamp=sent_at, + ) + timestamp_bytes = sender_timestamp.to_bytes(4, "little") - result = await send_channel_message_with_effective_scope( - mc=mc, - channel=channel, - channel_key=channel_key_upper, - key_bytes=key_bytes, - text=text, - timestamp_bytes=timestamp_bytes, - action_label="sending message", - radio_manager=radio_manager, - temp_radio_slot=temp_radio_slot, - error_broadcast_fn=error_broadcast_fn, + result = await send_channel_message_with_effective_scope( + mc=mc, + channel=channel, + channel_key=channel_key_upper, + key_bytes=key_bytes, + text=text, + timestamp_bytes=timestamp_bytes, + action_label="sending message", + radio_manager=radio_manager, + temp_radio_slot=temp_radio_slot, + error_broadcast_fn=error_broadcast_fn, + ) + + if result.type == EventType.ERROR: + raise HTTPException( + status_code=500, detail=f"Failed to send message: {result.payload}" + ) + + outgoing_message = await create_outgoing_channel_message( + conversation_key=channel_key_upper, + text=text_with_sender, + sender_timestamp=sender_timestamp, + received_at=sent_at, + sender_name=radio_name or None, + sender_key=our_public_key, + channel_name=channel.name, + broadcast_fn=broadcast_fn, + message_repository=message_repository, ) + if outgoing_message is None: + raise HTTPException( + status_code=500, + detail="Failed to store outgoing message - unexpected duplicate", + ) + finally: + if sender_timestamp is not None: + await release_outgoing_sender_timestamp( + msg_type="CHAN", + conversation_key=channel_key_upper, + text=text_with_sender, + sender_timestamp=sender_timestamp, + ) - if result.type == EventType.ERROR: - raise HTTPException(status_code=500, detail=f"Failed to send message: {result.payload}") - - if now is None: + if sent_at is None or sender_timestamp is None or outgoing_message is None: raise HTTPException(status_code=500, detail="Failed to store outgoing message") - outgoing_message = await create_outgoing_channel_message( - conversation_key=channel_key_upper, - text=text_with_sender, - sender_timestamp=now, - received_at=now, - sender_name=radio_name or None, - sender_key=our_public_key, - channel_name=channel.name, - broadcast_fn=broadcast_fn, - message_repository=message_repository, - ) - if outgoing_message is None: - raise HTTPException( - status_code=500, - detail="Failed to store outgoing message - unexpected duplicate", - ) - message_id = outgoing_message.id acked_count, paths = await message_repository.get_ack_and_paths(message_id) return build_message_model( @@ -293,8 +415,8 @@ async def send_channel_message_to_channel( msg_type="CHAN", conversation_key=channel_key_upper, text=text_with_sender, - sender_timestamp=now, - received_at=now, + sender_timestamp=sender_timestamp, + received_at=sent_at, paths=paths, outgoing=True, acked=acked_count, @@ -325,61 +447,82 @@ async def resend_channel_message_record( detail=f"Invalid channel key format: {message.conversation_key}", ) from None - now: int | None = None - if new_timestamp: - now = int(now_fn()) - timestamp_bytes = now.to_bytes(4, "little") - else: - timestamp_bytes = message.sender_timestamp.to_bytes(4, "little") + sent_at: int | None = None + sender_timestamp = message.sender_timestamp + timestamp_bytes = message.sender_timestamp.to_bytes(4, "little") resend_public_key: str | None = None radio_name = "" + new_message = None + stored_text = message.text - async with radio_manager.radio_operation("resend_channel_message") as mc: - radio_name = mc.self_info.get("name", "") if mc.self_info else "" - resend_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None - text_to_send = message.text - if radio_name and text_to_send.startswith(f"{radio_name}: "): - text_to_send = text_to_send[len(f"{radio_name}: ") :] + try: + async with radio_manager.radio_operation("resend_channel_message") as mc: + radio_name = mc.self_info.get("name", "") if mc.self_info else "" + resend_public_key = (mc.self_info.get("public_key") or None) if mc.self_info else None + text_to_send = message.text + if radio_name and text_to_send.startswith(f"{radio_name}: "): + text_to_send = text_to_send[len(f"{radio_name}: ") :] + if new_timestamp: + sent_at = int(now_fn()) + sender_timestamp = await allocate_outgoing_sender_timestamp( + message_repository=message_repository, + msg_type="CHAN", + conversation_key=message.conversation_key, + text=stored_text, + requested_timestamp=sent_at, + ) + timestamp_bytes = sender_timestamp.to_bytes(4, "little") - result = await send_channel_message_with_effective_scope( - mc=mc, - channel=channel, - channel_key=message.conversation_key, - key_bytes=key_bytes, - text=text_to_send, - timestamp_bytes=timestamp_bytes, - action_label="resending message", - radio_manager=radio_manager, - temp_radio_slot=temp_radio_slot, - error_broadcast_fn=error_broadcast_fn, - ) - if result.type == EventType.ERROR: - raise HTTPException( - status_code=500, - detail=f"Failed to resend message: {result.payload}", + result = await send_channel_message_with_effective_scope( + mc=mc, + channel=channel, + channel_key=message.conversation_key, + key_bytes=key_bytes, + text=text_to_send, + timestamp_bytes=timestamp_bytes, + action_label="resending message", + radio_manager=radio_manager, + temp_radio_slot=temp_radio_slot, + error_broadcast_fn=error_broadcast_fn, + ) + if result.type == EventType.ERROR: + raise HTTPException( + status_code=500, + detail=f"Failed to resend message: {result.payload}", + ) + + if new_timestamp: + if sent_at is None: + raise HTTPException(status_code=500, detail="Failed to assign resend timestamp") + new_message = await create_outgoing_channel_message( + conversation_key=message.conversation_key, + text=message.text, + sender_timestamp=sender_timestamp, + received_at=sent_at, + sender_name=radio_name or None, + sender_key=resend_public_key, + channel_name=channel.name, + broadcast_fn=broadcast_fn, + message_repository=message_repository, + ) + if new_message is None: + raise HTTPException( + status_code=500, + detail="Failed to store resent message - unexpected duplicate", + ) + finally: + if new_timestamp and sent_at is not None: + await release_outgoing_sender_timestamp( + msg_type="CHAN", + conversation_key=message.conversation_key, + text=stored_text, + sender_timestamp=sender_timestamp, ) if new_timestamp: - if now is None: + if sent_at is None or new_message is None: raise HTTPException(status_code=500, detail="Failed to assign resend timestamp") - new_message = await create_outgoing_channel_message( - conversation_key=message.conversation_key, - text=message.text, - sender_timestamp=now, - received_at=now, - sender_name=radio_name or None, - sender_key=resend_public_key, - channel_name=channel.name, - broadcast_fn=broadcast_fn, - message_repository=message_repository, - ) - if new_message is None: - logger.warning( - "Duplicate timestamp collision resending message %d — radio sent but DB row not created", - message.id, - ) - return {"status": "ok", "message_id": message.id} logger.info( "Resent channel message %d as new message %d to %s", diff --git a/tests/test_api.py b/tests/test_api.py index eea9048..64be464 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -387,6 +387,7 @@ class TestMessagesEndpoint: _patch_require_connected(mock_mc), patch("app.routers.messages.MessageRepository") as mock_msg_repo, ): + mock_msg_repo.get_by_content = AsyncMock(return_value=None) # Simulate duplicate - create returns None mock_msg_repo.create = AsyncMock(return_value=None) @@ -422,6 +423,7 @@ class TestMessagesEndpoint: _patch_require_connected(mock_mc), patch("app.routers.messages.MessageRepository") as mock_msg_repo, ): + mock_msg_repo.get_by_content = AsyncMock(return_value=None) # Simulate duplicate - create returns None mock_msg_repo.create = AsyncMock(return_value=None) diff --git a/tests/test_event_handlers.py b/tests/test_event_handlers.py index 9b452b0..3353c72 100644 --- a/tests/test_event_handlers.py +++ b/tests/test_event_handlers.py @@ -11,6 +11,7 @@ import pytest from app.event_handlers import ( _active_subscriptions, + _buffered_acks, _pending_acks, cleanup_expired_acks, register_event_handlers, @@ -26,9 +27,11 @@ from app.repository import ( def clear_test_state(): """Clear pending ACKs and subscriptions before each test.""" _pending_acks.clear() + _buffered_acks.clear() _active_subscriptions.clear() yield _pending_acks.clear() + _buffered_acks.clear() _active_subscriptions.clear() @@ -107,6 +110,28 @@ class TestAckTracking: assert len(_pending_acks) == 50 assert all(k.startswith("valid_") for k in _pending_acks) + def test_track_pending_ack_consumes_buffered_ack(self): + """A buffered ACK should be matched immediately when the send registers later.""" + _buffered_acks["early"] = time.time() + + matched = track_pending_ack("early", message_id=42, timeout_ms=5000) + + assert matched is True + assert "early" not in _buffered_acks + assert "early" not in _pending_acks + + def test_cleanup_removes_expired_buffered_acks(self): + """Buffered ACKs should expire so unmatched early ACKs do not leak forever.""" + from app.services.dm_ack_tracker import BUFFERED_ACK_TTL_SECONDS + + _buffered_acks["stale"] = time.time() - (BUFFERED_ACK_TTL_SECONDS + 1) + _buffered_acks["fresh"] = time.time() + + cleanup_expired_acks() + + assert "stale" not in _buffered_acks + assert "fresh" in _buffered_acks + class TestAckEventHandler: """Test the on_ack event handler.""" @@ -174,6 +199,7 @@ class TestAckEventHandler: mock_broadcast.assert_not_called() assert "expected" in _pending_acks + assert "different" in _buffered_acks @pytest.mark.asyncio async def test_ack_empty_code_ignored(self, test_db): @@ -189,6 +215,35 @@ class TestAckEventHandler: mock_broadcast.assert_not_called() + @pytest.mark.asyncio + async def test_buffered_ack_can_be_claimed_after_early_arrival(self, test_db): + """An ACK that arrives before registration should be recoverable.""" + from app.event_handlers import on_ack + + msg_id = await MessageRepository.create( + msg_type="PRIV", + text="Hello", + received_at=1700000000, + conversation_key="aa" * 32, + sender_timestamp=1700000000, + outgoing=True, + ) + + with patch("app.event_handlers.broadcast_event") as mock_broadcast: + + class MockEvent: + payload = {"code": "earlyack"} + + await on_ack(MockEvent()) + + assert "earlyack" in _buffered_acks + assert track_pending_ack("earlyack", message_id=msg_id, timeout_ms=10000) is True + assert "earlyack" not in _buffered_acks + assert "earlyack" not in _pending_acks + ack_count, _ = await MessageRepository.get_ack_and_paths(msg_id) + assert ack_count == 0 + mock_broadcast.assert_not_called() + class TestContactMessageCLIFiltering: """Test that CLI responses (txt_type=1) are filtered out.""" diff --git a/tests/test_send_messages.py b/tests/test_send_messages.py index 6d0781a..b7e3d14 100644 --- a/tests/test_send_messages.py +++ b/tests/test_send_messages.py @@ -24,6 +24,7 @@ from app.routers.messages import ( send_channel_message, send_direct_message, ) +from app.services import dm_ack_tracker @pytest.fixture(autouse=True) @@ -35,6 +36,8 @@ def _reset_radio_state(): prev_connection_info = radio_manager._connection_info prev_slot_by_key = radio_manager._channel_slot_by_key.copy() prev_key_by_slot = radio_manager._channel_key_by_slot.copy() + prev_pending_acks = dm_ack_tracker._pending_acks.copy() + prev_buffered_acks = dm_ack_tracker._buffered_acks.copy() yield radio_manager._meshcore = prev radio_manager._operation_lock = prev_lock @@ -42,6 +45,10 @@ def _reset_radio_state(): radio_manager._connection_info = prev_connection_info radio_manager._channel_slot_by_key = prev_slot_by_key radio_manager._channel_key_by_slot = prev_key_by_slot + dm_ack_tracker._pending_acks.clear() + dm_ack_tracker._pending_acks.update(prev_pending_acks) + dm_ack_tracker._buffered_acks.clear() + dm_ack_tracker._buffered_acks.update(prev_buffered_acks) def _make_radio_result(payload=None): @@ -190,6 +197,80 @@ class TestOutgoingDMBroadcast: assert contact_payload["out_path_len"] == 2 assert contact_payload["out_path_hash_mode"] == 1 + @pytest.mark.asyncio + async def test_send_dm_same_second_duplicate_bumps_timestamp(self, test_db): + mc = _make_mc() + pub_key = "fa" * 32 + await _insert_contact(pub_key, "Alice") + + now = int(time.time()) + original_id = await MessageRepository.create( + msg_type="PRIV", + text="hello", + conversation_key=pub_key, + sender_timestamp=now, + received_at=now, + outgoing=True, + ) + assert original_id is not None + + with ( + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event"), + patch("app.routers.messages.time") as mock_time, + ): + mock_time.time.return_value = float(now) + result = await send_direct_message( + SendDirectMessageRequest(destination=pub_key, text="hello") + ) + + assert result.id != original_id + assert result.sender_timestamp == now + 1 + assert result.received_at == now + assert mc.commands.send_msg.await_args.kwargs["timestamp"] == now + 1 + + @pytest.mark.asyncio + async def test_send_dm_applies_buffered_ack_from_early_arrival(self, test_db): + from app.event_handlers import on_ack + + mc = _make_mc() + ack_bytes = b"\xde\xad\xbe\xef" + result = MagicMock() + result.type = EventType.MSG_SENT + result.payload = { + "expected_ack": ack_bytes, + "suggested_timeout": 8000, + } + mc.commands.send_msg = AsyncMock(return_value=result) + + pub_key = "fb" * 32 + await _insert_contact(pub_key, "Alice") + + class MockAckEvent: + payload = {"code": "deadbeef"} + + broadcasts = [] + + def capture_broadcast(event_type, data): + broadcasts.append((event_type, data)) + + with ( + patch("app.event_handlers.broadcast_event", side_effect=capture_broadcast), + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event", side_effect=capture_broadcast), + ): + await on_ack(MockAckEvent()) + message = await send_direct_message( + SendDirectMessageRequest(destination=pub_key, text="Hello") + ) + + ack_count, _ = await MessageRepository.get_ack_and_paths(message.id) + assert ack_count == 1 + assert message.acked == 1 + assert any(event_type == "message_acked" for event_type, _data in broadcasts) + class TestOutgoingChannelBroadcast: """Test that outgoing channel messages are broadcast via broadcast_event for fanout dispatch.""" @@ -223,6 +304,42 @@ class TestOutgoingChannelBroadcast: assert data["sender_name"] == "MyNode" assert data["channel_name"] == "#general" + @pytest.mark.asyncio + async def test_send_channel_same_second_duplicate_bumps_timestamp(self, test_db): + mc = _make_mc(name="MyNode") + chan_key = "ac" * 16 + await ChannelRepository.upsert(key=chan_key, name="#general") + + now = int(time.time()) + original_id = await MessageRepository.create( + msg_type="CHAN", + text="MyNode: hello", + conversation_key=chan_key.upper(), + sender_timestamp=now, + received_at=now, + outgoing=True, + ) + assert original_id is not None + + with ( + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event"), + patch("app.routers.messages.time") as mock_time, + ): + mock_time.time.return_value = float(now) + result = await send_channel_message( + SendChannelMessageRequest(channel_key=chan_key, text="hello") + ) + + assert result.id != original_id + assert result.sender_timestamp == now + 1 + assert result.received_at == now + sent_timestamp = int.from_bytes( + mc.commands.send_chan_msg.await_args.kwargs["timestamp"], "little" + ) + assert sent_timestamp == now + 1 + @pytest.mark.asyncio async def test_send_channel_msg_response_includes_current_ack_count(self, test_db): """Send response reflects latest DB ack count at response time.""" @@ -619,8 +736,8 @@ class TestResendChannelMessage: assert "restore failed" in mock_broadcast_error.call_args.args[0].lower() @pytest.mark.asyncio - async def test_resend_new_timestamp_collision_returns_original_id(self, test_db): - """When new-timestamp resend collides (same second), return original ID gracefully.""" + async def test_resend_new_timestamp_collision_bumps_timestamp(self, test_db): + """New-timestamp resend should bump the transmit timestamp instead of reusing the row.""" mc = _make_mc(name="MyNode") chan_key = "dd" * 16 await ChannelRepository.upsert(key=chan_key, name="#collision") @@ -642,13 +759,19 @@ class TestResendChannelMessage: patch("app.routers.messages.broadcast_event"), patch("app.routers.messages.time") as mock_time, ): - # Force the same second so MessageRepository.create returns None (duplicate) mock_time.time.return_value = float(now) result = await resend_channel_message(msg_id, new_timestamp=True) - # Should succeed gracefully, returning the original message ID assert result["status"] == "ok" - assert result["message_id"] == msg_id + assert result["message_id"] != msg_id + resent = await MessageRepository.get_by_id(result["message_id"]) + assert resent is not None + assert resent.sender_timestamp == now + 1 + assert resent.received_at == now + sent_timestamp = int.from_bytes( + mc.commands.send_chan_msg.await_args.kwargs["timestamp"], "little" + ) + assert sent_timestamp == now + 1 @pytest.mark.asyncio async def test_resend_non_outgoing_returns_400(self, test_db):