fix(dm): cancel retry on early ACK/PATH, add 60s grace period for late ACKs

- Cancel retry task immediately when _on_ack or PATH handler confirms delivery
- Keep pending_acks for 60s after retry exhaustion so late ACKs are matched
- Prevents orphaned ACKs (no dm_id) when ACK arrives shortly after exhaustion

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-16 11:10:10 +01:00
parent 3a26da18fd
commit e473cbf495

View File

@@ -547,6 +547,18 @@ class DeviceManager:
'route_type': data.get('route_type', ''),
}, namespace='/chat')
# Cancel retry task if ACK confirms delivery for a pending DM
if dm_id:
task = self._retry_tasks.get(dm_id)
if task and not task.done():
task.cancel()
logger.info(f"Cancelled retry task for dm_id={dm_id} (ACK received)")
# Cleanup all pending acks for this DM
stale = [k for k, v in self._pending_acks.items() if v == dm_id]
for k in stale:
self._pending_acks.pop(k, None)
self._retry_tasks.pop(dm_id, None)
except Exception as e:
logger.error(f"Error handling ACK: {e}")
@@ -655,6 +667,15 @@ class DeviceManager:
'dm_id': dm_id,
'route_type': 'PATH_FLOOD',
}, namespace='/chat')
# Cancel retry task — delivery already confirmed
task = self._retry_tasks.get(dm_id)
if task and not task.done():
task.cancel()
logger.info(f"Cancelled retry task for dm_id={dm_id} (PATH confirmed)")
stale_acks = [k for k, v in self._pending_acks.items() if v == dm_id]
for k in stale_acks:
self._pending_acks.pop(k, None)
self._retry_tasks.pop(dm_id, None)
break # Only confirm the most recent pending DM to this contact
except Exception as e:
@@ -1053,11 +1074,14 @@ class DeviceManager:
return
logger.warning(f"DM retry exhausted ({max_attempts} attempts) for dm_id={dm_id}")
# Cleanup stale pending acks for this DM
stale = [k for k, v in self._pending_acks.items() if v == dm_id]
for k in stale:
self._pending_acks.pop(k, None)
# Keep pending acks for grace period so late ACKs can still be matched
self._retry_tasks.pop(dm_id, None)
await asyncio.sleep(60)
stale = [k for k, v in self._pending_acks.items() if v == dm_id]
if stale:
for k in stale:
self._pending_acks.pop(k, None)
logger.debug(f"Grace period expired, cleaned {len(stale)} pending acks for dm_id={dm_id}")
def _confirm_delivery(self, dm_id: int, ack_code: str, ack_event):
"""Store ACK and notify frontend."""