diff --git a/AGENTS.md b/AGENTS.md index e0c6e27..9d39694 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -163,6 +163,8 @@ MeshCore firmware can encode path hops as 1-byte, 2-byte, or 3-byte identifiers. **Direct messages**: Expected ACK code is tracked. When ACK event arrives, message marked as acked. +Outgoing DMs send once immediately, then may retry up to 2 more times in the background if still unacked. Retry timing follows the radio's `suggested_timeout` from `PACKET_MSG_SENT`, and the final retry is sent as flood even when a routing override is configured. DM ACK state is terminal on first ACK: sibling retry ACK codes are cleared so one DM should not accumulate multiple delivery confirmations from different retry attempts. + **Channel messages**: Flood messages echo back through repeaters. Repeats are identified by the database UNIQUE constraint on `(type, conversation_key, text, sender_timestamp)` — when an INSERT hits a duplicate, `_handle_duplicate_message()` in `packet_processor.py` adds the new path and, for outgoing messages only, increments the ack count. Incoming repeats add path data but do not change the ack count. There is no timestamp-windowed matching; deduplication is exact-match only. This message-layer echo/path handling is independent of raw-packet storage deduplication. diff --git a/app/AGENTS.md b/app/AGENTS.md index bff1faf..c1305e4 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -118,6 +118,10 @@ app/ - `services/dm_ingest.py` is the one place that should decide fallback-context resolution, DM dedup/reconciliation, and packet-linked vs. content-based storage behavior. - `CONTACT_MSG_RECV` is a fallback path, not a parallel source of truth. If you change DM storage behavior, trace both `event_handlers.py` and `packet_processor.py`. - DM ACK tracking is an in-memory pending/buffered map in `services/dm_ack_tracker.py`, with periodic expiry from `radio_sync.py`. 
+- Outgoing DMs send once inline, store/broadcast immediately after the first successful `MSG_SENT`, then may retry up to 2 more times in the background if still unacked. +- DM retry timing follows the firmware-provided `suggested_timeout` from `PACKET_MSG_SENT`; do not replace it with a fixed app timeout unless you intentionally want more aggressive duplicate-prone retries. +- The final DM retry is intentionally sent as flood via `reset_path(...)`, even when a routing override exists. +- DM ACK state is terminal on first ACK. Retry attempts may register multiple expected ACK codes for the same message, but sibling pending codes are cleared once one ACK wins so a DM should not accrue multiple delivery confirmations from retries. ### Echo/repeat dedup diff --git a/app/event_handlers.py b/app/event_handlers.py index f367a16..dafa9f5 100644 --- a/app/event_handlers.py +++ b/app/event_handlers.py @@ -273,6 +273,7 @@ async def on_ack(event: "Event") -> None: message_id = dm_ack_tracker.pop_pending_ack(ack_code) if message_id is not None: + dm_ack_tracker.clear_pending_acks_for_message(message_id) logger.info("ACK received for message %d", message_id) # DM ACKs don't carry path data, so paths is intentionally omitted. 
# The frontend's mergePendingAck handles the missing field correctly, diff --git a/app/services/dm_ack_tracker.py b/app/services/dm_ack_tracker.py index 9012657..ef5f656 100644 --- a/app/services/dm_ack_tracker.py +++ b/app/services/dm_ack_tracker.py @@ -71,3 +71,15 @@ def pop_pending_ack(ack_code: str) -> int | None: return None message_id, _, _ = pending return message_id + + +def clear_pending_acks_for_message(message_id: int) -> None: + """Remove any still-pending ACK codes for a message once one ACK wins.""" + sibling_codes = [ + code + for code, (pending_message_id, _created_at, _timeout_ms) in _pending_acks.items() + if pending_message_id == message_id + ] + for code in sibling_codes: + del _pending_acks[code] + logger.debug("Cleared sibling pending ACK %s for message %d", code, message_id) diff --git a/app/services/message_send.py b/app/services/message_send.py index a22d079..841efa7 100644 --- a/app/services/message_send.py +++ b/app/services/message_send.py @@ -11,6 +11,7 @@ from meshcore import EventType from app.models import ResendChannelMessageResponse from app.region_scope import normalize_region_scope from app.repository import AppSettingsRepository, ContactRepository, MessageRepository +from app.services import dm_ack_tracker from app.services.messages import ( build_message_model, create_outgoing_channel_message, @@ -29,10 +30,15 @@ BroadcastFn = Callable[..., Any] TrackAckFn = Callable[[str, int, int], bool] NowFn = Callable[[], float] OutgoingReservationKey = tuple[str, str, str] +RetryTaskScheduler = Callable[[Any], Any] _pending_outgoing_timestamp_reservations: dict[OutgoingReservationKey, set[int]] = {} _outgoing_timestamp_reservations_lock = asyncio.Lock() +DM_SEND_MAX_ATTEMPTS = 3 +DEFAULT_DM_ACK_TIMEOUT_MS = 10000 +DM_RETRY_WAIT_MARGIN = 1.2 + async def allocate_outgoing_sender_timestamp( *, @@ -248,6 +254,183 @@ async def send_channel_message_with_effective_scope( ) +def _extract_expected_ack_code(result: Any) -> str | None: + if result 
is None or result.type == EventType.ERROR: + return None + payload = result.payload or {} + expected_ack = payload.get("expected_ack") + if not expected_ack: + return None + return expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack + + +def _get_ack_tracking_timeout_ms(result: Any) -> int: + if result is None or result.type == EventType.ERROR: + return DEFAULT_DM_ACK_TIMEOUT_MS + payload = result.payload or {} + suggested_timeout = payload.get("suggested_timeout") + if suggested_timeout is None: + return DEFAULT_DM_ACK_TIMEOUT_MS + try: + return max(1, int(suggested_timeout)) + except (TypeError, ValueError): + return DEFAULT_DM_ACK_TIMEOUT_MS + + +def _get_direct_message_retry_timeout_ms(result: Any) -> int: + """Return the ACK window to wait before retrying a DM. + + The MeshCore firmware already computes and returns `suggested_timeout` in + `PACKET_MSG_SENT`, derived from estimated packet airtime and route mode. + We use that firmware-supplied window directly so retries do not fire before + the radio's own ACK timeout expires. 
+ + Sources: + - https://github.com/meshcore-dev/MeshCore/blob/main/src/helpers/BaseChatMesh.cpp + - https://github.com/meshcore-dev/MeshCore/blob/main/examples/companion_radio/MyMesh.cpp + - https://github.com/meshcore-dev/MeshCore/blob/main/docs/companion_protocol.md + """ + return _get_ack_tracking_timeout_ms(result) + + +async def _apply_direct_message_ack_tracking( + *, + result: Any, + message_id: int, + track_pending_ack_fn: TrackAckFn, + broadcast_fn: BroadcastFn, +) -> int: + ack_code = _extract_expected_ack_code(result) + if not ack_code: + return 0 + + timeout_ms = _get_ack_tracking_timeout_ms(result) + matched_immediately = track_pending_ack_fn(ack_code, message_id, timeout_ms) is True + logger.debug("Tracking ACK %s for message %d", ack_code, message_id) + if matched_immediately: + dm_ack_tracker.clear_pending_acks_for_message(message_id) + return await increment_ack_and_broadcast( + message_id=message_id, + broadcast_fn=broadcast_fn, + ) + return 0 + + +async def _is_message_acked(*, message_id: int, message_repository) -> bool: + acked_count, _paths = await message_repository.get_ack_and_paths(message_id) + return acked_count > 0 + + +async def _retry_direct_message_until_acked( + *, + contact, + text: str, + message_id: int, + sender_timestamp: int, + radio_manager, + track_pending_ack_fn: TrackAckFn, + broadcast_fn: BroadcastFn, + wait_timeout_ms: int, + sleep_fn, + message_repository, +) -> None: + next_wait_timeout_ms = wait_timeout_ms + for attempt in range(1, DM_SEND_MAX_ATTEMPTS): + await sleep_fn((next_wait_timeout_ms / 1000) * DM_RETRY_WAIT_MARGIN) + if await _is_message_acked(message_id=message_id, message_repository=message_repository): + return + + try: + async with radio_manager.radio_operation("retry_direct_message") as mc: + contact_data = contact.to_radio_dict() + add_result = await mc.commands.add_contact(contact_data) + if add_result.type == EventType.ERROR: + logger.warning( + "Failed to reload contact %s on radio before DM retry: 
%s", + contact.public_key[:12], + add_result.payload, + ) + cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) + if not cached_contact: + cached_contact = contact_data + + if attempt == DM_SEND_MAX_ATTEMPTS - 1: + reset_result = await mc.commands.reset_path(contact.public_key) + if reset_result is None: + logger.warning( + "No response from radio for reset_path to %s before final DM retry", + contact.public_key[:12], + ) + elif reset_result.type == EventType.ERROR: + logger.warning( + "Failed to reset path before final DM retry to %s: %s", + contact.public_key[:12], + reset_result.payload, + ) + refreshed_contact = mc.get_contact_by_key_prefix(contact.public_key[:12]) + if refreshed_contact: + cached_contact = refreshed_contact + + result = await mc.commands.send_msg( + dst=cached_contact, + msg=text, + timestamp=sender_timestamp, + attempt=attempt, + ) + except Exception: + logger.exception( + "Background DM retry attempt %d/%d failed for %s", + attempt + 1, + DM_SEND_MAX_ATTEMPTS, + contact.public_key[:12], + ) + continue + + if result is None: + logger.warning( + "No response from radio after background DM retry attempt %d/%d to %s", + attempt + 1, + DM_SEND_MAX_ATTEMPTS, + contact.public_key[:12], + ) + continue + + if result.type == EventType.ERROR: + logger.warning( + "Background DM retry attempt %d/%d failed for %s: %s", + attempt + 1, + DM_SEND_MAX_ATTEMPTS, + contact.public_key[:12], + result.payload, + ) + continue + + if await _is_message_acked(message_id=message_id, message_repository=message_repository): + return + + ack_code = _extract_expected_ack_code(result) + if not ack_code: + logger.warning( + "Background DM retry attempt %d/%d for %s returned no expected_ack; " + "stopping retries to avoid duplicate sends", + attempt + 1, + DM_SEND_MAX_ATTEMPTS, + contact.public_key[:12], + ) + return + + next_wait_timeout_ms = _get_direct_message_retry_timeout_ms(result) + + ack_count = await _apply_direct_message_ack_tracking( + 
result=result, + message_id=message_id, + track_pending_ack_fn=track_pending_ack_fn, + broadcast_fn=broadcast_fn, + ) + if ack_count > 0: + return + + async def send_direct_message_to_contact( *, contact, @@ -256,10 +439,17 @@ async def send_direct_message_to_contact( broadcast_fn: BroadcastFn, track_pending_ack_fn: TrackAckFn, now_fn: NowFn, + retry_task_scheduler: RetryTaskScheduler | None = None, + retry_sleep_fn=None, message_repository=MessageRepository, contact_repository=ContactRepository, ) -> Any: """Send a direct message and persist/broadcast the outgoing row.""" + if retry_task_scheduler is None: + retry_task_scheduler = asyncio.create_task + if retry_sleep_fn is None: + retry_sleep_fn = asyncio.sleep + contact_data = contact.to_radio_dict() sent_at: int | None = None sender_timestamp: int | None = None @@ -328,18 +518,33 @@ async def send_direct_message_to_contact( await contact_repository.update_last_contacted(contact.public_key.lower(), sent_at) - expected_ack = result.payload.get("expected_ack") - suggested_timeout: int = result.payload.get("suggested_timeout", 10000) - if expected_ack: - ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack - matched_immediately = track_pending_ack_fn(ack_code, message.id, suggested_timeout) is True - logger.debug("Tracking ACK %s for message %d", ack_code, message.id) - if matched_immediately: - ack_count = await increment_ack_and_broadcast( + ack_code = _extract_expected_ack_code(result) + retry_timeout_ms = _get_direct_message_retry_timeout_ms(result) + ack_count = await _apply_direct_message_ack_tracking( + result=result, + message_id=message.id, + track_pending_ack_fn=track_pending_ack_fn, + broadcast_fn=broadcast_fn, + ) + if ack_count > 0: + message.acked = ack_count + return message + + if DM_SEND_MAX_ATTEMPTS > 1 and ack_code: + retry_task_scheduler( + _retry_direct_message_until_acked( + contact=contact, + text=text, message_id=message.id, + sender_timestamp=sender_timestamp, + 
radio_manager=radio_manager, + track_pending_ack_fn=track_pending_ack_fn, broadcast_fn=broadcast_fn, + wait_timeout_ms=retry_timeout_ms, + sleep_fn=retry_sleep_fn, + message_repository=message_repository, ) - message.acked = ack_count + ) return message diff --git a/frontend/src/components/ContactRoutingOverrideModal.tsx b/frontend/src/components/ContactRoutingOverrideModal.tsx index dbb02e7..c728311 100644 --- a/frontend/src/components/ContactRoutingOverrideModal.tsx +++ b/frontend/src/components/ContactRoutingOverrideModal.tsx @@ -118,6 +118,10 @@ export function ContactRoutingOverrideModal({ />
Use comma-separated 1, 2, or 3 byte hop IDs for an explicit path.
++ Note: direct messages that do not see an ACK are retried up to 2 more times (3 sends total). The final + retry is sent as flood, even when forced routing is configured. +