feat(dm): Auto-retry for undelivered DM messages

Implement automatic retry for DM messages when ACK is not received,
similar to the MeshCore mobile app's Auto Retry feature. The bridge
monitors for ACK after each send and retries up to 3 times, switching
to flood routing after 2 failed direct attempts via reset_path.

- Bridge: background retry engine with configurable max_attempts,
  flood_after; retry group tracking to prevent duplicate messages
- Bridge: enhanced ACK status checks retry groups so delivery is
  detected even if only a retry attempt's ACK arrives
- Backend: filter retry SENT_MSG duplicates from message list
- Frontend: extended ACK polling window, auto-retry toggle in DM bar

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-02-24 21:23:32 +01:00
parent fd4818cfad
commit c37a7d3b23
5 changed files with 396 additions and 7 deletions
+58
View File
@@ -427,6 +427,64 @@ def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]:
return False, {}, str(e)
# =============================================================================
# Auto-retry helpers
# =============================================================================
def get_retry_ack_codes() -> set:
"""Get set of expected_ack codes belonging to retry attempts (not first send)."""
try:
response = requests.get(
f"{config.MC_BRIDGE_URL.replace('/cli', '/retry_ack_codes')}",
timeout=DEFAULT_TIMEOUT
)
if response.status_code == 200:
data = response.json()
return set(data.get('retry_ack_codes', []))
except Exception as e:
logger.debug(f"Failed to fetch retry_ack_codes: {e}")
return set()
def get_auto_retry_config() -> Tuple[bool, Dict]:
"""Get current auto-retry configuration from bridge."""
try:
response = requests.get(
f"{config.MC_BRIDGE_URL.replace('/cli', '/auto_retry/config')}",
timeout=DEFAULT_TIMEOUT
)
if response.status_code == 200:
data = response.json()
return True, data
except Exception as e:
logger.debug(f"Failed to fetch auto_retry config: {e}")
return False, {}
def set_auto_retry_config(enabled=None, max_attempts=None, flood_after=None) -> Tuple[bool, Dict]:
"""Update auto-retry configuration on bridge."""
payload = {}
if enabled is not None:
payload['enabled'] = enabled
if max_attempts is not None:
payload['max_attempts'] = max_attempts
if flood_after is not None:
payload['flood_after'] = flood_after
try:
response = requests.post(
f"{config.MC_BRIDGE_URL.replace('/cli', '/auto_retry/config')}",
json=payload,
timeout=DEFAULT_TIMEOUT
)
if response.status_code == 200:
data = response.json()
return True, data
except Exception as e:
logger.debug(f"Failed to set auto_retry config: {e}")
return False, {}
# =============================================================================
# Contact Management (Existing & Pending Contacts)
# =============================================================================
+42
View File
@@ -1670,6 +1670,16 @@ def get_dm_messages():
days=days
)
# Filter out retry duplicate messages (keep only the first send)
try:
retry_codes = cli.get_retry_ack_codes()
if retry_codes:
messages = [msg for msg in messages
if not (msg.get('direction') == 'outgoing'
and msg.get('expected_ack') in retry_codes)]
except Exception as e:
logger.debug(f"Retry dedup failed (non-critical): {e}")
# Determine display name from conversation_id or messages
display_name = 'Unknown'
if conversation_id.startswith('pk_'):
@@ -1793,6 +1803,38 @@ def send_dm_message():
}), 500
@api_bp.route('/dm/auto_retry', methods=['GET'])
def get_auto_retry_config():
"""Get auto-retry configuration."""
try:
success, data = cli.get_auto_retry_config()
if success:
return jsonify(data), 200
return jsonify({'success': False, 'error': 'Failed to get config'}), 500
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@api_bp.route('/dm/auto_retry', methods=['POST'])
def set_auto_retry_config():
"""Update auto-retry configuration."""
try:
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': 'Missing JSON body'}), 400
success, result = cli.set_auto_retry_config(
enabled=data.get('enabled'),
max_attempts=data.get('max_attempts'),
flood_after=data.get('flood_after')
)
if success:
return jsonify(result), 200
return jsonify({'success': False, 'error': 'Failed to update config'}), 500
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@api_bp.route('/dm/updates', methods=['GET'])
def get_dm_updates():
"""
+49 -2
View File
@@ -58,6 +58,9 @@ document.addEventListener('DOMContentLoaded', async function() {
// Initialize FAB toggle
initializeDmFabToggle();
// Load auto-retry config
loadAutoRetryConfig();
// Setup auto-refresh
setupAutoRefresh();
});
@@ -518,8 +521,8 @@ async function sendMessage() {
showNotification('Message sent', 'success');
// Reload messages to show sent message + ACK delivery status
// Stop early once the last own message gets a delivery checkmark
const ackRefreshDelays = [1000, 6000, 15000];
// Extended delays to cover auto-retry window (retries happen in background)
const ackRefreshDelays = [2000, 8000, 20000, 40000, 60000];
let ackRefreshIdx = 0;
const scheduleAckRefresh = () => {
if (ackRefreshIdx >= ackRefreshDelays.length) return;
@@ -1169,3 +1172,47 @@ function clearDmFilterState() {
}, 50);
}
}
// =============================================================================
// Auto-retry configuration
// =============================================================================
/**
* Load auto-retry config from bridge and sync toggle state
*/
async function loadAutoRetryConfig() {
const toggle = document.getElementById('dmAutoRetryToggle');
if (!toggle) return;
try {
const response = await fetch('/api/dm/auto_retry');
const data = await response.json();
if (data.success) {
toggle.checked = data.enabled;
}
} catch (e) {
console.debug('Failed to load auto-retry config:', e);
}
// Setup change handler
toggle.addEventListener('change', async function() {
try {
const response = await fetch('/api/dm/auto_retry', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ enabled: this.checked })
});
const data = await response.json();
if (data.success) {
showNotification(
data.enabled ? 'Auto Retry enabled' : 'Auto Retry disabled',
'info'
);
}
} catch (e) {
console.error('Failed to update auto-retry config:', e);
// Revert toggle on error
this.checked = !this.checked;
}
});
}
+10 -4
View File
@@ -66,10 +66,16 @@
<!-- Conversation Selector Bar -->
<div class="row border-bottom bg-light">
<div class="col-12 p-2">
<select id="dmConversationSelector" class="form-select" title="Select conversation">
<option value="">Select chat...</option>
<!-- Conversations loaded dynamically via JavaScript -->
</select>
<div class="d-flex align-items-center gap-2">
<select id="dmConversationSelector" class="form-select" title="Select conversation">
<option value="">Select chat...</option>
<!-- Conversations loaded dynamically via JavaScript -->
</select>
<div class="form-check form-switch flex-shrink-0" title="Auto Retry: resend DM if no ACK received">
<input class="form-check-input" type="checkbox" id="dmAutoRetryToggle" checked>
<label class="form-check-label small text-nowrap" for="dmAutoRetryToggle">Retry</label>
</div>
</div>
</div>
</div>
<!-- Messages Container -->
+237 -1
View File
@@ -157,6 +157,15 @@ class MeshCLISession:
self.acks = {} # ack_code -> {snr, rssi, route, path, ts}
self.acks_file = self.config_dir / f"{device_name}.acks.jsonl"
# Auto-retry for DM messages
self.auto_retry_enabled = True
self.auto_retry_max_attempts = 3 # total attempts including first send
self.auto_retry_flood_after = 2 # switch to flood after N failed direct attempts
self.retry_ack_codes = set() # expected_ack codes of retry attempts (not first)
self.retry_groups = {} # original_ack -> [retry_ack_1, retry_ack_2, ...]
self.retry_lock = threading.Lock()
self.active_retries = {} # original_ack -> threading.Event (cancel signal)
# Load persisted data from disk
self._load_echoes()
self._load_acks()
@@ -805,6 +814,148 @@ class MeshCLISession:
except Exception as e:
logger.error(f"Failed to load ACKs: {e}")
# =========================================================================
# Auto-retry for DM messages
# =========================================================================
def _start_retry(self, recipient, text, original_ack, suggested_timeout):
"""Start background retry thread for a DM message."""
cancel_event = threading.Event()
with self.retry_lock:
self.active_retries[original_ack] = cancel_event
self.retry_groups[original_ack] = []
thread = threading.Thread(
target=self._retry_send,
args=(recipient, text, original_ack, suggested_timeout, cancel_event),
daemon=True,
name=f"retry-{original_ack[:8]}"
)
thread.start()
logger.info(f"Auto-retry started for ack={original_ack}, "
f"max_attempts={self.auto_retry_max_attempts}, "
f"timeout={suggested_timeout}ms")
def _retry_send(self, recipient, text, original_ack, suggested_timeout, cancel_event):
"""Background retry loop for a DM message.
Waits for ACK after initial send, retries up to max_attempts total.
After flood_after failed direct attempts, resets path to force flood routing.
"""
# Wait timeout in seconds (use suggested_timeout from device, with 1.2x margin)
wait_timeout = max(suggested_timeout / 1000 * 1.2, 5.0)
# Attempt 1 was already sent by the caller. Start from attempt 2.
attempt = 1 # 0-indexed, so attempt=1 is the second try
flood_sent = False
while attempt < self.auto_retry_max_attempts:
if cancel_event.is_set():
logger.info(f"Retry cancelled for ack={original_ack}")
break
# Wait for ACK of the most recent expected_ack
with self.retry_lock:
all_acks = [original_ack] + self.retry_groups.get(original_ack, [])
latest_ack = all_acks[-1]
# Poll for ACK (check all ack codes in the group — any ACK means delivered)
ack_received = self._wait_for_any_ack(all_acks, wait_timeout, cancel_event)
if ack_received:
logger.info(f"ACK received for ack={latest_ack} (attempt {attempt}), "
f"retry done for original={original_ack}")
break
if cancel_event.is_set():
break
# No ACK received - retry
attempt += 1
# Reset path to flood before the configured attempt
if attempt >= self.auto_retry_flood_after and not flood_sent:
logger.info(f"Retry: resetting path for {recipient} (switching to flood)")
try:
self.execute_command(['reset_path', recipient], timeout=5)
flood_sent = True
except Exception as e:
logger.error(f"Retry: reset_path failed: {e}")
logger.info(f"Retry attempt {attempt + 1}/{self.auto_retry_max_attempts} "
f"for '{text[:20]}...'{recipient}")
try:
result = self.execute_command(['msg', recipient, text], timeout=DEFAULT_TIMEOUT)
if result.get('success'):
# Parse the response to get new expected_ack
new_ack = self._extract_ack_from_response(result.get('stdout', ''))
new_timeout = self._extract_timeout_from_response(result.get('stdout', ''))
if new_ack:
with self.retry_lock:
self.retry_ack_codes.add(new_ack)
self.retry_groups.setdefault(original_ack, []).append(new_ack)
if new_timeout:
wait_timeout = max(new_timeout / 1000 * 1.2, 5.0)
logger.info(f"Retry sent, new ack={new_ack}")
else:
logger.warning(f"Retry: could not parse expected_ack from response")
break
else:
logger.error(f"Retry: msg command failed: {result.get('stderr', '')}")
break
except Exception as e:
logger.error(f"Retry: send failed: {e}")
break
# Cleanup active retry tracking
with self.retry_lock:
self.active_retries.pop(original_ack, None)
if attempt >= self.auto_retry_max_attempts:
logger.warning(f"Auto-retry exhausted ({self.auto_retry_max_attempts} attempts) "
f"for '{text[:20]}...'{recipient}")
def _wait_for_any_ack(self, ack_codes, timeout_seconds, cancel_event):
"""Poll self.acks dict for any of the given ack_codes with timeout."""
start = time.time()
poll_interval = 0.5 # Check every 500ms
while time.time() - start < timeout_seconds:
for code in ack_codes:
if code in self.acks:
return True
if cancel_event.is_set():
return False
cancel_event.wait(poll_interval)
return any(code in self.acks for code in ack_codes)
def _extract_ack_from_response(self, stdout):
"""Extract expected_ack from msg command JSON response."""
try:
for line in stdout.strip().split('\n'):
line = line.strip()
if not line:
continue
data = json.loads(line)
if isinstance(data, dict) and 'expected_ack' in data:
return data['expected_ack']
except (json.JSONDecodeError, ValueError):
pass
return None
def _extract_timeout_from_response(self, stdout):
"""Extract suggested_timeout from msg command JSON response."""
try:
for line in stdout.strip().split('\n'):
line = line.strip()
if not line:
continue
data = json.loads(line)
if isinstance(data, dict) and 'suggested_timeout' in data:
return data['suggested_timeout']
except (json.JSONDecodeError, ValueError):
pass
return None
def _log_advert(self, json_line):
"""Log advert JSON to .jsonl file with timestamp"""
try:
@@ -1086,6 +1237,20 @@ def execute_cli():
# Execute via persistent session
result = meshcli_session.execute_command(args, timeout)
# Auto-retry: after successful msg command, start background retry
if (result.get('success') and args and args[0] in ('msg', 'm')
and len(args) >= 3 and meshcli_session.auto_retry_enabled
and meshcli_session.auto_retry_max_attempts > 1):
stdout = result.get('stdout', '')
expected_ack = meshcli_session._extract_ack_from_response(stdout)
suggested_timeout = meshcli_session._extract_timeout_from_response(stdout)
if expected_ack and suggested_timeout:
recipient = args[1]
text = args[2]
meshcli_session._start_retry(
recipient, text, expected_ack, suggested_timeout
)
return jsonify(result), 200
except Exception as e:
@@ -1468,11 +1633,82 @@ def get_ack_status():
result = {}
for code in codes:
result[code] = meshcli_session.acks.get(code)
ack_info = meshcli_session.acks.get(code)
# If no ACK for this code, check retry group (maybe a retry attempt got the ACK)
if ack_info is None:
with meshcli_session.retry_lock:
retry_codes = meshcli_session.retry_groups.get(code, [])
for retry_code in retry_codes:
ack_info = meshcli_session.acks.get(retry_code)
if ack_info is not None:
break
result[code] = ack_info
return jsonify({'success': True, 'acks': result}), 200
# =============================================================================
# Auto-retry endpoints
# =============================================================================
@app.route('/retry_ack_codes', methods=['GET'])
def get_retry_ack_codes():
"""Return set of expected_ack codes that belong to retry attempts (not first send)."""
if not meshcli_session:
return jsonify({'success': False, 'error': 'Not initialized'}), 503
with meshcli_session.retry_lock:
codes = list(meshcli_session.retry_ack_codes)
return jsonify({'success': True, 'retry_ack_codes': codes}), 200
@app.route('/auto_retry/config', methods=['GET'])
def get_auto_retry_config():
"""Get current auto-retry configuration."""
if not meshcli_session:
return jsonify({'success': False, 'error': 'Not initialized'}), 503
return jsonify({
'success': True,
'enabled': meshcli_session.auto_retry_enabled,
'max_attempts': meshcli_session.auto_retry_max_attempts,
'flood_after': meshcli_session.auto_retry_flood_after,
'active_retries': len(meshcli_session.active_retries),
}), 200
@app.route('/auto_retry/config', methods=['POST'])
def set_auto_retry_config():
"""Update auto-retry configuration."""
if not meshcli_session:
return jsonify({'success': False, 'error': 'Not initialized'}), 503
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': 'Missing JSON body'}), 400
if 'enabled' in data:
meshcli_session.auto_retry_enabled = bool(data['enabled'])
if 'max_attempts' in data:
val = int(data['max_attempts'])
meshcli_session.auto_retry_max_attempts = max(1, min(val, 10))
if 'flood_after' in data:
val = int(data['flood_after'])
meshcli_session.auto_retry_flood_after = max(1, min(val, 10))
logger.info(f"Auto-retry config updated: enabled={meshcli_session.auto_retry_enabled}, "
f"max_attempts={meshcli_session.auto_retry_max_attempts}, "
f"flood_after={meshcli_session.auto_retry_flood_after}")
return jsonify({
'success': True,
'enabled': meshcli_session.auto_retry_enabled,
'max_attempts': meshcli_session.auto_retry_max_attempts,
'flood_after': meshcli_session.auto_retry_flood_after,
}), 200
# =============================================================================
# WebSocket handlers for console
# =============================================================================