From 7dbbba57b921cc6880c848e0f28232b2de4fbd57 Mon Sep 17 00:00:00 2001 From: MarekWo Date: Sat, 28 Mar 2026 12:25:35 +0100 Subject: [PATCH] feat(dm): add real-time retry status and persistent delivery info MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Show retry progress in DM message bubble via WebSocket: - "attempt X/Y" counter updates in real-time during retries - Failed icon (✗) when all retries exhausted - Delivery info persisted in DB (attempt number, path used) Backend: emit dm_retry_status/dm_retry_failed socket events, store delivery_attempt/delivery_path in direct_messages table. Frontend: socket listeners update status icon and counter, delivered tooltip shows attempt info and path. Co-Authored-By: Claude Opus 4.6 --- app/database.py | 28 +++++++ app/device_manager.py | 167 ++++++++++++++++++++++++--------------- app/routes/api.py | 11 +++ app/static/css/style.css | 6 ++ app/static/js/dm.js | 45 ++++++++++- 5 files changed, 190 insertions(+), 67 deletions(-) diff --git a/app/database.py b/app/database.py index fe535ef..33babd9 100644 --- a/app/database.py +++ b/app/database.py @@ -44,6 +44,18 @@ class Database: conn.execute("ALTER TABLE contacts ADD COLUMN no_auto_flood INTEGER DEFAULT 0") logger.info("Migration: added contacts.no_auto_flood column") + # Add delivery tracking columns to direct_messages + dm_columns = {r[1] for r in conn.execute("PRAGMA table_info(direct_messages)").fetchall()} + for col, typedef in [ + ('delivery_status', 'TEXT'), + ('delivery_attempt', 'INTEGER'), + ('delivery_max_attempts', 'INTEGER'), + ('delivery_path', 'TEXT'), + ]: + if col not in dm_columns: + conn.execute(f"ALTER TABLE direct_messages ADD COLUMN {col} {typedef}") + logger.info(f"Migration: added direct_messages.{col} column") + @contextmanager def _connect(self): """Yield a connection with auto-commit/rollback.""" @@ -660,6 +672,22 @@ class Database: ).fetchone() return dict(row) if row else None + def update_dm_delivery_info(self, dm_id: int, attempt: int, + max_attempts: int, path: str): + """Store successful delivery details (attempt number, path used).""" + with self._connect() as conn: + conn.execute( + "UPDATE direct_messages SET delivery_attempt=?, " + "delivery_max_attempts=?, delivery_path=? WHERE id=?", + (attempt, max_attempts, path, dm_id)) + + def update_dm_delivery_status(self, dm_id: int, status: str): + """Mark message delivery as failed.""" + with self._connect() as conn: + conn.execute( + "UPDATE direct_messages SET delivery_status=? WHERE id=?", + (status, dm_id)) + def relink_orphaned_dms(self, public_key: str, name: str = '') -> int: """Re-link DMs with NULL contact_pubkey back to this contact. diff --git a/app/device_manager.py b/app/device_manager.py index 8a2aa19..b974a25 100644 --- a/app/device_manager.py +++ b/app/device_manager.py @@ -1233,6 +1233,25 @@ class DeviceManager: return False + def _emit_retry_status(self, dm_id: int, expected_ack: str, + attempt: int, max_attempts: int): + """Notify frontend about retry progress.""" + if self.socketio: + self.socketio.emit('dm_retry_status', { + 'dm_id': dm_id, + 'expected_ack': expected_ack, + 'attempt': attempt, + 'max_attempts': max_attempts, + }, namespace='/chat') + + def _emit_retry_failed(self, dm_id: int, expected_ack: str): + """Notify frontend that all retry attempts were exhausted.""" + if self.socketio: + self.socketio.emit('dm_retry_failed', { + 'dm_id': dm_id, + 'expected_ack': expected_ack, + }, namespace='/chat') + @staticmethod def _paths_match(contact_out_path: str, contact_out_path_len: int, configured_path: dict) -> bool: @@ -1299,27 +1318,8 @@ class DeviceManager: else: scenario = "S1_FLOOD" - logger.info(f"DM retry task started: dm_id={dm_id}, scenario={scenario}, " - f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, " - f"wait={wait_s:.0f}s") - - # ── Wait for ACK on initial send ── - if initial_ack: - logger.debug(f"DM retry: waiting {wait_s:.0f}s for initial ACK {initial_ack[:8]}...") - ack_event = await self.mc.dispatcher.wait_for_event( - EventType.ACK, - attribute_filters={"code": initial_ack}, - timeout=wait_s - ) - if ack_event: - self._confirm_delivery(dm_id, initial_ack, ack_event) - return - logger.debug(f"DM retry: initial ACK not received (timeout)") - - attempt = 0 # Global attempt counter (0 = initial send already done) - + # ── Pre-compute path split and max_attempts ── def _split_primary_and_others(paths): - """Separate primary (starred) path from the rest.""" primary = None others = [] for p in paths: @@ -1329,16 +1329,76 @@ class DeviceManager: others.append(p) return primary, others + primary_path = None + other_paths = [] + rotation_order = [] + if has_configured_paths: + primary_path, other_paths = _split_primary_and_others(configured_paths) + rotation_order = ([primary_path] if primary_path else []) + other_paths + + retries_per_path = max(1, cfg['direct_max_retries']) + + if scenario == "S1_FLOOD": + max_attempts = 1 + cfg['flood_max_retries'] + elif scenario == "S2_DIRECT_FLOOD": + max_attempts = 1 + cfg['direct_max_retries'] + if not no_auto_flood: + max_attempts += cfg['direct_flood_retries'] + elif scenario == "S3_FLOOD_SD": + max_attempts = (1 + cfg['flood_max_retries'] + + len(rotation_order) * retries_per_path) + else: # S4 + deduped = sum(1 for p in rotation_order + if self._paths_match(original_out_path, original_out_path_len, p)) + effective_sd = len(rotation_order) - deduped + max_attempts = 1 + cfg['direct_max_retries'] + effective_sd * retries_per_path + if not no_auto_flood: + max_attempts += cfg['flood_max_retries'] + + # Track current path description for delivery info + path_desc = "FLOOD" if not has_path else "DIRECT" + + logger.info(f"DM retry task started: dm_id={dm_id}, scenario={scenario}, " + f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, " + f"max_attempts={max_attempts}, wait={wait_s:.0f}s") + + # ── Local helper: emit status, send, store delivery info on success ── + async def _retry(attempt_num, min_wait_s): + display = attempt_num + 1 # attempt 0 = initial send = display 1 + self._emit_retry_status(dm_id, initial_ack, display, max_attempts) + delivered = await self._dm_retry_send_and_wait( + contact, text, timestamp, attempt_num, dm_id, + suggested_timeout, min_wait_s + ) + if delivered: + self.db.update_dm_delivery_info( + dm_id, display, max_attempts, path_desc) + return delivered + + # ── Emit status for initial send (attempt 1) and wait for ACK ── + self._emit_retry_status(dm_id, initial_ack, 1, max_attempts) + if initial_ack: + logger.debug(f"DM retry: waiting {wait_s:.0f}s for initial ACK {initial_ack[:8]}...") + ack_event = await self.mc.dispatcher.wait_for_event( + EventType.ACK, + attribute_filters={"code": initial_ack}, + timeout=wait_s + ) + if ack_event: + self._confirm_delivery(dm_id, initial_ack, ack_event) + self.db.update_dm_delivery_info(dm_id, 1, max_attempts, path_desc) + return + logger.debug(f"DM retry: initial ACK not received (timeout)") + + attempt = 0 # Global attempt counter (0 = initial send already done) + # ════════════════════════════════════════════════════════════ # Scenario 1: No path, no configured paths → FLOOD only # ════════════════════════════════════════════════════════════ if not has_path and not has_configured_paths: for _ in range(cfg['flood_max_retries']): attempt += 1 - if await self._dm_retry_send_and_wait( - contact, text, timestamp, attempt, dm_id, - suggested_timeout, float(cfg['flood_interval']) - ): + if await _retry(attempt, float(cfg['flood_interval'])): return # ════════════════════════════════════════════════════════════ @@ -1348,10 +1408,7 @@ class DeviceManager: # Phase 1: Direct retries on current ŚK for _ in range(cfg['direct_max_retries']): attempt += 1 - if await self._dm_retry_send_and_wait( - contact, text, timestamp, attempt, dm_id, - suggested_timeout, float(cfg['direct_interval']) - ): + if await _retry(attempt, float(cfg['direct_interval'])): return # Phase 2: Optional FLOOD fallback (controlled by no_auto_flood) @@ -1361,51 +1418,40 @@ class DeviceManager: logger.info("DM retry: direct exhausted, resetting to FLOOD") except Exception: pass + path_desc = "FLOOD" for _ in range(cfg['direct_flood_retries']): attempt += 1 - if await self._dm_retry_send_and_wait( - contact, text, timestamp, attempt, dm_id, - suggested_timeout, float(cfg['flood_interval']) - ): + if await _retry(attempt, float(cfg['flood_interval'])): return # ════════════════════════════════════════════════════════════ # Scenario 3: No path, has configured paths → FLOOD first, then ŚD rotation # ════════════════════════════════════════════════════════════ elif not has_path and has_configured_paths: - primary_path, other_paths = _split_primary_and_others(configured_paths) - # Phase 1: FLOOD retries per NoPath settings (discover new path) logger.info("DM retry: FLOOD first to discover new path") for _ in range(cfg['flood_max_retries']): attempt += 1 - if await self._dm_retry_send_and_wait( - contact, text, timestamp, attempt, dm_id, - suggested_timeout, float(cfg['flood_interval']) - ): + if await _retry(attempt, float(cfg['flood_interval'])): return # Firmware sets discovered path as ŚK # Phase 2: ŚD rotation (primary first, then others by sort_order) logger.info("DM retry: FLOOD exhausted, rotating through configured paths") - rotation_order = ([primary_path] if primary_path else []) + other_paths - retries_per_path = max(1, cfg['direct_max_retries']) direct_interval = float(cfg['direct_interval']) for path_info in rotation_order: try: await self._change_path_async(contact, path_info['path_hex'], path_info['hash_size']) - logger.info(f"DM retry: switched to path '{path_info.get('label', '')}' " - f"({path_info['path_hex']})") + label = path_info.get('label', '') + path_desc = f"{label} ({path_info['path_hex']})" if label else path_info['path_hex'] + logger.info(f"DM retry: switched to path '{label}' ({path_info['path_hex']})") except Exception as e: logger.warning(f"DM retry: failed to switch path: {e}") continue for _ in range(retries_per_path): attempt += 1 - if await self._dm_retry_send_and_wait( - contact, text, timestamp, attempt, dm_id, - suggested_timeout, direct_interval - ): + if await _retry(attempt, direct_interval): await self._restore_primary_path(contact, contact_pubkey) return @@ -1416,21 +1462,14 @@ class DeviceManager: # Scenario 4: Has path + has configured paths → DIRECT on ŚK, ŚD rotation, optional FLOOD # ════════════════════════════════════════════════════════════ else: # has_path and has_configured_paths - primary_path, other_paths = _split_primary_and_others(configured_paths) - # Phase 1: Direct retries on current ŚK for _ in range(cfg['direct_max_retries']): attempt += 1 - if await self._dm_retry_send_and_wait( - contact, text, timestamp, attempt, dm_id, - suggested_timeout, float(cfg['direct_interval']) - ): + if await _retry(attempt, float(cfg['direct_interval'])): return # Delivered on ŚK, no path change needed # Phase 2: ŚD rotation with dedup logger.info("DM retry: direct on ŚK exhausted, rotating through configured paths") - rotation_order = ([primary_path] if primary_path else []) + other_paths - retries_per_path = max(1, cfg['direct_max_retries']) direct_interval = float(cfg['direct_interval']) for path_info in rotation_order: @@ -1442,18 +1481,16 @@ class DeviceManager: try: await self._change_path_async(contact, path_info['path_hex'], path_info['hash_size']) - logger.info(f"DM retry: switched to path '{path_info.get('label', '')}' " - f"({path_info['path_hex']})") + label = path_info.get('label', '') + path_desc = f"{label} ({path_info['path_hex']})" if label else path_info['path_hex'] + logger.info(f"DM retry: switched to path '{label}' ({path_info['path_hex']})") except Exception as e: logger.warning(f"DM retry: failed to switch path: {e}") continue for _ in range(retries_per_path): attempt += 1 - if await self._dm_retry_send_and_wait( - contact, text, timestamp, attempt, dm_id, - suggested_timeout, direct_interval - ): + if await _retry(attempt, direct_interval): await self._restore_primary_path(contact, contact_pubkey) return @@ -1464,19 +1501,19 @@ class DeviceManager: logger.info("DM retry: all paths exhausted, falling back to FLOOD") except Exception: pass + path_desc = "FLOOD" for _ in range(cfg['flood_max_retries']): attempt += 1 - if await self._dm_retry_send_and_wait( - contact, text, timestamp, attempt, dm_id, - suggested_timeout, float(cfg['flood_interval']) - ): + if await _retry(attempt, float(cfg['flood_interval'])): await self._restore_primary_path(contact, contact_pubkey) return # Restore ŚG regardless of outcome await self._restore_primary_path(contact, contact_pubkey) - # ── Common epilogue: grace period for late ACKs ── + # ── Common epilogue: mark failed, grace period for late ACKs ── + self.db.update_dm_delivery_status(dm_id, 'failed') + self._emit_retry_failed(dm_id, initial_ack) logger.warning(f"DM retry exhausted ({attempt + 1} total attempts, scenario={scenario}) " f"for dm_id={dm_id}") self._retry_tasks.pop(dm_id, None) diff --git a/app/routes/api.py b/app/routes/api.py index 6472899..3692e13 100644 --- a/app/routes/api.py +++ b/app/routes/api.py @@ -1980,6 +1980,7 @@ def get_dm_messages(): for row in db_msgs: messages.append({ 'type': 'dm', + 'id': row['id'], 'direction': 'incoming' if row['direction'] == 'in' else 'outgoing', 'sender': row.get('contact_pubkey', ''), 'content': row.get('content', ''), @@ -1989,6 +1990,10 @@ def get_dm_messages(): 'snr': row.get('snr'), 'path_len': row.get('path_len'), 'expected_ack': row.get('expected_ack'), + 'delivery_status': row.get('delivery_status'), + 'delivery_attempt': row.get('delivery_attempt'), + 'delivery_max_attempts': row.get('delivery_max_attempts'), + 'delivery_path': row.get('delivery_path'), 'conversation_id': conversation_id, }) else: @@ -2040,6 +2045,12 @@ def get_dm_messages(): except Exception as e: logger.debug(f"ACK status fetch failed (non-critical): {e}") + # Set failed status for messages without ACK but marked failed in DB + for msg in messages: + if msg.get('direction') == 'outgoing' and msg.get('status') != 'delivered': + if msg.get('delivery_status') == 'failed': + msg['status'] = 'failed' + return jsonify({ 'success': True, 'conversation_id': conversation_id, diff --git a/app/static/css/style.css b/app/static/css/style.css index f6f9d36..0d9e8a5 100644 --- a/app/static/css/style.css +++ b/app/static/css/style.css @@ -738,6 +738,12 @@ main { position: relative; } +.dm-retry-info { + font-size: 0.6rem; + color: var(--text-meta); + margin-left: 0.15rem; +} + .dm-delivery-popup { position: absolute; bottom: 100%; diff --git a/app/static/js/dm.js b/app/static/js/dm.js index 4a387a4..4451bc7 100644 --- a/app/static/js/dm.js +++ b/app/static/js/dm.js @@ -112,10 +112,44 @@ function connectChatSocket() { if (data.snr != null) tooltip.push(`SNR: ${data.snr}`); if (data.route_type) tooltip.push(`Route: ${data.route_type}`); statusEl.title = tooltip.length > 0 ? tooltip.join(', ') : 'Delivered'; + // Remove retry counter if present + const wrapper = statusEl.closest('[data-dm-id]'); + if (wrapper) { + const info = wrapper.querySelector('.dm-retry-info'); + if (info) info.remove(); + // Unwrap: replace wrapper span with just the icon + wrapper.replaceWith(statusEl); + } } }); }); + // Real-time DM retry progress + chatSocket.on('dm_retry_status', (data) => { + if (!data.dm_id) return; + const wrapper = document.querySelector(`[data-dm-id="${data.dm_id}"]`); + if (!wrapper) return; + const info = wrapper.querySelector('.dm-retry-info'); + if (info) info.textContent = `${data.attempt}/${data.max_attempts}`; + }); + + // DM retry exhausted — mark as failed + chatSocket.on('dm_retry_failed', (data) => { + if (!data.dm_id) return; + const wrapper = document.querySelector(`[data-dm-id="${data.dm_id}"]`); + if (!wrapper) return; + const icon = wrapper.querySelector('.dm-status'); + if (icon) { + icon.className = 'bi bi-x-circle dm-status timeout'; + icon.title = 'Delivery failed — all retries exhausted'; + } + const info = wrapper.querySelector('.dm-retry-info'); + if (info) info.remove(); + // Remove onclick + wrapper.removeAttribute('onclick'); + wrapper.classList.remove('dm-status-unknown'); + }); + // Real-time device status chatSocket.on('device_status', (data) => { updateStatus(data.connected ? 'connected' : 'disconnected'); @@ -1089,18 +1123,25 @@ function displayMessages(messages) { let statusIcon = ''; if (msg.is_own) { const ackAttr = msg.expected_ack ? ` data-ack="${msg.expected_ack}"` : ''; + const dmIdAttr = msg.id ? ` data-dm-id="${msg.id}"` : ''; if (msg.status === 'delivered') { let title = 'Delivered'; + if (msg.delivery_attempt && msg.delivery_max_attempts) { + title += ` (${msg.delivery_attempt}/${msg.delivery_max_attempts})`; + } + if (msg.delivery_path) title += `, Path: ${msg.delivery_path}`; if (msg.delivery_snr !== null && msg.delivery_snr !== undefined) { title += `, SNR: ${msg.delivery_snr.toFixed(1)} dB`; } if (msg.delivery_route) title += ` (${msg.delivery_route})`; statusIcon = ``; + } else if (msg.status === 'failed') { + statusIcon = ``; } else if (msg.status === 'pending') { statusIcon = ``; } else { - // No ACK received — show clickable "?" with explanation - statusIcon = ``; + // No ACK received — show clickable "?" with retry counter + statusIcon = ``; } }