diff --git a/app/device_manager.py b/app/device_manager.py index e3065d5..df6afef 100644 --- a/app/device_manager.py +++ b/app/device_manager.py @@ -99,6 +99,7 @@ class DeviceManager: self._echo_lock = threading.Lock() self._pending_acks = {} # {ack_code_hex: dm_id} — maps retry acks to DM self._retry_tasks = {} # {dm_id: asyncio.Task} — active retry coroutines + self._retry_context = {} # {dm_id: {attempt, max_attempts, path}} — for _on_ack @property def is_connected(self) -> bool: @@ -610,8 +611,21 @@ class DeviceManager: 'route_type': data.get('route_type', ''), }, namespace='/chat') - # Cancel retry task if ACK confirms delivery for a pending DM + # Store delivery info and cancel retry task if dm_id: + # Store delivery info from retry context (before cancel races) + ctx = self._retry_context.pop(dm_id, None) + if ctx: + self.db.update_dm_delivery_info( + dm_id, ctx['attempt'], ctx['max_attempts'], ctx['path']) + if self.socketio: + self.socketio.emit('dm_delivered_info', { + 'dm_id': dm_id, + 'attempt': ctx['attempt'], + 'max_attempts': ctx['max_attempts'], + 'path': ctx['path'], + }, namespace='/chat') + task = self._retry_tasks.get(dm_id) if task and not task.done(): task.cancel() @@ -771,6 +785,18 @@ class DeviceManager: 'dm_id': dm_id, 'route_type': 'PATH_FLOOD', }, namespace='/chat') + # Store delivery info from retry context + ctx = self._retry_context.pop(dm_id, None) + if ctx: + self.db.update_dm_delivery_info( + dm_id, ctx['attempt'], ctx['max_attempts'], ctx['path']) + if self.socketio: + self.socketio.emit('dm_delivered_info', { + 'dm_id': dm_id, + 'attempt': ctx['attempt'], + 'max_attempts': ctx['max_attempts'], + 'path': ctx['path'], + }, namespace='/chat') # Cancel retry task — delivery already confirmed task = self._retry_tasks.get(dm_id) if task and not task.done(): @@ -1371,27 +1397,24 @@ class DeviceManager: f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, " f"max_attempts={max_attempts}, wait={wait_s:.0f}s") - # ── Local helper: emit status, send, store delivery info on success ── + # ── Local helper: update context, emit status, send ── + # Delivery info is stored by _on_ack() using _retry_context (avoids cancel race) async def _retry(attempt_num, min_wait_s): display = attempt_num + 1 # attempt 0 = initial send = display 1 + self._retry_context[dm_id] = { + 'attempt': display, 'max_attempts': max_attempts, 'path': path_desc, + } self._emit_retry_status(dm_id, initial_ack, display, max_attempts) - delivered = await self._dm_retry_send_and_wait( + return await self._dm_retry_send_and_wait( contact, text, timestamp, attempt_num, dm_id, suggested_timeout, min_wait_s ) - if delivered: - self.db.update_dm_delivery_info( - dm_id, display, max_attempts, path_desc) - if self.socketio: - self.socketio.emit('dm_delivered_info', { - 'dm_id': dm_id, - 'attempt': display, - 'max_attempts': max_attempts, - 'path': path_desc, - }, namespace='/chat') - return delivered - # ── Emit status for initial send (attempt 1) and wait for ACK ── + # ── Wait for initial ACK (attempt 1) ── + # Delivery info stored by _on_ack() via _retry_context (avoids cancel race) + self._retry_context[dm_id] = { + 'attempt': 1, 'max_attempts': max_attempts, 'path': path_desc, + } self._emit_retry_status(dm_id, initial_ack, 1, max_attempts) if initial_ack: logger.debug(f"DM retry: waiting {wait_s:.0f}s for initial ACK {initial_ack[:8]}...") @@ -1402,12 +1425,6 @@ class DeviceManager: ) if ack_event: self._confirm_delivery(dm_id, initial_ack, ack_event) - self.db.update_dm_delivery_info(dm_id, 1, max_attempts, path_desc) - if self.socketio: - self.socketio.emit('dm_delivered_info', { - 'dm_id': dm_id, 'attempt': 1, - 'max_attempts': max_attempts, 'path': path_desc, - }, namespace='/chat') return logger.debug(f"DM retry: initial ACK not received (timeout)") @@ -1539,6 +1556,7 @@ class DeviceManager: logger.warning(f"DM retry exhausted ({attempt + 1} total attempts, scenario={scenario}) " f"for dm_id={dm_id}") self._retry_tasks.pop(dm_id, None) + self._retry_context.pop(dm_id, None) await asyncio.sleep(cfg['grace_period']) stale = [k for k, v in self._pending_acks.items() if v == dm_id] if stale: