From e473cbf4958950abda5a4366a5b90bb9b293676a Mon Sep 17 00:00:00 2001 From: MarekWo Date: Mon, 16 Mar 2026 11:10:10 +0100 Subject: [PATCH] fix(dm): cancel retry on early ACK/PATH, add 60s grace period for late ACKs - Cancel retry task immediately when _on_ack or PATH handler confirms delivery - Keep pending_acks for 60s after retry exhaustion so late ACKs are matched - Prevents orphaned ACKs (no dm_id) when ACK arrives shortly after exhaustion Co-Authored-By: Claude Opus 4.6 --- app/device_manager.py | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/app/device_manager.py b/app/device_manager.py index dca9c8b..83e3cc5 100644 --- a/app/device_manager.py +++ b/app/device_manager.py @@ -547,6 +547,18 @@ class DeviceManager: 'route_type': data.get('route_type', ''), }, namespace='/chat') + # Cancel retry task if ACK confirms delivery for a pending DM + if dm_id: + task = self._retry_tasks.get(dm_id) + if task and not task.done(): + task.cancel() + logger.info(f"Cancelled retry task for dm_id={dm_id} (ACK received)") + # Cleanup all pending acks for this DM + stale = [k for k, v in self._pending_acks.items() if v == dm_id] + for k in stale: + self._pending_acks.pop(k, None) + self._retry_tasks.pop(dm_id, None) + except Exception as e: logger.error(f"Error handling ACK: {e}") @@ -655,6 +667,15 @@ class DeviceManager: 'dm_id': dm_id, 'route_type': 'PATH_FLOOD', }, namespace='/chat') + # Cancel retry task — delivery already confirmed + task = self._retry_tasks.get(dm_id) + if task and not task.done(): + task.cancel() + logger.info(f"Cancelled retry task for dm_id={dm_id} (PATH confirmed)") + stale_acks = [k for k, v in self._pending_acks.items() if v == dm_id] + for k in stale_acks: + self._pending_acks.pop(k, None) + self._retry_tasks.pop(dm_id, None) break # Only confirm the most recent pending DM to this contact except Exception as e: @@ -1053,11 +1074,14 @@ class DeviceManager: return logger.warning(f"DM retry exhausted ({max_attempts} attempts) for dm_id={dm_id}") - # Cleanup stale pending acks for this DM - stale = [k for k, v in self._pending_acks.items() if v == dm_id] - for k in stale: - self._pending_acks.pop(k, None) + # Keep pending acks for grace period so late ACKs can still be matched self._retry_tasks.pop(dm_id, None) + await asyncio.sleep(60) + stale = [k for k, v in self._pending_acks.items() if v == dm_id] + if stale: + for k in stale: + self._pending_acks.pop(k, None) + logger.debug(f"Grace period expired, cleaned {len(stale)} pending acks for dm_id={dm_id}") def _confirm_delivery(self, dm_id: int, ack_code: str, ack_event): """Store ACK and notify frontend."""