From 020acbda02a98a494dba3ee6df7b9e8a51016325 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Tue, 17 Mar 2026 18:12:07 -0700 Subject: [PATCH] Do better DM retry to align with standard firmware retry (but so that we can track the acks). Closes #73. --- AGENTS.md | 2 + app/AGENTS.md | 4 + app/event_handlers.py | 1 + app/services/dm_ack_tracker.py | 12 + app/services/message_send.py | 223 +++++++++++++++++- .../ContactRoutingOverrideModal.tsx | 4 + tests/test_ack_tracking_wiring.py | 7 + tests/test_api.py | 7 + tests/test_event_handlers.py | 36 +++ tests/test_send_messages.py | 187 +++++++++++++++ 10 files changed, 474 insertions(+), 9 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index e0c6e27..9d39694 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -163,6 +163,8 @@ MeshCore firmware can encode path hops as 1-byte, 2-byte, or 3-byte identifiers. **Direct messages**: Expected ACK code is tracked. When ACK event arrives, message marked as acked. +Outgoing DMs send once immediately, then may retry up to 2 more times in the background if still unacked. Retry timing follows the radio's `suggested_timeout` from `PACKET_MSG_SENT`, and the final retry is sent as flood even when a routing override is configured. DM ACK state is terminal on first ACK: sibling retry ACK codes are cleared so one DM should not accumulate multiple delivery confirmations from different retry attempts. + **Channel messages**: Flood messages echo back through repeaters. Repeats are identified by the database UNIQUE constraint on `(type, conversation_key, text, sender_timestamp)` — when an INSERT hits a duplicate, `_handle_duplicate_message()` in `packet_processor.py` adds the new path and, for outgoing messages only, increments the ack count. Incoming repeats add path data but do not change the ack count. There is no timestamp-windowed matching; deduplication is exact-match only. This message-layer echo/path handling is independent of raw-packet storage deduplication. 
diff --git a/app/AGENTS.md b/app/AGENTS.md index bff1faf..c1305e4 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -118,6 +118,10 @@ app/ - `services/dm_ingest.py` is the one place that should decide fallback-context resolution, DM dedup/reconciliation, and packet-linked vs. content-based storage behavior. - `CONTACT_MSG_RECV` is a fallback path, not a parallel source of truth. If you change DM storage behavior, trace both `event_handlers.py` and `packet_processor.py`. - DM ACK tracking is an in-memory pending/buffered map in `services/dm_ack_tracker.py`, with periodic expiry from `radio_sync.py`. +- Outgoing DMs send once inline, store/broadcast immediately after the first successful `MSG_SENT`, then may retry up to 2 more times in the background if still unacked. +- DM retry timing follows the firmware-provided `suggested_timeout` from `PACKET_MSG_SENT`; do not replace it with a fixed app timeout unless you intentionally want more aggressive duplicate-prone retries. +- The final DM retry is intentionally sent as flood via `reset_path(...)`, even when a routing override exists. +- DM ACK state is terminal on first ACK. Retry attempts may register multiple expected ACK codes for the same message, but sibling pending codes are cleared once one ACK wins so a DM should not accrue multiple delivery confirmations from retries. ### Echo/repeat dedup diff --git a/app/event_handlers.py b/app/event_handlers.py index f367a16..dafa9f5 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -273,6 +273,7 @@ async def on_ack(event: "Event") -> None: message_id = dm_ack_tracker.pop_pending_ack(ack_code) if message_id is not None: + dm_ack_tracker.clear_pending_acks_for_message(message_id) logger.info("ACK received for message %d", message_id) # DM ACKs don't carry path data, so paths is intentionally omitted. 
# The frontend's mergePendingAck handles the missing field correctly, diff --git a/app/services/dm_ack_tracker.py b/app/services/dm_ack_tracker.py index 9012657..ef5f656 100644 --- a/app/services/dm_ack_tracker.py +++ b/app/services/dm_ack_tracker.py @@ -71,3 +71,15 @@ def pop_pending_ack(ack_code: str) -> int | None: return None message_id, _, _ = pending return message_id + + +def clear_pending_acks_for_message(message_id: int) -> None: + """Remove any still-pending ACK codes for a message once one ACK wins.""" + sibling_codes = [ + code + for code, (pending_message_id, _created_at, _timeout_ms) in _pending_acks.items() + if pending_message_id == message_id + ] + for code in sibling_codes: + del _pending_acks[code] + logger.debug("Cleared sibling pending ACK %s for message %d", code, message_id) diff --git a/app/services/message_send.py b/app/services/message_send.py index a22d079..841efa7 100644 --- a/app/services/message_send.py +++ b/app/services/message_send.py @@ -11,6 +11,7 @@ from meshcore import EventType from app.models import ResendChannelMessageResponse from app.region_scope import normalize_region_scope from app.repository import AppSettingsRepository, ContactRepository, MessageRepository +from app.services import dm_ack_tracker from app.services.messages import ( build_message_model, create_outgoing_channel_message, @@ -29,10 +30,15 @@ BroadcastFn = Callable[..., Any] TrackAckFn = Callable[[str, int, int], bool] NowFn = Callable[[], float] OutgoingReservationKey = tuple[str, str, str] +RetryTaskScheduler = Callable[[Any], Any] _pending_outgoing_timestamp_reservations: dict[OutgoingReservationKey, set[int]] = {} _outgoing_timestamp_reservations_lock = asyncio.Lock() +DM_SEND_MAX_ATTEMPTS = 3 +DEFAULT_DM_ACK_TIMEOUT_MS = 10000 +DM_RETRY_WAIT_MARGIN = 1.2 + async def allocate_outgoing_sender_timestamp( *, @@ -248,6 +254,183 @@ async def send_channel_message_with_effective_scope( ) +def _extract_expected_ack_code(result: Any) -> str | None: + if result 
is None or result.type == EventType.ERROR: + return None + payload = result.payload or {} + expected_ack = payload.get("expected_ack") + if not expected_ack: + return None + return expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack + + +def _get_ack_tracking_timeout_ms(result: Any) -> int: + if result is None or result.type == EventType.ERROR: + return DEFAULT_DM_ACK_TIMEOUT_MS + payload = result.payload or {} + suggested_timeout = payload.get("suggested_timeout") + if suggested_timeout is None: + return DEFAULT_DM_ACK_TIMEOUT_MS + try: + return max(1, int(suggested_timeout)) + except (TypeError, ValueError): + return DEFAULT_DM_ACK_TIMEOUT_MS + + +def _get_direct_message_retry_timeout_ms(result: Any) -> int: + """Return the ACK window to wait before retrying a DM. + + The MeshCore firmware already computes and returns `suggested_timeout` in + `PACKET_MSG_SENT`, derived from estimated packet airtime and route mode. + We use that firmware-supplied window directly so retries do not fire before + the radio's own ACK timeout expires. 
+ + Sources: + - https://github.com/meshcore-dev/MeshCore/blob/main/src/helpers/BaseChatMesh.cpp + - https://github.com/meshcore-dev/MeshCore/blob/main/examples/companion_radio/MyMesh.cpp + - https://github.com/meshcore-dev/MeshCore/blob/main/docs/companion_protocol.md + """ + return _get_ack_tracking_timeout_ms(result) + + +async def _apply_direct_message_ack_tracking( + *, + result: Any, + message_id: int, + track_pending_ack_fn: TrackAckFn, + broadcast_fn: BroadcastFn, +) -> int: + ack_code = _extract_expected_ack_code(result) + if not ack_code: + return 0 + + timeout_ms = _get_ack_tracking_timeout_ms(result) + matched_immediately = track_pending_ack_fn(ack_code, message_id, timeout_ms) is True + logger.debug("Tracking ACK %s for message %d", ack_code, message_id) + if matched_immediately: + dm_ack_tracker.clear_pending_acks_for_message(message_id) + return await increment_ack_and_broadcast( + message_id=message_id, + broadcast_fn=broadcast_fn, + ) + return 0 + + +async def _is_message_acked(*, message_id: int, message_repository) -> bool: + acked_count, _paths = await message_repository.get_ack_and_paths(message_id) + return acked_count > 0 + + +async def _retry_direct_message_until_acked( + *, + contact, + text: str, + message_id: int, + sender_timestamp: int, + radio_manager, + track_pending_ack_fn: TrackAckFn, + broadcast_fn: BroadcastFn, + wait_timeout_ms: int, + sleep_fn, + message_repository, +) -> None: + next_wait_timeout_ms = wait_timeout_ms + for attempt in range(1, DM_SEND_MAX_ATTEMPTS): + await sleep_fn((next_wait_timeout_ms / 1000) * DM_RETRY_WAIT_MARGIN) + if await _is_message_acked(message_id=message_id, message_repository=message_repository): + return + + try: + async with radio_manager.radio_operation("retry_direct_message") as mc: + contact_data = contact.to_radio_dict() + add_result = await mc.commands.add_contact(contact_data) + if add_result.type == EventType.ERROR: + logger.warning( + "Failed to reload contact %s on radio before DM retry: 
%s", + contact.public_key[:12], + add_result.payload, + ) + cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) + if not cached_contact: + cached_contact = contact_data + + if attempt == DM_SEND_MAX_ATTEMPTS - 1: + reset_result = await mc.commands.reset_path(contact.public_key) + if reset_result is None: + logger.warning( + "No response from radio for reset_path to %s before final DM retry", + contact.public_key[:12], + ) + elif reset_result.type == EventType.ERROR: + logger.warning( + "Failed to reset path before final DM retry to %s: %s", + contact.public_key[:12], + reset_result.payload, + ) + refreshed_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) + if refreshed_contact: + cached_contact = refreshed_contact + + result = await mc.commands.send_msg( + dst=cached_contact, + msg=text, + timestamp=sender_timestamp, + attempt=attempt, + ) + except Exception: + logger.exception( + "Background DM retry attempt %d/%d failed for %s", + attempt + 1, + DM_SEND_MAX_ATTEMPTS, + contact.public_key[:12], + ) + continue + + if result is None: + logger.warning( + "No response from radio after background DM retry attempt %d/%d to %s", + attempt + 1, + DM_SEND_MAX_ATTEMPTS, + contact.public_key[:12], + ) + continue + + if result.type == EventType.ERROR: + logger.warning( + "Background DM retry attempt %d/%d failed for %s: %s", + attempt + 1, + DM_SEND_MAX_ATTEMPTS, + contact.public_key[:12], + result.payload, + ) + continue + + if await _is_message_acked(message_id=message_id, message_repository=message_repository): + return + + ack_code = _extract_expected_ack_code(result) + if not ack_code: + logger.warning( + "Background DM retry attempt %d/%d for %s returned no expected_ack; " + "stopping retries to avoid duplicate sends", + attempt + 1, + DM_SEND_MAX_ATTEMPTS, + contact.public_key[:12], + ) + return + + next_wait_timeout_ms = _get_direct_message_retry_timeout_ms(result) + + ack_count = await _apply_direct_message_ack_tracking( + 
result=result, + message_id=message_id, + track_pending_ack_fn=track_pending_ack_fn, + broadcast_fn=broadcast_fn, + ) + if ack_count > 0: + return + + async def send_direct_message_to_contact( *, contact, @@ -256,10 +439,17 @@ async def send_direct_message_to_contact( broadcast_fn: BroadcastFn, track_pending_ack_fn: TrackAckFn, now_fn: NowFn, + retry_task_scheduler: RetryTaskScheduler | None = None, + retry_sleep_fn=None, message_repository=MessageRepository, contact_repository=ContactRepository, ) -> Any: """Send a direct message and persist/broadcast the outgoing row.""" + if retry_task_scheduler is None: + retry_task_scheduler = asyncio.create_task + if retry_sleep_fn is None: + retry_sleep_fn = asyncio.sleep + contact_data = contact.to_radio_dict() sent_at: int | None = None sender_timestamp: int | None = None @@ -328,18 +518,33 @@ async def send_direct_message_to_contact( await contact_repository.update_last_contacted(contact.public_key.lower(), sent_at) - expected_ack = result.payload.get("expected_ack") - suggested_timeout: int = result.payload.get("suggested_timeout", 10000) - if expected_ack: - ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack - matched_immediately = track_pending_ack_fn(ack_code, message.id, suggested_timeout) is True - logger.debug("Tracking ACK %s for message %d", ack_code, message.id) - if matched_immediately: - ack_count = await increment_ack_and_broadcast( + ack_code = _extract_expected_ack_code(result) + retry_timeout_ms = _get_direct_message_retry_timeout_ms(result) + ack_count = await _apply_direct_message_ack_tracking( + result=result, + message_id=message.id, + track_pending_ack_fn=track_pending_ack_fn, + broadcast_fn=broadcast_fn, + ) + if ack_count > 0: + message.acked = ack_count + return message + + if DM_SEND_MAX_ATTEMPTS > 1 and ack_code: + retry_task_scheduler( + _retry_direct_message_until_acked( + contact=contact, + text=text, message_id=message.id, + sender_timestamp=sender_timestamp, + 
radio_manager=radio_manager, + track_pending_ack_fn=track_pending_ack_fn, broadcast_fn=broadcast_fn, + wait_timeout_ms=retry_timeout_ms, + sleep_fn=retry_sleep_fn, + message_repository=message_repository, ) - message.acked = ack_count + ) return message diff --git a/frontend/src/components/ContactRoutingOverrideModal.tsx b/frontend/src/components/ContactRoutingOverrideModal.tsx index dbb02e7..c728311 100644 --- a/frontend/src/components/ContactRoutingOverrideModal.tsx +++ b/frontend/src/components/ContactRoutingOverrideModal.tsx @@ -118,6 +118,10 @@ export function ContactRoutingOverrideModal({ />

Use comma-separated 1, 2, or 3 byte hop IDs for an explicit path.

+

+ Note: direct messages that do not see an ACK retry up to 3 times. The final retry is + sent as flood, even when forced routing is configured. +

diff --git a/tests/test_ack_tracking_wiring.py b/tests/test_ack_tracking_wiring.py index 0042200..53be2fa 100644 --- a/tests/test_ack_tracking_wiring.py +++ b/tests/test_ack_tracking_wiring.py @@ -10,6 +10,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest from meshcore import EventType +import app.services.message_send as message_send_service from app.models import SendDirectMessageRequest from app.radio import radio_manager from app.repository import ContactRepository @@ -26,6 +27,12 @@ def _reset_radio_state(): radio_manager._operation_lock = prev_lock +@pytest.fixture(autouse=True) +def _disable_background_dm_retries(monkeypatch): + monkeypatch.setattr(message_send_service, "DM_SEND_MAX_ATTEMPTS", 1) + yield + + def _make_mc(name="TestNode"): mc = MagicMock() mc.self_info = {"name": name} diff --git a/tests/test_api.py b/tests/test_api.py index 8de9abb..550f9fd 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -12,6 +12,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest from fastapi import HTTPException +import app.services.message_send as message_send_service from app.radio import radio_manager from app.repository import ( ChannelRepository, @@ -39,6 +40,12 @@ def _reset_radio_state(): radio_manager._channel_key_by_slot = prev_key_by_slot +@pytest.fixture(autouse=True) +def _disable_background_dm_retries(monkeypatch): + monkeypatch.setattr(message_send_service, "DM_SEND_MAX_ATTEMPTS", 1) + yield + + def _patch_require_connected(mc=None, *, detail="Radio not connected"): if mc is None: return patch( diff --git a/tests/test_event_handlers.py b/tests/test_event_handlers.py index 3353c72..8d938a6 100644 --- a/tests/test_event_handlers.py +++ b/tests/test_event_handlers.py @@ -201,6 +201,42 @@ class TestAckEventHandler: assert "expected" in _pending_acks assert "different" in _buffered_acks + @pytest.mark.asyncio + async def test_first_dm_ack_clears_sibling_retry_codes(self, test_db): + """A DM should stop at 
ack_count=1 even if retry ACK codes arrive later.""" + from app.event_handlers import on_ack + + msg_id = await MessageRepository.create( + msg_type="PRIV", + text="Hello", + received_at=1700000000, + conversation_key="aa" * 32, + sender_timestamp=1700000000, + outgoing=True, + ) + + track_pending_ack("ack1", message_id=msg_id, timeout_ms=10000) + track_pending_ack("ack2", message_id=msg_id, timeout_ms=10000) + + with patch("app.event_handlers.broadcast_event") as mock_broadcast: + + class FirstAckEvent: + payload = {"code": "ack1"} + + class SecondAckEvent: + payload = {"code": "ack2"} + + await on_ack(FirstAckEvent()) + await on_ack(SecondAckEvent()) + + ack_count, _ = await MessageRepository.get_ack_and_paths(msg_id) + assert ack_count == 1 + assert "ack2" not in _pending_acks + assert "ack2" in _buffered_acks + mock_broadcast.assert_called_once_with( + "message_acked", {"message_id": msg_id, "ack_count": 1} + ) + @pytest.mark.asyncio async def test_ack_empty_code_ignored(self, test_db): """ACK with empty code is ignored.""" diff --git a/tests/test_send_messages.py b/tests/test_send_messages.py index cdc8365..dbb5500 100644 --- a/tests/test_send_messages.py +++ b/tests/test_send_messages.py @@ -8,6 +8,7 @@ import pytest from fastapi import HTTPException from meshcore import EventType +import app.services.message_send as message_send_service from app.models import ( SendChannelMessageRequest, SendDirectMessageRequest, @@ -69,6 +70,7 @@ def _make_mc(name="TestNode"): mc.commands.send_msg = AsyncMock(return_value=_make_radio_result()) mc.commands.send_chan_msg = AsyncMock(return_value=_make_radio_result()) mc.commands.add_contact = AsyncMock(return_value=_make_radio_result()) + mc.commands.reset_path = AsyncMock(return_value=MagicMock(type=EventType.OK, payload={})) mc.commands.set_channel = AsyncMock(return_value=_make_radio_result()) mc.get_contact_by_key_prefix = MagicMock(return_value=None) return mc @@ -94,6 +96,12 @@ async def _insert_contact(public_key, 
name="Alice", **overrides): await ContactRepository.upsert(data) +@pytest.fixture(autouse=True) +def _disable_background_dm_retries(monkeypatch): + monkeypatch.setattr(message_send_service, "DM_SEND_MAX_ATTEMPTS", 1) + yield + + class TestOutgoingDMBroadcast: """Test that outgoing DMs are broadcast via broadcast_event for fanout dispatch.""" @@ -272,6 +280,185 @@ class TestOutgoingDMBroadcast: assert message.acked == 1 assert any(event_type == "message_acked" for event_type, _data in broadcasts) + @pytest.mark.asyncio + async def test_send_dm_without_expected_ack_does_not_schedule_retries(self, test_db): + mc = _make_mc() + pub_key = "fb" * 32 + await _insert_contact(pub_key, "Alice") + + mc.commands.send_msg = AsyncMock(return_value=_make_radio_result({})) + + with ( + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event"), + patch("app.services.message_send.asyncio.create_task") as mock_create_task, + ): + message = await send_direct_message( + SendDirectMessageRequest(destination=pub_key, text="Hello") + ) + + assert message.acked == 0 + mock_create_task.assert_not_called() + + @pytest.mark.asyncio + async def test_send_dm_background_retries_reset_path_before_final_attempt(self, test_db): + mc = _make_mc() + pub_key = "fc" * 32 + await _insert_contact(pub_key, "Alice") + + mc.commands.send_msg = AsyncMock( + side_effect=[ + _make_radio_result( + {"expected_ack": b"\x00\x00\x00\x01", "suggested_timeout": 8000} + ), + _make_radio_result( + {"expected_ack": b"\x00\x00\x00\x02", "suggested_timeout": 7000} + ), + _make_radio_result( + {"expected_ack": b"\x00\x00\x00\x03", "suggested_timeout": 6000} + ), + ] + ) + + retry_tasks = [] + loop = asyncio.get_running_loop() + slept_for = [] + + def schedule_retry(coro): + task = loop.create_task(coro) + retry_tasks.append(task) + return task + + async def no_wait(seconds): + slept_for.append(seconds) + return None + + 
with ( + patch.object(message_send_service, "DM_SEND_MAX_ATTEMPTS", 3), + patch("app.routers.messages.track_pending_ack", return_value=False), + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event"), + patch("app.services.message_send.asyncio.create_task", side_effect=schedule_retry), + patch("app.services.message_send.asyncio.sleep", side_effect=no_wait), + ): + await send_direct_message(SendDirectMessageRequest(destination=pub_key, text="Hello")) + await asyncio.gather(*retry_tasks) + + assert mc.commands.send_msg.await_count == 3 + assert mc.commands.add_contact.await_count == 3 + assert mc.commands.send_msg.await_args_list[1].kwargs["attempt"] == 1 + assert mc.commands.send_msg.await_args_list[2].kwargs["attempt"] == 2 + mc.commands.reset_path.assert_awaited_once_with(pub_key) + assert slept_for == pytest.approx([9.6, 8.4]) + + @pytest.mark.asyncio + async def test_send_dm_background_retry_stops_after_late_ack(self, test_db): + from app.event_handlers import on_ack + + mc = _make_mc() + pub_key = "fd" * 32 + await _insert_contact(pub_key, "Alice") + + mc.commands.send_msg = AsyncMock( + return_value=_make_radio_result( + {"expected_ack": b"\xde\xad\xbe\xef", "suggested_timeout": 8000} + ) + ) + + retry_tasks = [] + sleep_gate = asyncio.Event() + loop = asyncio.get_running_loop() + + def schedule_retry(coro): + task = loop.create_task(coro) + retry_tasks.append(task) + return task + + async def gated_sleep(_seconds): + await sleep_gate.wait() + + class MockAckEvent: + payload = {"code": "deadbeef"} + + with ( + patch.object(message_send_service, "DM_SEND_MAX_ATTEMPTS", 3), + patch("app.event_handlers.broadcast_event"), + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event"), + patch("app.services.message_send.asyncio.create_task", 
side_effect=schedule_retry), + patch("app.services.message_send.asyncio.sleep", side_effect=gated_sleep), + ): + message = await send_direct_message( + SendDirectMessageRequest(destination=pub_key, text="Hello") + ) + await on_ack(MockAckEvent()) + sleep_gate.set() + await asyncio.gather(*retry_tasks) + + ack_count, _ = await MessageRepository.get_ack_and_paths(message.id) + assert ack_count == 1 + assert mc.commands.send_msg.await_count == 1 + + @pytest.mark.asyncio + async def test_buffered_retry_ack_clears_older_dm_ack_codes(self, test_db): + from app.event_handlers import on_ack + + mc = _make_mc() + pub_key = "fe" * 32 + await _insert_contact(pub_key, "Alice") + + mc.commands.send_msg = AsyncMock( + side_effect=[ + _make_radio_result( + {"expected_ack": b"\xaa\xaa\xaa\x01", "suggested_timeout": 8000} + ), + _make_radio_result( + {"expected_ack": b"\xbb\xbb\xbb\x02", "suggested_timeout": 8000} + ), + ] + ) + + retry_tasks = [] + sleep_gate = asyncio.Event() + loop = asyncio.get_running_loop() + + def schedule_retry(coro): + task = loop.create_task(coro) + retry_tasks.append(task) + return task + + async def gated_sleep(_seconds): + await sleep_gate.wait() + + class RetryAckEvent: + payload = {"code": "bbbbbb02"} + + class FirstAckEvent: + payload = {"code": "aaaaaa01"} + + with ( + patch.object(message_send_service, "DM_SEND_MAX_ATTEMPTS", 3), + patch("app.event_handlers.broadcast_event"), + patch("app.routers.messages.require_connected", return_value=mc), + patch.object(radio_manager, "_meshcore", mc), + patch("app.routers.messages.broadcast_event"), + patch("app.services.message_send.asyncio.create_task", side_effect=schedule_retry), + patch("app.services.message_send.asyncio.sleep", side_effect=gated_sleep), + ): + message = await send_direct_message( + SendDirectMessageRequest(destination=pub_key, text="Hello") + ) + await on_ack(RetryAckEvent()) + sleep_gate.set() + await asyncio.gather(*retry_tasks) + await on_ack(FirstAckEvent()) + + ack_count, _ = await 
MessageRepository.get_ack_and_paths(message.id) + assert ack_count == 1 + class TestOutgoingChannelBroadcast: """Test that outgoing channel messages are broadcast via broadcast_event for fanout dispatch."""