Do better DM retry to align with stndard firmware retry (but so that we can track the acks). Closes #73.

This commit is contained in:
Jack Kingsman
2026-03-17 18:12:07 -07:00
parent d5b8f7d462
commit 020acbda02
10 changed files with 474 additions and 9 deletions

View File

@@ -163,6 +163,8 @@ MeshCore firmware can encode path hops as 1-byte, 2-byte, or 3-byte identifiers.
**Direct messages**: Expected ACK code is tracked. When ACK event arrives, message marked as acked.
Outgoing DMs send once immediately, then may retry up to 2 more times in the background if still unacked. Retry timing follows the radio's `suggested_timeout` from `PACKET_MSG_SENT`, and the final retry is sent as flood even when a routing override is configured. DM ACK state is terminal on first ACK: sibling retry ACK codes are cleared so one DM should not accumulate multiple delivery confirmations from different retry attempts.
**Channel messages**: Flood messages echo back through repeaters. Repeats are identified by the database UNIQUE constraint on `(type, conversation_key, text, sender_timestamp)` — when an INSERT hits a duplicate, `_handle_duplicate_message()` in `packet_processor.py` adds the new path and, for outgoing messages only, increments the ack count. Incoming repeats add path data but do not change the ack count. There is no timestamp-windowed matching; deduplication is exact-match only.
This message-layer echo/path handling is independent of raw-packet storage deduplication.

View File

@@ -118,6 +118,10 @@ app/
- `services/dm_ingest.py` is the one place that should decide fallback-context resolution, DM dedup/reconciliation, and packet-linked vs. content-based storage behavior.
- `CONTACT_MSG_RECV` is a fallback path, not a parallel source of truth. If you change DM storage behavior, trace both `event_handlers.py` and `packet_processor.py`.
- DM ACK tracking is an in-memory pending/buffered map in `services/dm_ack_tracker.py`, with periodic expiry from `radio_sync.py`.
- Outgoing DMs send once inline, store/broadcast immediately after the first successful `MSG_SENT`, then may retry up to 2 more times in the background if still unacked.
- DM retry timing follows the firmware-provided `suggested_timeout` from `PACKET_MSG_SENT`; do not replace it with a fixed app timeout unless you intentionally want more aggressive duplicate-prone retries.
- The final DM retry is intentionally sent as flood via `reset_path(...)`, even when a routing override exists.
- DM ACK state is terminal on first ACK. Retry attempts may register multiple expected ACK codes for the same message, but sibling pending codes are cleared once one ACK wins so a DM should not accrue multiple delivery confirmations from retries.
### Echo/repeat dedup

View File

@@ -273,6 +273,7 @@ async def on_ack(event: "Event") -> None:
message_id = dm_ack_tracker.pop_pending_ack(ack_code)
if message_id is not None:
dm_ack_tracker.clear_pending_acks_for_message(message_id)
logger.info("ACK received for message %d", message_id)
# DM ACKs don't carry path data, so paths is intentionally omitted.
# The frontend's mergePendingAck handles the missing field correctly,

View File

@@ -71,3 +71,15 @@ def pop_pending_ack(ack_code: str) -> int | None:
return None
message_id, _, _ = pending
return message_id
def clear_pending_acks_for_message(message_id: int) -> None:
"""Remove any still-pending ACK codes for a message once one ACK wins."""
sibling_codes = [
code
for code, (pending_message_id, _created_at, _timeout_ms) in _pending_acks.items()
if pending_message_id == message_id
]
for code in sibling_codes:
del _pending_acks[code]
logger.debug("Cleared sibling pending ACK %s for message %d", code, message_id)

View File

@@ -11,6 +11,7 @@ from meshcore import EventType
from app.models import ResendChannelMessageResponse
from app.region_scope import normalize_region_scope
from app.repository import AppSettingsRepository, ContactRepository, MessageRepository
from app.services import dm_ack_tracker
from app.services.messages import (
build_message_model,
create_outgoing_channel_message,
@@ -29,10 +30,15 @@ BroadcastFn = Callable[..., Any]
TrackAckFn = Callable[[str, int, int], bool]
NowFn = Callable[[], float]
OutgoingReservationKey = tuple[str, str, str]
RetryTaskScheduler = Callable[[Any], Any]
_pending_outgoing_timestamp_reservations: dict[OutgoingReservationKey, set[int]] = {}
_outgoing_timestamp_reservations_lock = asyncio.Lock()
DM_SEND_MAX_ATTEMPTS = 3
DEFAULT_DM_ACK_TIMEOUT_MS = 10000
DM_RETRY_WAIT_MARGIN = 1.2
async def allocate_outgoing_sender_timestamp(
*,
@@ -248,6 +254,183 @@ async def send_channel_message_with_effective_scope(
)
def _extract_expected_ack_code(result: Any) -> str | None:
if result is None or result.type == EventType.ERROR:
return None
payload = result.payload or {}
expected_ack = payload.get("expected_ack")
if not expected_ack:
return None
return expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack
def _get_ack_tracking_timeout_ms(result: Any) -> int:
if result is None or result.type == EventType.ERROR:
return DEFAULT_DM_ACK_TIMEOUT_MS
payload = result.payload or {}
suggested_timeout = payload.get("suggested_timeout")
if suggested_timeout is None:
return DEFAULT_DM_ACK_TIMEOUT_MS
try:
return max(1, int(suggested_timeout))
except (TypeError, ValueError):
return DEFAULT_DM_ACK_TIMEOUT_MS
def _get_direct_message_retry_timeout_ms(result: Any) -> int:
"""Return the ACK window to wait before retrying a DM.
The MeshCore firmware already computes and returns `suggested_timeout` in
`PACKET_MSG_SENT`, derived from estimated packet airtime and route mode.
We use that firmware-supplied window directly so retries do not fire before
the radio's own ACK timeout expires.
Sources:
- https://github.com/meshcore-dev/MeshCore/blob/main/src/helpers/BaseChatMesh.cpp
- https://github.com/meshcore-dev/MeshCore/blob/main/examples/companion_radio/MyMesh.cpp
- https://github.com/meshcore-dev/MeshCore/blob/main/docs/companion_protocol.md
"""
return _get_ack_tracking_timeout_ms(result)
async def _apply_direct_message_ack_tracking(
*,
result: Any,
message_id: int,
track_pending_ack_fn: TrackAckFn,
broadcast_fn: BroadcastFn,
) -> int:
ack_code = _extract_expected_ack_code(result)
if not ack_code:
return 0
timeout_ms = _get_ack_tracking_timeout_ms(result)
matched_immediately = track_pending_ack_fn(ack_code, message_id, timeout_ms) is True
logger.debug("Tracking ACK %s for message %d", ack_code, message_id)
if matched_immediately:
dm_ack_tracker.clear_pending_acks_for_message(message_id)
return await increment_ack_and_broadcast(
message_id=message_id,
broadcast_fn=broadcast_fn,
)
return 0
async def _is_message_acked(*, message_id: int, message_repository) -> bool:
acked_count, _paths = await message_repository.get_ack_and_paths(message_id)
return acked_count > 0
async def _retry_direct_message_until_acked(
*,
contact,
text: str,
message_id: int,
sender_timestamp: int,
radio_manager,
track_pending_ack_fn: TrackAckFn,
broadcast_fn: BroadcastFn,
wait_timeout_ms: int,
sleep_fn,
message_repository,
) -> None:
next_wait_timeout_ms = wait_timeout_ms
for attempt in range(1, DM_SEND_MAX_ATTEMPTS):
await sleep_fn((next_wait_timeout_ms / 1000) * DM_RETRY_WAIT_MARGIN)
if await _is_message_acked(message_id=message_id, message_repository=message_repository):
return
try:
async with radio_manager.radio_operation("retry_direct_message") as mc:
contact_data = contact.to_radio_dict()
add_result = await mc.commands.add_contact(contact_data)
if add_result.type == EventType.ERROR:
logger.warning(
"Failed to reload contact %s on radio before DM retry: %s",
contact.public_key[:12],
add_result.payload,
)
cached_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
if not cached_contact:
cached_contact = contact_data
if attempt == DM_SEND_MAX_ATTEMPTS - 1:
reset_result = await mc.commands.reset_path(contact.public_key)
if reset_result is None:
logger.warning(
"No response from radio for reset_path to %s before final DM retry",
contact.public_key[:12],
)
elif reset_result.type == EventType.ERROR:
logger.warning(
"Failed to reset path before final DM retry to %s: %s",
contact.public_key[:12],
reset_result.payload,
)
refreshed_contact = mc.get_contact_by_key_prefix(contact.public_key[:12])
if refreshed_contact:
cached_contact = refreshed_contact
result = await mc.commands.send_msg(
dst=cached_contact,
msg=text,
timestamp=sender_timestamp,
attempt=attempt,
)
except Exception:
logger.exception(
"Background DM retry attempt %d/%d failed for %s",
attempt + 1,
DM_SEND_MAX_ATTEMPTS,
contact.public_key[:12],
)
continue
if result is None:
logger.warning(
"No response from radio after background DM retry attempt %d/%d to %s",
attempt + 1,
DM_SEND_MAX_ATTEMPTS,
contact.public_key[:12],
)
continue
if result.type == EventType.ERROR:
logger.warning(
"Background DM retry attempt %d/%d failed for %s: %s",
attempt + 1,
DM_SEND_MAX_ATTEMPTS,
contact.public_key[:12],
result.payload,
)
continue
if await _is_message_acked(message_id=message_id, message_repository=message_repository):
return
ack_code = _extract_expected_ack_code(result)
if not ack_code:
logger.warning(
"Background DM retry attempt %d/%d for %s returned no expected_ack; "
"stopping retries to avoid duplicate sends",
attempt + 1,
DM_SEND_MAX_ATTEMPTS,
contact.public_key[:12],
)
return
next_wait_timeout_ms = _get_direct_message_retry_timeout_ms(result)
ack_count = await _apply_direct_message_ack_tracking(
result=result,
message_id=message_id,
track_pending_ack_fn=track_pending_ack_fn,
broadcast_fn=broadcast_fn,
)
if ack_count > 0:
return
async def send_direct_message_to_contact(
*,
contact,
@@ -256,10 +439,17 @@ async def send_direct_message_to_contact(
broadcast_fn: BroadcastFn,
track_pending_ack_fn: TrackAckFn,
now_fn: NowFn,
retry_task_scheduler: RetryTaskScheduler | None = None,
retry_sleep_fn=None,
message_repository=MessageRepository,
contact_repository=ContactRepository,
) -> Any:
"""Send a direct message and persist/broadcast the outgoing row."""
if retry_task_scheduler is None:
retry_task_scheduler = asyncio.create_task
if retry_sleep_fn is None:
retry_sleep_fn = asyncio.sleep
contact_data = contact.to_radio_dict()
sent_at: int | None = None
sender_timestamp: int | None = None
@@ -328,18 +518,33 @@ async def send_direct_message_to_contact(
await contact_repository.update_last_contacted(contact.public_key.lower(), sent_at)
expected_ack = result.payload.get("expected_ack")
suggested_timeout: int = result.payload.get("suggested_timeout", 10000)
if expected_ack:
ack_code = expected_ack.hex() if isinstance(expected_ack, bytes) else expected_ack
matched_immediately = track_pending_ack_fn(ack_code, message.id, suggested_timeout) is True
logger.debug("Tracking ACK %s for message %d", ack_code, message.id)
if matched_immediately:
ack_count = await increment_ack_and_broadcast(
ack_code = _extract_expected_ack_code(result)
retry_timeout_ms = _get_direct_message_retry_timeout_ms(result)
ack_count = await _apply_direct_message_ack_tracking(
result=result,
message_id=message.id,
track_pending_ack_fn=track_pending_ack_fn,
broadcast_fn=broadcast_fn,
)
if ack_count > 0:
message.acked = ack_count
return message
if DM_SEND_MAX_ATTEMPTS > 1 and ack_code:
retry_task_scheduler(
_retry_direct_message_until_acked(
contact=contact,
text=text,
message_id=message.id,
sender_timestamp=sender_timestamp,
radio_manager=radio_manager,
track_pending_ack_fn=track_pending_ack_fn,
broadcast_fn=broadcast_fn,
wait_timeout_ms=retry_timeout_ms,
sleep_fn=retry_sleep_fn,
message_repository=message_repository,
)
message.acked = ack_count
)
return message

View File

@@ -118,6 +118,10 @@ export function ContactRoutingOverrideModal({
/>
<div className="space-y-1 text-xs text-muted-foreground">
<p>Use comma-separated 1, 2, or 3 byte hop IDs for an explicit path.</p>
<p>
Note: direct messages that do not see an ACK retry up to 3 times. The final retry is
sent as flood, even when forced routing is configured.
</p>
</div>
</div>

View File

@@ -10,6 +10,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from meshcore import EventType
import app.services.message_send as message_send_service
from app.models import SendDirectMessageRequest
from app.radio import radio_manager
from app.repository import ContactRepository
@@ -26,6 +27,12 @@ def _reset_radio_state():
radio_manager._operation_lock = prev_lock
@pytest.fixture(autouse=True)
def _disable_background_dm_retries(monkeypatch):
monkeypatch.setattr(message_send_service, "DM_SEND_MAX_ATTEMPTS", 1)
yield
def _make_mc(name="TestNode"):
mc = MagicMock()
mc.self_info = {"name": name}

View File

@@ -12,6 +12,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from fastapi import HTTPException
import app.services.message_send as message_send_service
from app.radio import radio_manager
from app.repository import (
ChannelRepository,
@@ -39,6 +40,12 @@ def _reset_radio_state():
radio_manager._channel_key_by_slot = prev_key_by_slot
@pytest.fixture(autouse=True)
def _disable_background_dm_retries(monkeypatch):
monkeypatch.setattr(message_send_service, "DM_SEND_MAX_ATTEMPTS", 1)
yield
def _patch_require_connected(mc=None, *, detail="Radio not connected"):
if mc is None:
return patch(

View File

@@ -201,6 +201,42 @@ class TestAckEventHandler:
assert "expected" in _pending_acks
assert "different" in _buffered_acks
@pytest.mark.asyncio
async def test_first_dm_ack_clears_sibling_retry_codes(self, test_db):
"""A DM should stop at ack_count=1 even if retry ACK codes arrive later."""
from app.event_handlers import on_ack
msg_id = await MessageRepository.create(
msg_type="PRIV",
text="Hello",
received_at=1700000000,
conversation_key="aa" * 32,
sender_timestamp=1700000000,
outgoing=True,
)
track_pending_ack("ack1", message_id=msg_id, timeout_ms=10000)
track_pending_ack("ack2", message_id=msg_id, timeout_ms=10000)
with patch("app.event_handlers.broadcast_event") as mock_broadcast:
class FirstAckEvent:
payload = {"code": "ack1"}
class SecondAckEvent:
payload = {"code": "ack2"}
await on_ack(FirstAckEvent())
await on_ack(SecondAckEvent())
ack_count, _ = await MessageRepository.get_ack_and_paths(msg_id)
assert ack_count == 1
assert "ack2" not in _pending_acks
assert "ack2" in _buffered_acks
mock_broadcast.assert_called_once_with(
"message_acked", {"message_id": msg_id, "ack_count": 1}
)
@pytest.mark.asyncio
async def test_ack_empty_code_ignored(self, test_db):
"""ACK with empty code is ignored."""

View File

@@ -8,6 +8,7 @@ import pytest
from fastapi import HTTPException
from meshcore import EventType
import app.services.message_send as message_send_service
from app.models import (
SendChannelMessageRequest,
SendDirectMessageRequest,
@@ -69,6 +70,7 @@ def _make_mc(name="TestNode"):
mc.commands.send_msg = AsyncMock(return_value=_make_radio_result())
mc.commands.send_chan_msg = AsyncMock(return_value=_make_radio_result())
mc.commands.add_contact = AsyncMock(return_value=_make_radio_result())
mc.commands.reset_path = AsyncMock(return_value=MagicMock(type=EventType.OK, payload={}))
mc.commands.set_channel = AsyncMock(return_value=_make_radio_result())
mc.get_contact_by_key_prefix = MagicMock(return_value=None)
return mc
@@ -94,6 +96,12 @@ async def _insert_contact(public_key, name="Alice", **overrides):
await ContactRepository.upsert(data)
@pytest.fixture(autouse=True)
def _disable_background_dm_retries(monkeypatch):
monkeypatch.setattr(message_send_service, "DM_SEND_MAX_ATTEMPTS", 1)
yield
class TestOutgoingDMBroadcast:
"""Test that outgoing DMs are broadcast via broadcast_event for fanout dispatch."""
@@ -272,6 +280,185 @@ class TestOutgoingDMBroadcast:
assert message.acked == 1
assert any(event_type == "message_acked" for event_type, _data in broadcasts)
@pytest.mark.asyncio
async def test_send_dm_without_expected_ack_does_not_schedule_retries(self, test_db):
mc = _make_mc()
pub_key = "fb" * 32
await _insert_contact(pub_key, "Alice")
mc.commands.send_msg = AsyncMock(return_value=_make_radio_result({}))
with (
patch("app.routers.messages.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch("app.routers.messages.broadcast_event"),
patch("app.services.message_send.asyncio.create_task") as mock_create_task,
):
message = await send_direct_message(
SendDirectMessageRequest(destination=pub_key, text="Hello")
)
assert message.acked == 0
mock_create_task.assert_not_called()
@pytest.mark.asyncio
async def test_send_dm_background_retries_reset_path_before_final_attempt(self, test_db):
mc = _make_mc()
pub_key = "fc" * 32
await _insert_contact(pub_key, "Alice")
mc.commands.send_msg = AsyncMock(
side_effect=[
_make_radio_result(
{"expected_ack": b"\x00\x00\x00\x01", "suggested_timeout": 8000}
),
_make_radio_result(
{"expected_ack": b"\x00\x00\x00\x02", "suggested_timeout": 7000}
),
_make_radio_result(
{"expected_ack": b"\x00\x00\x00\x03", "suggested_timeout": 6000}
),
]
)
retry_tasks = []
loop = asyncio.get_running_loop()
slept_for = []
def schedule_retry(coro):
task = loop.create_task(coro)
retry_tasks.append(task)
return task
async def no_wait(seconds):
slept_for.append(seconds)
return None
with (
patch.object(message_send_service, "DM_SEND_MAX_ATTEMPTS", 3),
patch("app.routers.messages.track_pending_ack", return_value=False),
patch("app.routers.messages.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch("app.routers.messages.broadcast_event"),
patch("app.services.message_send.asyncio.create_task", side_effect=schedule_retry),
patch("app.services.message_send.asyncio.sleep", side_effect=no_wait),
):
await send_direct_message(SendDirectMessageRequest(destination=pub_key, text="Hello"))
await asyncio.gather(*retry_tasks)
assert mc.commands.send_msg.await_count == 3
assert mc.commands.add_contact.await_count == 3
assert mc.commands.send_msg.await_args_list[1].kwargs["attempt"] == 1
assert mc.commands.send_msg.await_args_list[2].kwargs["attempt"] == 2
mc.commands.reset_path.assert_awaited_once_with(pub_key)
assert slept_for == pytest.approx([9.6, 8.4])
@pytest.mark.asyncio
async def test_send_dm_background_retry_stops_after_late_ack(self, test_db):
from app.event_handlers import on_ack
mc = _make_mc()
pub_key = "fd" * 32
await _insert_contact(pub_key, "Alice")
mc.commands.send_msg = AsyncMock(
return_value=_make_radio_result(
{"expected_ack": b"\xde\xad\xbe\xef", "suggested_timeout": 8000}
)
)
retry_tasks = []
sleep_gate = asyncio.Event()
loop = asyncio.get_running_loop()
def schedule_retry(coro):
task = loop.create_task(coro)
retry_tasks.append(task)
return task
async def gated_sleep(_seconds):
await sleep_gate.wait()
class MockAckEvent:
payload = {"code": "deadbeef"}
with (
patch.object(message_send_service, "DM_SEND_MAX_ATTEMPTS", 3),
patch("app.event_handlers.broadcast_event"),
patch("app.routers.messages.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch("app.routers.messages.broadcast_event"),
patch("app.services.message_send.asyncio.create_task", side_effect=schedule_retry),
patch("app.services.message_send.asyncio.sleep", side_effect=gated_sleep),
):
message = await send_direct_message(
SendDirectMessageRequest(destination=pub_key, text="Hello")
)
await on_ack(MockAckEvent())
sleep_gate.set()
await asyncio.gather(*retry_tasks)
ack_count, _ = await MessageRepository.get_ack_and_paths(message.id)
assert ack_count == 1
assert mc.commands.send_msg.await_count == 1
@pytest.mark.asyncio
async def test_buffered_retry_ack_clears_older_dm_ack_codes(self, test_db):
from app.event_handlers import on_ack
mc = _make_mc()
pub_key = "fe" * 32
await _insert_contact(pub_key, "Alice")
mc.commands.send_msg = AsyncMock(
side_effect=[
_make_radio_result(
{"expected_ack": b"\xaa\xaa\xaa\x01", "suggested_timeout": 8000}
),
_make_radio_result(
{"expected_ack": b"\xbb\xbb\xbb\x02", "suggested_timeout": 8000}
),
]
)
retry_tasks = []
sleep_gate = asyncio.Event()
loop = asyncio.get_running_loop()
def schedule_retry(coro):
task = loop.create_task(coro)
retry_tasks.append(task)
return task
async def gated_sleep(_seconds):
await sleep_gate.wait()
class RetryAckEvent:
payload = {"code": "bbbbbb02"}
class FirstAckEvent:
payload = {"code": "aaaaaa01"}
with (
patch.object(message_send_service, "DM_SEND_MAX_ATTEMPTS", 3),
patch("app.event_handlers.broadcast_event"),
patch("app.routers.messages.require_connected", return_value=mc),
patch.object(radio_manager, "_meshcore", mc),
patch("app.routers.messages.broadcast_event"),
patch("app.services.message_send.asyncio.create_task", side_effect=schedule_retry),
patch("app.services.message_send.asyncio.sleep", side_effect=gated_sleep),
):
message = await send_direct_message(
SendDirectMessageRequest(destination=pub_key, text="Hello")
)
await on_ack(RetryAckEvent())
sleep_gate.set()
await asyncio.gather(*retry_tasks)
await on_ack(FirstAckEvent())
ack_count, _ = await MessageRepository.get_ack_and_paths(message.id)
assert ack_count == 1
class TestOutgoingChannelBroadcast:
"""Test that outgoing channel messages are broadcast via broadcast_event for fanout dispatch."""