fix(dm): resolve race condition — delivery info stored before task cancel

The _on_ack handler cancels the retry task before _retry() can store
delivery info (attempt count, path). Fix by maintaining a _retry_context
dict updated before each send. _on_ack reads context and stores delivery
info + emits dm_delivered_info BEFORE cancelling the task. Same fix
applied to PATH_UPDATE backup delivery handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-28 14:11:05 +01:00
parent 885a967348
commit 7a44d3b95d

View File

@@ -99,6 +99,7 @@ class DeviceManager:
self._echo_lock = threading.Lock()
self._pending_acks = {} # {ack_code_hex: dm_id} — maps retry acks to DM
self._retry_tasks = {} # {dm_id: asyncio.Task} — active retry coroutines
self._retry_context = {} # {dm_id: {attempt, max_attempts, path}} — for _on_ack
@property
def is_connected(self) -> bool:
@@ -610,8 +611,21 @@ class DeviceManager:
'route_type': data.get('route_type', ''),
}, namespace='/chat')
# Cancel retry task if ACK confirms delivery for a pending DM
# Store delivery info and cancel retry task
if dm_id:
# Store delivery info from retry context (before cancel races)
ctx = self._retry_context.pop(dm_id, None)
if ctx:
self.db.update_dm_delivery_info(
dm_id, ctx['attempt'], ctx['max_attempts'], ctx['path'])
if self.socketio:
self.socketio.emit('dm_delivered_info', {
'dm_id': dm_id,
'attempt': ctx['attempt'],
'max_attempts': ctx['max_attempts'],
'path': ctx['path'],
}, namespace='/chat')
task = self._retry_tasks.get(dm_id)
if task and not task.done():
task.cancel()
@@ -771,6 +785,18 @@ class DeviceManager:
'dm_id': dm_id,
'route_type': 'PATH_FLOOD',
}, namespace='/chat')
# Store delivery info from retry context
ctx = self._retry_context.pop(dm_id, None)
if ctx:
self.db.update_dm_delivery_info(
dm_id, ctx['attempt'], ctx['max_attempts'], ctx['path'])
if self.socketio:
self.socketio.emit('dm_delivered_info', {
'dm_id': dm_id,
'attempt': ctx['attempt'],
'max_attempts': ctx['max_attempts'],
'path': ctx['path'],
}, namespace='/chat')
# Cancel retry task — delivery already confirmed
task = self._retry_tasks.get(dm_id)
if task and not task.done():
@@ -1371,27 +1397,24 @@ class DeviceManager:
f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, "
f"max_attempts={max_attempts}, wait={wait_s:.0f}s")
# ── Local helper: emit status, send, store delivery info on success ──
# ── Local helper: update context, emit status, send ──
# Delivery info is stored by _on_ack() using _retry_context (avoids cancel race)
async def _retry(attempt_num, min_wait_s):
display = attempt_num + 1 # attempt 0 = initial send = display 1
self._retry_context[dm_id] = {
'attempt': display, 'max_attempts': max_attempts, 'path': path_desc,
}
self._emit_retry_status(dm_id, initial_ack, display, max_attempts)
delivered = await self._dm_retry_send_and_wait(
return await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt_num, dm_id,
suggested_timeout, min_wait_s
)
if delivered:
self.db.update_dm_delivery_info(
dm_id, display, max_attempts, path_desc)
if self.socketio:
self.socketio.emit('dm_delivered_info', {
'dm_id': dm_id,
'attempt': display,
'max_attempts': max_attempts,
'path': path_desc,
}, namespace='/chat')
return delivered
# ── Emit status for initial send (attempt 1) and wait for ACK ──
# ── Wait for initial ACK (attempt 1) ──
# Delivery info stored by _on_ack() via _retry_context (avoids cancel race)
self._retry_context[dm_id] = {
'attempt': 1, 'max_attempts': max_attempts, 'path': path_desc,
}
self._emit_retry_status(dm_id, initial_ack, 1, max_attempts)
if initial_ack:
logger.debug(f"DM retry: waiting {wait_s:.0f}s for initial ACK {initial_ack[:8]}...")
@@ -1402,12 +1425,6 @@ class DeviceManager:
)
if ack_event:
self._confirm_delivery(dm_id, initial_ack, ack_event)
self.db.update_dm_delivery_info(dm_id, 1, max_attempts, path_desc)
if self.socketio:
self.socketio.emit('dm_delivered_info', {
'dm_id': dm_id, 'attempt': 1,
'max_attempts': max_attempts, 'path': path_desc,
}, namespace='/chat')
return
logger.debug(f"DM retry: initial ACK not received (timeout)")
@@ -1539,6 +1556,7 @@ class DeviceManager:
logger.warning(f"DM retry exhausted ({attempt + 1} total attempts, scenario={scenario}) "
f"for dm_id={dm_id}")
self._retry_tasks.pop(dm_id, None)
self._retry_context.pop(dm_id, None)
await asyncio.sleep(cfg['grace_period'])
stale = [k for k, v in self._pending_acks.items() if v == dm_id]
if stale: