mirror of
https://github.com/MarekWo/mc-webui.git
synced 2026-05-05 13:02:34 +02:00
feat(dm): add real-time retry status and persistent delivery info
Show retry progress in DM message bubble via WebSocket: - "attempt X/Y" counter updates in real-time during retries - Failed icon (✗) when all retries exhausted - Delivery info persisted in DB (attempt number, path used) Backend: emit dm_retry_status/dm_retry_failed socket events, store delivery_attempt/delivery_path in direct_messages table. Frontend: socket listeners update status icon and counter, delivered tooltip shows attempt info and path. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -44,6 +44,18 @@ class Database:
|
||||
conn.execute("ALTER TABLE contacts ADD COLUMN no_auto_flood INTEGER DEFAULT 0")
|
||||
logger.info("Migration: added contacts.no_auto_flood column")
|
||||
|
||||
# Add delivery tracking columns to direct_messages
|
||||
dm_columns = {r[1] for r in conn.execute("PRAGMA table_info(direct_messages)").fetchall()}
|
||||
for col, typedef in [
|
||||
('delivery_status', 'TEXT'),
|
||||
('delivery_attempt', 'INTEGER'),
|
||||
('delivery_max_attempts', 'INTEGER'),
|
||||
('delivery_path', 'TEXT'),
|
||||
]:
|
||||
if col not in dm_columns:
|
||||
conn.execute(f"ALTER TABLE direct_messages ADD COLUMN {col} {typedef}")
|
||||
logger.info(f"Migration: added direct_messages.{col} column")
|
||||
|
||||
@contextmanager
|
||||
def _connect(self):
|
||||
"""Yield a connection with auto-commit/rollback."""
|
||||
@@ -660,6 +672,22 @@ class Database:
|
||||
).fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
def update_dm_delivery_info(self, dm_id: int, attempt: int,
|
||||
max_attempts: int, path: str):
|
||||
"""Store successful delivery details (attempt number, path used)."""
|
||||
with self._connect() as conn:
|
||||
conn.execute(
|
||||
"UPDATE direct_messages SET delivery_attempt=?, "
|
||||
"delivery_max_attempts=?, delivery_path=? WHERE id=?",
|
||||
(attempt, max_attempts, path, dm_id))
|
||||
|
||||
def update_dm_delivery_status(self, dm_id: int, status: str):
|
||||
"""Mark message delivery as failed."""
|
||||
with self._connect() as conn:
|
||||
conn.execute(
|
||||
"UPDATE direct_messages SET delivery_status=? WHERE id=?",
|
||||
(status, dm_id))
|
||||
|
||||
def relink_orphaned_dms(self, public_key: str, name: str = '') -> int:
|
||||
"""Re-link DMs with NULL contact_pubkey back to this contact.
|
||||
|
||||
|
||||
@@ -1233,6 +1233,25 @@ class DeviceManager:
|
||||
|
||||
return False
|
||||
|
||||
def _emit_retry_status(self, dm_id: int, expected_ack: str,
|
||||
attempt: int, max_attempts: int):
|
||||
"""Notify frontend about retry progress."""
|
||||
if self.socketio:
|
||||
self.socketio.emit('dm_retry_status', {
|
||||
'dm_id': dm_id,
|
||||
'expected_ack': expected_ack,
|
||||
'attempt': attempt,
|
||||
'max_attempts': max_attempts,
|
||||
}, namespace='/chat')
|
||||
|
||||
def _emit_retry_failed(self, dm_id: int, expected_ack: str):
|
||||
"""Notify frontend that all retry attempts were exhausted."""
|
||||
if self.socketio:
|
||||
self.socketio.emit('dm_retry_failed', {
|
||||
'dm_id': dm_id,
|
||||
'expected_ack': expected_ack,
|
||||
}, namespace='/chat')
|
||||
|
||||
@staticmethod
|
||||
def _paths_match(contact_out_path: str, contact_out_path_len: int,
|
||||
configured_path: dict) -> bool:
|
||||
@@ -1299,27 +1318,8 @@ class DeviceManager:
|
||||
else:
|
||||
scenario = "S1_FLOOD"
|
||||
|
||||
logger.info(f"DM retry task started: dm_id={dm_id}, scenario={scenario}, "
|
||||
f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, "
|
||||
f"wait={wait_s:.0f}s")
|
||||
|
||||
# ── Wait for ACK on initial send ──
|
||||
if initial_ack:
|
||||
logger.debug(f"DM retry: waiting {wait_s:.0f}s for initial ACK {initial_ack[:8]}...")
|
||||
ack_event = await self.mc.dispatcher.wait_for_event(
|
||||
EventType.ACK,
|
||||
attribute_filters={"code": initial_ack},
|
||||
timeout=wait_s
|
||||
)
|
||||
if ack_event:
|
||||
self._confirm_delivery(dm_id, initial_ack, ack_event)
|
||||
return
|
||||
logger.debug(f"DM retry: initial ACK not received (timeout)")
|
||||
|
||||
attempt = 0 # Global attempt counter (0 = initial send already done)
|
||||
|
||||
# ── Pre-compute path split and max_attempts ──
|
||||
def _split_primary_and_others(paths):
|
||||
"""Separate primary (starred) path from the rest."""
|
||||
primary = None
|
||||
others = []
|
||||
for p in paths:
|
||||
@@ -1329,16 +1329,76 @@ class DeviceManager:
|
||||
others.append(p)
|
||||
return primary, others
|
||||
|
||||
primary_path = None
|
||||
other_paths = []
|
||||
rotation_order = []
|
||||
if has_configured_paths:
|
||||
primary_path, other_paths = _split_primary_and_others(configured_paths)
|
||||
rotation_order = ([primary_path] if primary_path else []) + other_paths
|
||||
|
||||
retries_per_path = max(1, cfg['direct_max_retries'])
|
||||
|
||||
if scenario == "S1_FLOOD":
|
||||
max_attempts = 1 + cfg['flood_max_retries']
|
||||
elif scenario == "S2_DIRECT_FLOOD":
|
||||
max_attempts = 1 + cfg['direct_max_retries']
|
||||
if not no_auto_flood:
|
||||
max_attempts += cfg['direct_flood_retries']
|
||||
elif scenario == "S3_FLOOD_SD":
|
||||
max_attempts = (1 + cfg['flood_max_retries']
|
||||
+ len(rotation_order) * retries_per_path)
|
||||
else: # S4
|
||||
deduped = sum(1 for p in rotation_order
|
||||
if self._paths_match(original_out_path, original_out_path_len, p))
|
||||
effective_sd = len(rotation_order) - deduped
|
||||
max_attempts = 1 + cfg['direct_max_retries'] + effective_sd * retries_per_path
|
||||
if not no_auto_flood:
|
||||
max_attempts += cfg['flood_max_retries']
|
||||
|
||||
# Track current path description for delivery info
|
||||
path_desc = "FLOOD" if not has_path else "DIRECT"
|
||||
|
||||
logger.info(f"DM retry task started: dm_id={dm_id}, scenario={scenario}, "
|
||||
f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, "
|
||||
f"max_attempts={max_attempts}, wait={wait_s:.0f}s")
|
||||
|
||||
# ── Local helper: emit status, send, store delivery info on success ──
|
||||
async def _retry(attempt_num, min_wait_s):
|
||||
display = attempt_num + 1 # attempt 0 = initial send = display 1
|
||||
self._emit_retry_status(dm_id, initial_ack, display, max_attempts)
|
||||
delivered = await self._dm_retry_send_and_wait(
|
||||
contact, text, timestamp, attempt_num, dm_id,
|
||||
suggested_timeout, min_wait_s
|
||||
)
|
||||
if delivered:
|
||||
self.db.update_dm_delivery_info(
|
||||
dm_id, display, max_attempts, path_desc)
|
||||
return delivered
|
||||
|
||||
# ── Emit status for initial send (attempt 1) and wait for ACK ──
|
||||
self._emit_retry_status(dm_id, initial_ack, 1, max_attempts)
|
||||
if initial_ack:
|
||||
logger.debug(f"DM retry: waiting {wait_s:.0f}s for initial ACK {initial_ack[:8]}...")
|
||||
ack_event = await self.mc.dispatcher.wait_for_event(
|
||||
EventType.ACK,
|
||||
attribute_filters={"code": initial_ack},
|
||||
timeout=wait_s
|
||||
)
|
||||
if ack_event:
|
||||
self._confirm_delivery(dm_id, initial_ack, ack_event)
|
||||
self.db.update_dm_delivery_info(dm_id, 1, max_attempts, path_desc)
|
||||
return
|
||||
logger.debug(f"DM retry: initial ACK not received (timeout)")
|
||||
|
||||
attempt = 0 # Global attempt counter (0 = initial send already done)
|
||||
|
||||
# ════════════════════════════════════════════════════════════
|
||||
# Scenario 1: No path, no configured paths → FLOOD only
|
||||
# ════════════════════════════════════════════════════════════
|
||||
if not has_path and not has_configured_paths:
|
||||
for _ in range(cfg['flood_max_retries']):
|
||||
attempt += 1
|
||||
if await self._dm_retry_send_and_wait(
|
||||
contact, text, timestamp, attempt, dm_id,
|
||||
suggested_timeout, float(cfg['flood_interval'])
|
||||
):
|
||||
if await _retry(attempt, float(cfg['flood_interval'])):
|
||||
return
|
||||
|
||||
# ════════════════════════════════════════════════════════════
|
||||
@@ -1348,10 +1408,7 @@ class DeviceManager:
|
||||
# Phase 1: Direct retries on current ŚK
|
||||
for _ in range(cfg['direct_max_retries']):
|
||||
attempt += 1
|
||||
if await self._dm_retry_send_and_wait(
|
||||
contact, text, timestamp, attempt, dm_id,
|
||||
suggested_timeout, float(cfg['direct_interval'])
|
||||
):
|
||||
if await _retry(attempt, float(cfg['direct_interval'])):
|
||||
return
|
||||
|
||||
# Phase 2: Optional FLOOD fallback (controlled by no_auto_flood)
|
||||
@@ -1361,51 +1418,40 @@ class DeviceManager:
|
||||
logger.info("DM retry: direct exhausted, resetting to FLOOD")
|
||||
except Exception:
|
||||
pass
|
||||
path_desc = "FLOOD"
|
||||
for _ in range(cfg['direct_flood_retries']):
|
||||
attempt += 1
|
||||
if await self._dm_retry_send_and_wait(
|
||||
contact, text, timestamp, attempt, dm_id,
|
||||
suggested_timeout, float(cfg['flood_interval'])
|
||||
):
|
||||
if await _retry(attempt, float(cfg['flood_interval'])):
|
||||
return
|
||||
|
||||
# ════════════════════════════════════════════════════════════
|
||||
# Scenario 3: No path, has configured paths → FLOOD first, then ŚD rotation
|
||||
# ════════════════════════════════════════════════════════════
|
||||
elif not has_path and has_configured_paths:
|
||||
primary_path, other_paths = _split_primary_and_others(configured_paths)
|
||||
|
||||
# Phase 1: FLOOD retries per NoPath settings (discover new path)
|
||||
logger.info("DM retry: FLOOD first to discover new path")
|
||||
for _ in range(cfg['flood_max_retries']):
|
||||
attempt += 1
|
||||
if await self._dm_retry_send_and_wait(
|
||||
contact, text, timestamp, attempt, dm_id,
|
||||
suggested_timeout, float(cfg['flood_interval'])
|
||||
):
|
||||
if await _retry(attempt, float(cfg['flood_interval'])):
|
||||
return # Firmware sets discovered path as ŚK
|
||||
|
||||
# Phase 2: ŚD rotation (primary first, then others by sort_order)
|
||||
logger.info("DM retry: FLOOD exhausted, rotating through configured paths")
|
||||
rotation_order = ([primary_path] if primary_path else []) + other_paths
|
||||
retries_per_path = max(1, cfg['direct_max_retries'])
|
||||
direct_interval = float(cfg['direct_interval'])
|
||||
|
||||
for path_info in rotation_order:
|
||||
try:
|
||||
await self._change_path_async(contact, path_info['path_hex'], path_info['hash_size'])
|
||||
logger.info(f"DM retry: switched to path '{path_info.get('label', '')}' "
|
||||
f"({path_info['path_hex']})")
|
||||
label = path_info.get('label', '')
|
||||
path_desc = f"{label} ({path_info['path_hex']})" if label else path_info['path_hex']
|
||||
logger.info(f"DM retry: switched to path '{label}' ({path_info['path_hex']})")
|
||||
except Exception as e:
|
||||
logger.warning(f"DM retry: failed to switch path: {e}")
|
||||
continue
|
||||
|
||||
for _ in range(retries_per_path):
|
||||
attempt += 1
|
||||
if await self._dm_retry_send_and_wait(
|
||||
contact, text, timestamp, attempt, dm_id,
|
||||
suggested_timeout, direct_interval
|
||||
):
|
||||
if await _retry(attempt, direct_interval):
|
||||
await self._restore_primary_path(contact, contact_pubkey)
|
||||
return
|
||||
|
||||
@@ -1416,21 +1462,14 @@ class DeviceManager:
|
||||
# Scenario 4: Has path + has configured paths → DIRECT on ŚK, ŚD rotation, optional FLOOD
|
||||
# ════════════════════════════════════════════════════════════
|
||||
else: # has_path and has_configured_paths
|
||||
primary_path, other_paths = _split_primary_and_others(configured_paths)
|
||||
|
||||
# Phase 1: Direct retries on current ŚK
|
||||
for _ in range(cfg['direct_max_retries']):
|
||||
attempt += 1
|
||||
if await self._dm_retry_send_and_wait(
|
||||
contact, text, timestamp, attempt, dm_id,
|
||||
suggested_timeout, float(cfg['direct_interval'])
|
||||
):
|
||||
if await _retry(attempt, float(cfg['direct_interval'])):
|
||||
return # Delivered on ŚK, no path change needed
|
||||
|
||||
# Phase 2: ŚD rotation with dedup
|
||||
logger.info("DM retry: direct on ŚK exhausted, rotating through configured paths")
|
||||
rotation_order = ([primary_path] if primary_path else []) + other_paths
|
||||
retries_per_path = max(1, cfg['direct_max_retries'])
|
||||
direct_interval = float(cfg['direct_interval'])
|
||||
|
||||
for path_info in rotation_order:
|
||||
@@ -1442,18 +1481,16 @@ class DeviceManager:
|
||||
|
||||
try:
|
||||
await self._change_path_async(contact, path_info['path_hex'], path_info['hash_size'])
|
||||
logger.info(f"DM retry: switched to path '{path_info.get('label', '')}' "
|
||||
f"({path_info['path_hex']})")
|
||||
label = path_info.get('label', '')
|
||||
path_desc = f"{label} ({path_info['path_hex']})" if label else path_info['path_hex']
|
||||
logger.info(f"DM retry: switched to path '{label}' ({path_info['path_hex']})")
|
||||
except Exception as e:
|
||||
logger.warning(f"DM retry: failed to switch path: {e}")
|
||||
continue
|
||||
|
||||
for _ in range(retries_per_path):
|
||||
attempt += 1
|
||||
if await self._dm_retry_send_and_wait(
|
||||
contact, text, timestamp, attempt, dm_id,
|
||||
suggested_timeout, direct_interval
|
||||
):
|
||||
if await _retry(attempt, direct_interval):
|
||||
await self._restore_primary_path(contact, contact_pubkey)
|
||||
return
|
||||
|
||||
@@ -1464,19 +1501,19 @@ class DeviceManager:
|
||||
logger.info("DM retry: all paths exhausted, falling back to FLOOD")
|
||||
except Exception:
|
||||
pass
|
||||
path_desc = "FLOOD"
|
||||
for _ in range(cfg['flood_max_retries']):
|
||||
attempt += 1
|
||||
if await self._dm_retry_send_and_wait(
|
||||
contact, text, timestamp, attempt, dm_id,
|
||||
suggested_timeout, float(cfg['flood_interval'])
|
||||
):
|
||||
if await _retry(attempt, float(cfg['flood_interval'])):
|
||||
await self._restore_primary_path(contact, contact_pubkey)
|
||||
return
|
||||
|
||||
# Restore ŚG regardless of outcome
|
||||
await self._restore_primary_path(contact, contact_pubkey)
|
||||
|
||||
# ── Common epilogue: grace period for late ACKs ──
|
||||
# ── Common epilogue: mark failed, grace period for late ACKs ──
|
||||
self.db.update_dm_delivery_status(dm_id, 'failed')
|
||||
self._emit_retry_failed(dm_id, initial_ack)
|
||||
logger.warning(f"DM retry exhausted ({attempt + 1} total attempts, scenario={scenario}) "
|
||||
f"for dm_id={dm_id}")
|
||||
self._retry_tasks.pop(dm_id, None)
|
||||
|
||||
@@ -1980,6 +1980,7 @@ def get_dm_messages():
|
||||
for row in db_msgs:
|
||||
messages.append({
|
||||
'type': 'dm',
|
||||
'id': row['id'],
|
||||
'direction': 'incoming' if row['direction'] == 'in' else 'outgoing',
|
||||
'sender': row.get('contact_pubkey', ''),
|
||||
'content': row.get('content', ''),
|
||||
@@ -1989,6 +1990,10 @@ def get_dm_messages():
|
||||
'snr': row.get('snr'),
|
||||
'path_len': row.get('path_len'),
|
||||
'expected_ack': row.get('expected_ack'),
|
||||
'delivery_status': row.get('delivery_status'),
|
||||
'delivery_attempt': row.get('delivery_attempt'),
|
||||
'delivery_max_attempts': row.get('delivery_max_attempts'),
|
||||
'delivery_path': row.get('delivery_path'),
|
||||
'conversation_id': conversation_id,
|
||||
})
|
||||
else:
|
||||
@@ -2040,6 +2045,12 @@ def get_dm_messages():
|
||||
except Exception as e:
|
||||
logger.debug(f"ACK status fetch failed (non-critical): {e}")
|
||||
|
||||
# Set failed status for messages without ACK but marked failed in DB
|
||||
for msg in messages:
|
||||
if msg.get('direction') == 'outgoing' and msg.get('status') != 'delivered':
|
||||
if msg.get('delivery_status') == 'failed':
|
||||
msg['status'] = 'failed'
|
||||
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'conversation_id': conversation_id,
|
||||
|
||||
@@ -738,6 +738,12 @@ main {
|
||||
position: relative;
|
||||
}
|
||||
|
||||
.dm-retry-info {
|
||||
font-size: 0.6rem;
|
||||
color: var(--text-meta);
|
||||
margin-left: 0.15rem;
|
||||
}
|
||||
|
||||
.dm-delivery-popup {
|
||||
position: absolute;
|
||||
bottom: 100%;
|
||||
|
||||
@@ -112,10 +112,44 @@ function connectChatSocket() {
|
||||
if (data.snr != null) tooltip.push(`SNR: ${data.snr}`);
|
||||
if (data.route_type) tooltip.push(`Route: ${data.route_type}`);
|
||||
statusEl.title = tooltip.length > 0 ? tooltip.join(', ') : 'Delivered';
|
||||
// Remove retry counter if present
|
||||
const wrapper = statusEl.closest('[data-dm-id]');
|
||||
if (wrapper) {
|
||||
const info = wrapper.querySelector('.dm-retry-info');
|
||||
if (info) info.remove();
|
||||
// Unwrap: replace wrapper span with just the icon
|
||||
wrapper.replaceWith(statusEl);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// Real-time DM retry progress
|
||||
chatSocket.on('dm_retry_status', (data) => {
|
||||
if (!data.dm_id) return;
|
||||
const wrapper = document.querySelector(`[data-dm-id="${data.dm_id}"]`);
|
||||
if (!wrapper) return;
|
||||
const info = wrapper.querySelector('.dm-retry-info');
|
||||
if (info) info.textContent = `${data.attempt}/${data.max_attempts}`;
|
||||
});
|
||||
|
||||
// DM retry exhausted — mark as failed
|
||||
chatSocket.on('dm_retry_failed', (data) => {
|
||||
if (!data.dm_id) return;
|
||||
const wrapper = document.querySelector(`[data-dm-id="${data.dm_id}"]`);
|
||||
if (!wrapper) return;
|
||||
const icon = wrapper.querySelector('.dm-status');
|
||||
if (icon) {
|
||||
icon.className = 'bi bi-x-circle dm-status timeout';
|
||||
icon.title = 'Delivery failed — all retries exhausted';
|
||||
}
|
||||
const info = wrapper.querySelector('.dm-retry-info');
|
||||
if (info) info.remove();
|
||||
// Remove onclick
|
||||
wrapper.removeAttribute('onclick');
|
||||
wrapper.classList.remove('dm-status-unknown');
|
||||
});
|
||||
|
||||
// Real-time device status
|
||||
chatSocket.on('device_status', (data) => {
|
||||
updateStatus(data.connected ? 'connected' : 'disconnected');
|
||||
@@ -1089,18 +1123,25 @@ function displayMessages(messages) {
|
||||
let statusIcon = '';
|
||||
if (msg.is_own) {
|
||||
const ackAttr = msg.expected_ack ? ` data-ack="${msg.expected_ack}"` : '';
|
||||
const dmIdAttr = msg.id ? ` data-dm-id="${msg.id}"` : '';
|
||||
if (msg.status === 'delivered') {
|
||||
let title = 'Delivered';
|
||||
if (msg.delivery_attempt && msg.delivery_max_attempts) {
|
||||
title += ` (${msg.delivery_attempt}/${msg.delivery_max_attempts})`;
|
||||
}
|
||||
if (msg.delivery_path) title += `, Path: ${msg.delivery_path}`;
|
||||
if (msg.delivery_snr !== null && msg.delivery_snr !== undefined) {
|
||||
title += `, SNR: ${msg.delivery_snr.toFixed(1)} dB`;
|
||||
}
|
||||
if (msg.delivery_route) title += ` (${msg.delivery_route})`;
|
||||
statusIcon = `<i class="bi bi-check2 dm-status delivered"${ackAttr} title="${title}"></i>`;
|
||||
} else if (msg.status === 'failed') {
|
||||
statusIcon = `<span${dmIdAttr}><i class="bi bi-x-circle dm-status timeout"${ackAttr} title="Delivery failed — all retries exhausted"></i></span>`;
|
||||
} else if (msg.status === 'pending') {
|
||||
statusIcon = `<i class="bi bi-clock dm-status pending"${ackAttr} title="Sending..."></i>`;
|
||||
} else {
|
||||
// No ACK received — show clickable "?" with explanation
|
||||
statusIcon = `<span class="dm-status-unknown" onclick="showDeliveryInfo(this)"><i class="bi bi-question-circle dm-status unknown"${ackAttr}></i></span>`;
|
||||
// No ACK received — show clickable "?" with retry counter
|
||||
statusIcon = `<span class="dm-status-unknown"${dmIdAttr} onclick="showDeliveryInfo(this)"><i class="bi bi-question-circle dm-status unknown"${ackAttr}></i><span class="dm-retry-info"></span></span>`;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user