diff --git a/app/database.py b/app/database.py
index fe535ef..33babd9 100644
--- a/app/database.py
+++ b/app/database.py
@@ -44,6 +44,18 @@ class Database:
conn.execute("ALTER TABLE contacts ADD COLUMN no_auto_flood INTEGER DEFAULT 0")
logger.info("Migration: added contacts.no_auto_flood column")
+ # Add delivery tracking columns to direct_messages
+ dm_columns = {r[1] for r in conn.execute("PRAGMA table_info(direct_messages)").fetchall()}
+ for col, typedef in [
+ ('delivery_status', 'TEXT'),
+ ('delivery_attempt', 'INTEGER'),
+ ('delivery_max_attempts', 'INTEGER'),
+ ('delivery_path', 'TEXT'),
+ ]:
+ if col not in dm_columns:
+ conn.execute(f"ALTER TABLE direct_messages ADD COLUMN {col} {typedef}")
+ logger.info(f"Migration: added direct_messages.{col} column")
+
@contextmanager
def _connect(self):
"""Yield a connection with auto-commit/rollback."""
@@ -660,6 +672,22 @@ class Database:
).fetchone()
return dict(row) if row else None
+ def update_dm_delivery_info(self, dm_id: int, attempt: int,
+ max_attempts: int, path: str):
+ """Store successful delivery details (attempt number, path used)."""
+ with self._connect() as conn:
+ conn.execute(
+ "UPDATE direct_messages SET delivery_attempt=?, "
+ "delivery_max_attempts=?, delivery_path=? WHERE id=?",
+ (attempt, max_attempts, path, dm_id))
+
+ def update_dm_delivery_status(self, dm_id: int, status: str):
+ """Set the delivery status (e.g. 'failed') on a direct message."""
+ with self._connect() as conn:
+ conn.execute(
+ "UPDATE direct_messages SET delivery_status=? WHERE id=?",
+ (status, dm_id))
+
def relink_orphaned_dms(self, public_key: str, name: str = '') -> int:
"""Re-link DMs with NULL contact_pubkey back to this contact.
diff --git a/app/device_manager.py b/app/device_manager.py
index 8a2aa19..b974a25 100644
--- a/app/device_manager.py
+++ b/app/device_manager.py
@@ -1233,6 +1233,25 @@ class DeviceManager:
return False
+ def _emit_retry_status(self, dm_id: int, expected_ack: str,
+ attempt: int, max_attempts: int):
+ """Notify frontend about retry progress."""
+ if self.socketio:
+ self.socketio.emit('dm_retry_status', {
+ 'dm_id': dm_id,
+ 'expected_ack': expected_ack,
+ 'attempt': attempt,
+ 'max_attempts': max_attempts,
+ }, namespace='/chat')
+
+ def _emit_retry_failed(self, dm_id: int, expected_ack: str):
+ """Notify frontend that all retry attempts were exhausted."""
+ if self.socketio:
+ self.socketio.emit('dm_retry_failed', {
+ 'dm_id': dm_id,
+ 'expected_ack': expected_ack,
+ }, namespace='/chat')
+
@staticmethod
def _paths_match(contact_out_path: str, contact_out_path_len: int,
configured_path: dict) -> bool:
@@ -1299,27 +1318,8 @@ class DeviceManager:
else:
scenario = "S1_FLOOD"
- logger.info(f"DM retry task started: dm_id={dm_id}, scenario={scenario}, "
- f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, "
- f"wait={wait_s:.0f}s")
-
- # ── Wait for ACK on initial send ──
- if initial_ack:
- logger.debug(f"DM retry: waiting {wait_s:.0f}s for initial ACK {initial_ack[:8]}...")
- ack_event = await self.mc.dispatcher.wait_for_event(
- EventType.ACK,
- attribute_filters={"code": initial_ack},
- timeout=wait_s
- )
- if ack_event:
- self._confirm_delivery(dm_id, initial_ack, ack_event)
- return
- logger.debug(f"DM retry: initial ACK not received (timeout)")
-
- attempt = 0 # Global attempt counter (0 = initial send already done)
-
+ # ── Pre-compute path split and max_attempts ──
def _split_primary_and_others(paths):
- """Separate primary (starred) path from the rest."""
primary = None
others = []
for p in paths:
@@ -1329,16 +1329,76 @@ class DeviceManager:
others.append(p)
return primary, others
+ primary_path = None
+ other_paths = []
+ rotation_order = []
+ if has_configured_paths:
+ primary_path, other_paths = _split_primary_and_others(configured_paths)
+ rotation_order = ([primary_path] if primary_path else []) + other_paths
+
+ retries_per_path = max(1, cfg['direct_max_retries'])
+
+ if scenario == "S1_FLOOD":
+ max_attempts = 1 + cfg['flood_max_retries']
+ elif scenario == "S2_DIRECT_FLOOD":
+ max_attempts = 1 + cfg['direct_max_retries']
+ if not no_auto_flood:
+ max_attempts += cfg['direct_flood_retries']
+ elif scenario == "S3_FLOOD_SD":
+ max_attempts = (1 + cfg['flood_max_retries']
+ + len(rotation_order) * retries_per_path)
+ else: # S4
+ deduped = sum(1 for p in rotation_order
+ if self._paths_match(original_out_path, original_out_path_len, p))
+ effective_sd = len(rotation_order) - deduped
+ max_attempts = 1 + cfg['direct_max_retries'] + effective_sd * retries_per_path
+ if not no_auto_flood:
+ max_attempts += cfg['flood_max_retries']
+
+ # Track current path description for delivery info
+ path_desc = "FLOOD" if not has_path else "DIRECT"
+
+ logger.info(f"DM retry task started: dm_id={dm_id}, scenario={scenario}, "
+ f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, "
+ f"max_attempts={max_attempts}, wait={wait_s:.0f}s")
+
+ # ── Local helper: emit status, send, store delivery info on success ──
+ async def _retry(attempt_num, min_wait_s):
+ display = attempt_num + 1 # attempt 0 = initial send = display 1
+ self._emit_retry_status(dm_id, initial_ack, display, max_attempts)
+ delivered = await self._dm_retry_send_and_wait(
+ contact, text, timestamp, attempt_num, dm_id,
+ suggested_timeout, min_wait_s
+ )
+ if delivered:
+ self.db.update_dm_delivery_info(
+ dm_id, display, max_attempts, path_desc)
+ return delivered
+
+ # ── Emit status for initial send (attempt 1) and wait for ACK ──
+ self._emit_retry_status(dm_id, initial_ack, 1, max_attempts)
+ if initial_ack:
+ logger.debug(f"DM retry: waiting {wait_s:.0f}s for initial ACK {initial_ack[:8]}...")
+ ack_event = await self.mc.dispatcher.wait_for_event(
+ EventType.ACK,
+ attribute_filters={"code": initial_ack},
+ timeout=wait_s
+ )
+ if ack_event:
+ self._confirm_delivery(dm_id, initial_ack, ack_event)
+ self.db.update_dm_delivery_info(dm_id, 1, max_attempts, path_desc)
+ return
+ logger.debug(f"DM retry: initial ACK not received (timeout)")
+
+ attempt = 0 # Global attempt counter (0 = initial send already done)
+
# ════════════════════════════════════════════════════════════
# Scenario 1: No path, no configured paths → FLOOD only
# ════════════════════════════════════════════════════════════
if not has_path and not has_configured_paths:
for _ in range(cfg['flood_max_retries']):
attempt += 1
- if await self._dm_retry_send_and_wait(
- contact, text, timestamp, attempt, dm_id,
- suggested_timeout, float(cfg['flood_interval'])
- ):
+ if await _retry(attempt, float(cfg['flood_interval'])):
return
# ════════════════════════════════════════════════════════════
@@ -1348,10 +1408,7 @@ class DeviceManager:
# Phase 1: Direct retries on current ŚK
for _ in range(cfg['direct_max_retries']):
attempt += 1
- if await self._dm_retry_send_and_wait(
- contact, text, timestamp, attempt, dm_id,
- suggested_timeout, float(cfg['direct_interval'])
- ):
+ if await _retry(attempt, float(cfg['direct_interval'])):
return
# Phase 2: Optional FLOOD fallback (controlled by no_auto_flood)
@@ -1361,51 +1418,40 @@ class DeviceManager:
logger.info("DM retry: direct exhausted, resetting to FLOOD")
except Exception:
pass
+ path_desc = "FLOOD"
for _ in range(cfg['direct_flood_retries']):
attempt += 1
- if await self._dm_retry_send_and_wait(
- contact, text, timestamp, attempt, dm_id,
- suggested_timeout, float(cfg['flood_interval'])
- ):
+ if await _retry(attempt, float(cfg['flood_interval'])):
return
# ════════════════════════════════════════════════════════════
# Scenario 3: No path, has configured paths → FLOOD first, then ŚD rotation
# ════════════════════════════════════════════════════════════
elif not has_path and has_configured_paths:
- primary_path, other_paths = _split_primary_and_others(configured_paths)
-
# Phase 1: FLOOD retries per NoPath settings (discover new path)
logger.info("DM retry: FLOOD first to discover new path")
for _ in range(cfg['flood_max_retries']):
attempt += 1
- if await self._dm_retry_send_and_wait(
- contact, text, timestamp, attempt, dm_id,
- suggested_timeout, float(cfg['flood_interval'])
- ):
+ if await _retry(attempt, float(cfg['flood_interval'])):
return # Firmware sets discovered path as ŚK
# Phase 2: ŚD rotation (primary first, then others by sort_order)
logger.info("DM retry: FLOOD exhausted, rotating through configured paths")
- rotation_order = ([primary_path] if primary_path else []) + other_paths
- retries_per_path = max(1, cfg['direct_max_retries'])
direct_interval = float(cfg['direct_interval'])
for path_info in rotation_order:
try:
await self._change_path_async(contact, path_info['path_hex'], path_info['hash_size'])
- logger.info(f"DM retry: switched to path '{path_info.get('label', '')}' "
- f"({path_info['path_hex']})")
+ label = path_info.get('label', '')
+ path_desc = f"{label} ({path_info['path_hex']})" if label else path_info['path_hex']
+ logger.info(f"DM retry: switched to path '{label}' ({path_info['path_hex']})")
except Exception as e:
logger.warning(f"DM retry: failed to switch path: {e}")
continue
for _ in range(retries_per_path):
attempt += 1
- if await self._dm_retry_send_and_wait(
- contact, text, timestamp, attempt, dm_id,
- suggested_timeout, direct_interval
- ):
+ if await _retry(attempt, direct_interval):
await self._restore_primary_path(contact, contact_pubkey)
return
@@ -1416,21 +1462,14 @@ class DeviceManager:
# Scenario 4: Has path + has configured paths → DIRECT on ŚK, ŚD rotation, optional FLOOD
# ════════════════════════════════════════════════════════════
else: # has_path and has_configured_paths
- primary_path, other_paths = _split_primary_and_others(configured_paths)
-
# Phase 1: Direct retries on current ŚK
for _ in range(cfg['direct_max_retries']):
attempt += 1
- if await self._dm_retry_send_and_wait(
- contact, text, timestamp, attempt, dm_id,
- suggested_timeout, float(cfg['direct_interval'])
- ):
+ if await _retry(attempt, float(cfg['direct_interval'])):
return # Delivered on ŚK, no path change needed
# Phase 2: ŚD rotation with dedup
logger.info("DM retry: direct on ŚK exhausted, rotating through configured paths")
- rotation_order = ([primary_path] if primary_path else []) + other_paths
- retries_per_path = max(1, cfg['direct_max_retries'])
direct_interval = float(cfg['direct_interval'])
for path_info in rotation_order:
@@ -1442,18 +1481,16 @@ class DeviceManager:
try:
await self._change_path_async(contact, path_info['path_hex'], path_info['hash_size'])
- logger.info(f"DM retry: switched to path '{path_info.get('label', '')}' "
- f"({path_info['path_hex']})")
+ label = path_info.get('label', '')
+ path_desc = f"{label} ({path_info['path_hex']})" if label else path_info['path_hex']
+ logger.info(f"DM retry: switched to path '{label}' ({path_info['path_hex']})")
except Exception as e:
logger.warning(f"DM retry: failed to switch path: {e}")
continue
for _ in range(retries_per_path):
attempt += 1
- if await self._dm_retry_send_and_wait(
- contact, text, timestamp, attempt, dm_id,
- suggested_timeout, direct_interval
- ):
+ if await _retry(attempt, direct_interval):
await self._restore_primary_path(contact, contact_pubkey)
return
@@ -1464,19 +1501,19 @@ class DeviceManager:
logger.info("DM retry: all paths exhausted, falling back to FLOOD")
except Exception:
pass
+ path_desc = "FLOOD"
for _ in range(cfg['flood_max_retries']):
attempt += 1
- if await self._dm_retry_send_and_wait(
- contact, text, timestamp, attempt, dm_id,
- suggested_timeout, float(cfg['flood_interval'])
- ):
+ if await _retry(attempt, float(cfg['flood_interval'])):
await self._restore_primary_path(contact, contact_pubkey)
return
# Restore ŚG regardless of outcome
await self._restore_primary_path(contact, contact_pubkey)
- # ── Common epilogue: grace period for late ACKs ──
+ # ── Common epilogue: mark failed, grace period for late ACKs ──
+ self.db.update_dm_delivery_status(dm_id, 'failed')
+ self._emit_retry_failed(dm_id, initial_ack)
logger.warning(f"DM retry exhausted ({attempt + 1} total attempts, scenario={scenario}) "
f"for dm_id={dm_id}")
self._retry_tasks.pop(dm_id, None)
diff --git a/app/routes/api.py b/app/routes/api.py
index 6472899..3692e13 100644
--- a/app/routes/api.py
+++ b/app/routes/api.py
@@ -1980,6 +1980,7 @@ def get_dm_messages():
for row in db_msgs:
messages.append({
'type': 'dm',
+ 'id': row['id'],
'direction': 'incoming' if row['direction'] == 'in' else 'outgoing',
'sender': row.get('contact_pubkey', ''),
'content': row.get('content', ''),
@@ -1989,6 +1990,10 @@ def get_dm_messages():
'snr': row.get('snr'),
'path_len': row.get('path_len'),
'expected_ack': row.get('expected_ack'),
+ 'delivery_status': row.get('delivery_status'),
+ 'delivery_attempt': row.get('delivery_attempt'),
+ 'delivery_max_attempts': row.get('delivery_max_attempts'),
+ 'delivery_path': row.get('delivery_path'),
'conversation_id': conversation_id,
})
else:
@@ -2040,6 +2045,12 @@ def get_dm_messages():
except Exception as e:
logger.debug(f"ACK status fetch failed (non-critical): {e}")
+ # Set failed status for messages without ACK but marked failed in DB
+ for msg in messages:
+ if msg.get('direction') == 'outgoing' and msg.get('status') != 'delivered':
+ if msg.get('delivery_status') == 'failed':
+ msg['status'] = 'failed'
+
return jsonify({
'success': True,
'conversation_id': conversation_id,
diff --git a/app/static/css/style.css b/app/static/css/style.css
index f6f9d36..0d9e8a5 100644
--- a/app/static/css/style.css
+++ b/app/static/css/style.css
@@ -738,6 +738,12 @@ main {
position: relative;
}
+.dm-retry-info {
+ font-size: 0.6rem;
+ color: var(--text-meta);
+ margin-left: 0.15rem;
+}
+
.dm-delivery-popup {
position: absolute;
bottom: 100%;
diff --git a/app/static/js/dm.js b/app/static/js/dm.js
index 4a387a4..4451bc7 100644
--- a/app/static/js/dm.js
+++ b/app/static/js/dm.js
@@ -112,10 +112,44 @@ function connectChatSocket() {
if (data.snr != null) tooltip.push(`SNR: ${data.snr}`);
if (data.route_type) tooltip.push(`Route: ${data.route_type}`);
statusEl.title = tooltip.length > 0 ? tooltip.join(', ') : 'Delivered';
+ // Remove retry counter if present
+ const wrapper = statusEl.closest('[data-dm-id]');
+ if (wrapper) {
+ const info = wrapper.querySelector('.dm-retry-info');
+ if (info) info.remove();
+ // Unwrap: replace wrapper span with just the icon
+ wrapper.replaceWith(statusEl);
+ }
}
});
});
+ // Real-time DM retry progress
+ chatSocket.on('dm_retry_status', (data) => {
+ if (!data.dm_id) return;
+ const wrapper = document.querySelector(`[data-dm-id="${data.dm_id}"]`);
+ if (!wrapper) return;
+ const info = wrapper.querySelector('.dm-retry-info');
+ if (info) info.textContent = `${data.attempt}/${data.max_attempts}`;
+ });
+
+ // DM retry exhausted — mark as failed
+ chatSocket.on('dm_retry_failed', (data) => {
+ if (!data.dm_id) return;
+ const wrapper = document.querySelector(`[data-dm-id="${data.dm_id}"]`);
+ if (!wrapper) return;
+ const icon = wrapper.querySelector('.dm-status');
+ if (icon) {
+ icon.className = 'bi bi-x-circle dm-status timeout';
+ icon.title = 'Delivery failed — all retries exhausted';
+ }
+ const info = wrapper.querySelector('.dm-retry-info');
+ if (info) info.remove();
+ // Remove onclick
+ wrapper.removeAttribute('onclick');
+ wrapper.classList.remove('dm-status-unknown');
+ });
+
// Real-time device status
chatSocket.on('device_status', (data) => {
updateStatus(data.connected ? 'connected' : 'disconnected');
@@ -1089,18 +1123,25 @@ function displayMessages(messages) {
let statusIcon = '';
if (msg.is_own) {
const ackAttr = msg.expected_ack ? ` data-ack="${msg.expected_ack}"` : '';
+ const dmIdAttr = msg.id ? ` data-dm-id="${msg.id}"` : '';
if (msg.status === 'delivered') {
let title = 'Delivered';
+ if (msg.delivery_attempt && msg.delivery_max_attempts) {
+ title += ` (${msg.delivery_attempt}/${msg.delivery_max_attempts})`;
+ }
+ if (msg.delivery_path) title += `, Path: ${msg.delivery_path}`;
if (msg.delivery_snr !== null && msg.delivery_snr !== undefined) {
title += `, SNR: ${msg.delivery_snr.toFixed(1)} dB`;
}
if (msg.delivery_route) title += ` (${msg.delivery_route})`;
statusIcon = ``;
+ } else if (msg.status === 'failed') {
+ statusIcon = `<i class="bi bi-x-circle dm-status timeout" title="Delivery failed — all retries exhausted"${ackAttr}${dmIdAttr}></i>`;
} else if (msg.status === 'pending') {
statusIcon = ``;
} else {
- // No ACK received — show clickable "?" with explanation
- statusIcon = ``;
+ // No ACK received — show clickable "?" with retry counter
+ statusIcon = `<span class="dm-status-unknown"${dmIdAttr} onclick="showDeliveryInfo(this)"><i class="bi bi-question-circle dm-status"${ackAttr} title="No delivery confirmation received"></i><span class="dm-retry-info"></span></span>`; /* NOTE(review): markup reconstructed from handlers at connectChatSocket — confirm onclick handler name */
}
}