refactor(dm): restructure retry logic into 4-scenario matrix

Replace 3-way branching (configured_paths/has_path/else) with
4-scenario matrix based on (has_path × has_configured_paths):

- S1: No path, no configured paths → FLOOD only
- S2: Has path, no configured paths → DIRECT + optional FLOOD
- S3: No path, has configured paths → FLOOD first, then ŚD (configured paths) rotation
- S4: Has path, has configured paths → DIRECT on ŚK (current device path), ŚD (configured paths) rotation, optional FLOOD

Key changes:
- S3: FLOOD before configured paths (discover new routes)
- S4: exhaust retries on current ŚK before rotating ŚD
- S4: dedup configured paths (including the primary path, ŚG) against ŚK to skip redundant retries on the same path
- Add _paths_match() helper for path deduplication
- Update tooltip text for settings clarity

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-28 11:55:17 +01:00
parent 9be7ae6cc4
commit d2e019fa0e
2 changed files with 162 additions and 81 deletions
+160 -79
View File
@@ -1233,22 +1233,39 @@ class DeviceManager:
return False
@staticmethod
def _paths_match(contact_out_path: str, contact_out_path_len: int,
configured_path: dict) -> bool:
"""Check if device's current path matches a configured path."""
if contact_out_path_len <= 0:
return False
cfg_hash_size = configured_path['hash_size']
device_hash_size = (contact_out_path_len >> 6) + 1
if device_hash_size != cfg_hash_size:
return False
hop_count = contact_out_path_len & 0x3F
meaningful_len = hop_count * device_hash_size * 2
return (contact_out_path.lower()[:meaningful_len] ==
configured_path['path_hex'].lower()[:meaningful_len])
async def _dm_retry_task(self, dm_id: int, contact, text: str,
timestamp: int, initial_ack: str,
suggested_timeout: int):
"""Background retry with same timestamp for dedup on receiver.
Strategy (in priority order):
1. PATH ROTATION: If user-configured paths exist, rotate through them.
2. DIRECT+FLOOD: If contact has device path, try direct then optionally flood.
3. FLOOD only: If no path known, flood retries.
4-scenario matrix based on (has_path × has_configured_paths):
- Scenario 1: No path, no configured paths → FLOOD only
- Scenario 2: Has path, no configured paths → DIRECT + optional FLOOD
- Scenario 3: No path, has configured paths → FLOOD first, then ŚD rotation
- Scenario 4: Has path, has configured paths → DIRECT on ŚK, ŚD rotation, optional FLOOD
The no_auto_flood per-contact flag prevents automatic DIRECT→FLOOD reset.
The no_auto_flood per-contact flag prevents automatic DIRECT→FLOOD reset
in Scenarios 2 and 4. Ignored in Scenarios 1 and 3.
Settings loaded from app_settings DB table (key: dm_retry_settings).
"""
from meshcore.events import EventType
# Load configurable retry settings from DB
# ── Load configurable retry settings from DB ──
_defaults = {
'direct_max_retries': 3, 'direct_flood_retries': 1,
'flood_max_retries': 3, 'direct_interval': 30,
@@ -1260,19 +1277,33 @@ class DeviceManager:
contact_pubkey = contact.get('public_key', '').lower()
has_path = contact.get('out_path_len', -1) > 0
# Capture original device path for dedup (contact dict may mutate)
original_out_path = contact.get('out_path', '').lower()
original_out_path_len = contact.get('out_path_len', -1)
# Load user-configured paths and no_auto_flood flag
configured_paths = self.db.get_contact_paths(contact_pubkey) if contact_pubkey else []
no_auto_flood = self.db.get_contact_no_auto_flood(contact_pubkey) if contact_pubkey else False
has_configured_paths = bool(configured_paths)
min_wait = float(cfg['direct_interval']) if has_path else float(cfg['flood_interval'])
wait_s = max(suggested_timeout / 1000 * 1.2, min_wait)
mode = "PATH_ROTATION" if configured_paths else ("DIRECT" if has_path else "FLOOD")
logger.info(f"DM retry task started: dm_id={dm_id}, mode={mode}, "
# Determine scenario for logging
if has_path and has_configured_paths:
scenario = "S4_DIRECT_SD_FLOOD"
elif has_path:
scenario = "S2_DIRECT_FLOOD"
elif has_configured_paths:
scenario = "S3_FLOOD_SD"
else:
scenario = "S1_FLOOD"
logger.info(f"DM retry task started: dm_id={dm_id}, scenario={scenario}, "
f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, "
f"wait={wait_s:.0f}s")
# Wait for ACK on initial send
# ── Wait for ACK on initial send ──
if initial_ack:
logger.debug(f"DM retry: waiting {wait_s:.0f}s for initial ACK {initial_ack[:8]}...")
ack_event = await self.mc.dispatcher.wait_for_event(
@@ -1287,40 +1318,80 @@ class DeviceManager:
attempt = 0 # Global attempt counter (0 = initial send already done)
# ── Strategy 1: PATH ROTATION ──
if configured_paths:
retries_per_path = max(1, cfg['direct_max_retries'])
min_wait = float(cfg['direct_interval'])
# Separate primary (starred) path from the rest
primary_path = None
other_paths = []
for p in configured_paths:
if p.get('is_primary') and primary_path is None:
primary_path = p
def _split_primary_and_others(paths):
"""Separate primary (starred) path from the rest."""
primary = None
others = []
for p in paths:
if p.get('is_primary') and primary is None:
primary = p
else:
other_paths.append(p)
others.append(p)
return primary, others
# Phase 1: Exhaust retries on primary path first
# Initial send already used device path (assumed primary), so -1
if primary_path:
# ════════════════════════════════════════════════════════════
# Scenario 1: No path, no configured paths → FLOOD only
# ════════════════════════════════════════════════════════════
if not has_path and not has_configured_paths:
for _ in range(cfg['flood_max_retries']):
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, float(cfg['flood_interval'])
):
return
# ════════════════════════════════════════════════════════════
# Scenario 2: Has path, no configured paths → DIRECT + optional FLOOD
# ════════════════════════════════════════════════════════════
elif has_path and not has_configured_paths:
# Phase 1: Direct retries on current ŚK
for _ in range(cfg['direct_max_retries']):
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, float(cfg['direct_interval'])
):
return
# Phase 2: Optional FLOOD fallback (controlled by no_auto_flood)
if not no_auto_flood:
try:
await self._change_path_async(contact, primary_path['path_hex'], primary_path['hash_size'])
logger.info(f"DM retry: retrying on primary path '{primary_path.get('label', '')}' "
f"({primary_path['path_hex']})")
except Exception as e:
logger.warning(f"DM retry: failed to set primary path: {e}")
for _ in range(retries_per_path - 1):
await self.mc.commands.reset_path(contact)
logger.info("DM retry: direct exhausted, resetting to FLOOD")
except Exception:
pass
for _ in range(cfg['direct_flood_retries']):
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, min_wait
suggested_timeout, float(cfg['flood_interval'])
):
return # Delivered on primary, no restore needed
return
# Phase 2: Rotate through remaining (non-primary) paths
for path_info in other_paths:
# ════════════════════════════════════════════════════════════
# Scenario 3: No path, has configured paths → FLOOD first, then ŚD rotation
# ════════════════════════════════════════════════════════════
elif not has_path and has_configured_paths:
primary_path, other_paths = _split_primary_and_others(configured_paths)
# Phase 1: FLOOD retries per NoPath settings (discover new path)
logger.info("DM retry: FLOOD first to discover new path")
for _ in range(cfg['flood_max_retries']):
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, float(cfg['flood_interval'])
):
return # Firmware sets discovered path as ŚK
# Phase 2: ŚD rotation (primary first, then others by sort_order)
logger.info("DM retry: FLOOD exhausted, rotating through configured paths")
rotation_order = ([primary_path] if primary_path else []) + other_paths
retries_per_path = max(1, cfg['direct_max_retries'])
direct_interval = float(cfg['direct_interval'])
for path_info in rotation_order:
try:
await self._change_path_async(contact, path_info['path_hex'], path_info['hash_size'])
logger.info(f"DM retry: switched to path '{path_info.get('label', '')}' "
@@ -1333,14 +1404,61 @@ class DeviceManager:
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, min_wait
suggested_timeout, direct_interval
):
await self._restore_primary_path(contact, contact_pubkey)
return
# Phase 3: Optional FLOOD fallback
# Restore ŚG regardless of outcome
await self._restore_primary_path(contact, contact_pubkey)
# ════════════════════════════════════════════════════════════
# Scenario 4: Has path + has configured paths → DIRECT on ŚK, ŚD rotation, optional FLOOD
# ════════════════════════════════════════════════════════════
else: # has_path and has_configured_paths
primary_path, other_paths = _split_primary_and_others(configured_paths)
# Phase 1: Direct retries on current ŚK
for _ in range(cfg['direct_max_retries']):
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, float(cfg['direct_interval'])
):
return # Delivered on ŚK, no path change needed
# Phase 2: ŚD rotation with dedup
logger.info("DM retry: direct on ŚK exhausted, rotating through configured paths")
rotation_order = ([primary_path] if primary_path else []) + other_paths
retries_per_path = max(1, cfg['direct_max_retries'])
direct_interval = float(cfg['direct_interval'])
for path_info in rotation_order:
# Dedup: skip if this configured path matches original ŚK
if self._paths_match(original_out_path, original_out_path_len, path_info):
logger.debug(f"DM retry: skipping path '{path_info.get('label', '')}' "
f"({path_info['path_hex']}) — matches current ŚK")
continue
try:
await self._change_path_async(contact, path_info['path_hex'], path_info['hash_size'])
logger.info(f"DM retry: switched to path '{path_info.get('label', '')}' "
f"({path_info['path_hex']})")
except Exception as e:
logger.warning(f"DM retry: failed to switch path: {e}")
continue
for _ in range(retries_per_path):
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, direct_interval
):
await self._restore_primary_path(contact, contact_pubkey)
return
# Phase 3: Optional FLOOD fallback (controlled by no_auto_flood)
if not no_auto_flood:
min_wait = float(cfg['flood_interval'])
try:
await self.mc.commands.reset_path(contact)
logger.info("DM retry: all paths exhausted, falling back to FLOOD")
@@ -1350,54 +1468,17 @@ class DeviceManager:
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, min_wait
suggested_timeout, float(cfg['flood_interval'])
):
await self._restore_primary_path(contact, contact_pubkey)
return
# Restore primary path regardless of outcome
# Restore ŚG regardless of outcome
await self._restore_primary_path(contact, contact_pubkey)
# ── Strategy 2: DIRECT + optional FLOOD (no configured paths) ──
elif has_path:
# Direct retries
for _ in range(cfg['direct_max_retries']):
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, float(cfg['direct_interval'])
):
return
# Switch to flood (unless no_auto_flood)
if not no_auto_flood:
min_wait = float(cfg['flood_interval'])
try:
await self.mc.commands.reset_path(contact)
logger.info("DM retry: direct exhausted, resetting to flood")
except Exception:
pass
for _ in range(cfg['direct_flood_retries']):
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, min_wait
):
return
# ── Strategy 3: FLOOD only ──
else:
for _ in range(cfg['flood_max_retries']):
attempt += 1
if await self._dm_retry_send_and_wait(
contact, text, timestamp, attempt, dm_id,
suggested_timeout, float(cfg['flood_interval'])
):
return
logger.warning(f"DM retry exhausted ({attempt + 1} total attempts, mode={mode}) "
# ── Common epilogue: grace period for late ACKs ──
logger.warning(f"DM retry exhausted ({attempt + 1} total attempts, scenario={scenario}) "
f"for dm_id={dm_id}")
# Keep pending acks for grace period so late ACKs can still be matched
self._retry_tasks.pop(dm_id, None)
await asyncio.sleep(cfg['grace_period'])
stale = [k for k, v in self._pending_acks.items() if v == dm_id]
+2 -2
View File
@@ -388,7 +388,7 @@
<td class="pe-0" style="width:5rem"><input type="number" class="form-control form-control-sm" id="settDirectMaxRetries" min="0" max="20" value="3"></td>
</tr>
<tr>
<td class="ps-0">Flood retries <span class="badge rounded-pill text-muted" data-bs-toggle="tooltip" title="Flood attempts after direct retries exhausted"><i class="bi bi-info-circle"></i></span></td>
<td class="ps-0">Flood retries <span class="badge rounded-pill text-muted" data-bs-toggle="tooltip" title="Flood attempts after direct retries exhausted (when no configured paths)"><i class="bi bi-info-circle"></i></span></td>
<td class="pe-0"><input type="number" class="form-control form-control-sm" id="settDirectFloodRetries" min="0" max="5" value="1"></td>
</tr>
<tr>
@@ -402,7 +402,7 @@
<table class="table table-sm table-borderless mb-3 align-middle">
<tbody>
<tr>
<td class="ps-0">Max retries <span class="badge rounded-pill text-muted" data-bs-toggle="tooltip" title="Flood attempts when no path is known"><i class="bi bi-info-circle"></i></span></td>
<td class="ps-0">Max retries <span class="badge rounded-pill text-muted" data-bs-toggle="tooltip" title="Flood retry attempts (also used after path rotation)"><i class="bi bi-info-circle"></i></span></td>
<td class="pe-0" style="width:5rem"><input type="number" class="form-control form-control-sm" id="settFloodMaxRetries" min="0" max="10" value="3"></td>
</tr>
<tr>