fix(dm): Fix auto-retry not triggering and increase retry limits

- Fix JSON parsing: msg command outputs multi-line pretty-printed JSON
  (indent=4), but parser tried line-by-line. Now tries full-text parse
  first, then line-by-line, then regex fallback.
- Change retry limits: 5 direct + 3 flood attempts (was 3 total)
- Separate max_attempts (direct) and max_flood parameters
- Add debug logging when ack extraction fails

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-02-24 21:39:58 +01:00
parent c37a7d3b23
commit c0f93029cd
3 changed files with 94 additions and 58 deletions

View File

@@ -461,15 +461,15 @@ def get_auto_retry_config() -> Tuple[bool, Dict]:
return False, {}
def set_auto_retry_config(enabled=None, max_attempts=None, flood_after=None) -> Tuple[bool, Dict]:
def set_auto_retry_config(enabled=None, max_attempts=None, max_flood=None) -> Tuple[bool, Dict]:
"""Update auto-retry configuration on bridge."""
payload = {}
if enabled is not None:
payload['enabled'] = enabled
if max_attempts is not None:
payload['max_attempts'] = max_attempts
if flood_after is not None:
payload['flood_after'] = flood_after
if max_flood is not None:
payload['max_flood'] = max_flood
try:
response = requests.post(

View File

@@ -1826,7 +1826,7 @@ def set_auto_retry_config():
success, result = cli.set_auto_retry_config(
enabled=data.get('enabled'),
max_attempts=data.get('max_attempts'),
flood_after=data.get('flood_after')
max_flood=data.get('max_flood')
)
if success:
return jsonify(result), 200

View File

@@ -159,8 +159,8 @@ class MeshCLISession:
# Auto-retry for DM messages
self.auto_retry_enabled = True
self.auto_retry_max_attempts = 3 # total attempts including first send
self.auto_retry_flood_after = 2 # switch to flood after N failed direct attempts
self.auto_retry_max_attempts = 5 # max direct attempts (including first send)
self.auto_retry_max_flood = 3 # max flood attempts (after reset_path)
self.retry_ack_codes = set() # expected_ack codes of retry attempts (not first)
self.retry_groups = {} # original_ack -> [retry_ack_1, retry_ack_2, ...]
self.retry_lock = threading.Lock()
@@ -833,61 +833,70 @@ class MeshCLISession:
)
thread.start()
logger.info(f"Auto-retry started for ack={original_ack}, "
f"max_attempts={self.auto_retry_max_attempts}, "
f"direct={self.auto_retry_max_attempts}, "
f"flood={self.auto_retry_max_flood}, "
f"timeout={suggested_timeout}ms")
def _retry_send(self, recipient, text, original_ack, suggested_timeout, cancel_event):
"""Background retry loop for a DM message.
Waits for ACK after initial send, retries up to max_attempts total.
After flood_after failed direct attempts, resets path to force flood routing.
Phase 1: Direct attempts (up to auto_retry_max_attempts)
Phase 2: Flood attempts (up to auto_retry_max_flood) after reset_path
"""
# Wait timeout in seconds (use suggested_timeout from device, with 1.2x margin)
wait_timeout = max(suggested_timeout / 1000 * 1.2, 5.0)
# Attempt 1 was already sent by the caller. Start from attempt 2.
attempt = 1 # 0-indexed, so attempt=1 is the second try
flood_sent = False
max_direct = self.auto_retry_max_attempts # includes the first send
max_flood = self.auto_retry_max_flood
total_max = max_direct + max_flood
while attempt < self.auto_retry_max_attempts:
# Attempt 1 was already sent by the caller. Start from attempt index 1.
attempt = 1 # 0-indexed; attempt 0 was the initial send
flood_mode = False
flood_attempts = 0
while attempt < total_max:
if cancel_event.is_set():
logger.info(f"Retry cancelled for ack={original_ack}")
break
# Wait for ACK of the most recent expected_ack
# Collect all ack codes in the group to check
with self.retry_lock:
all_acks = [original_ack] + self.retry_groups.get(original_ack, [])
latest_ack = all_acks[-1]
# Poll for ACK (check all ack codes in the group — any ACK means delivered)
# Poll for ACK (check all ack codes — any ACK means delivered)
ack_received = self._wait_for_any_ack(all_acks, wait_timeout, cancel_event)
if ack_received:
logger.info(f"ACK received for ack={latest_ack} (attempt {attempt}), "
mode = "flood" if flood_mode else "direct"
logger.info(f"ACK received (attempt {attempt}, {mode}), "
f"retry done for original={original_ack}")
break
if cancel_event.is_set():
break
# No ACK received - retry
attempt += 1
# Reset path to flood before the configured attempt
if attempt >= self.auto_retry_flood_after and not flood_sent:
logger.info(f"Retry: resetting path for {recipient} (switching to flood)")
# Check limits for current mode
if not flood_mode and attempt >= max_direct:
# Switch to flood mode
logger.info(f"Retry: resetting path for {recipient} (switching to flood, "
f"{max_direct} direct attempts exhausted)")
try:
self.execute_command(['reset_path', recipient], timeout=5)
flood_sent = True
except Exception as e:
logger.error(f"Retry: reset_path failed: {e}")
flood_mode = True
logger.info(f"Retry attempt {attempt + 1}/{self.auto_retry_max_attempts} "
f"for '{text[:20]}...'{recipient}")
if flood_mode and flood_attempts >= max_flood:
break
# Send retry
mode_str = "flood" if flood_mode else "direct"
logger.info(f"Retry {mode_str} attempt {attempt + 1}/{total_max} "
f"for '{text[:30]}'{recipient}")
try:
result = self.execute_command(['msg', recipient, text], timeout=DEFAULT_TIMEOUT)
if result.get('success'):
# Parse the response to get new expected_ack
new_ack = self._extract_ack_from_response(result.get('stdout', ''))
new_timeout = self._extract_timeout_from_response(result.get('stdout', ''))
if new_ack:
@@ -898,7 +907,8 @@ class MeshCLISession:
wait_timeout = max(new_timeout / 1000 * 1.2, 5.0)
logger.info(f"Retry sent, new ack={new_ack}")
else:
logger.warning(f"Retry: could not parse expected_ack from response")
logger.warning(f"Retry: could not parse expected_ack from response: "
f"{result.get('stdout', '')[:200]}")
break
else:
logger.error(f"Retry: msg command failed: {result.get('stderr', '')}")
@@ -907,13 +917,17 @@ class MeshCLISession:
logger.error(f"Retry: send failed: {e}")
break
attempt += 1
if flood_mode:
flood_attempts += 1
# Cleanup active retry tracking
with self.retry_lock:
self.active_retries.pop(original_ack, None)
if attempt >= self.auto_retry_max_attempts:
logger.warning(f"Auto-retry exhausted ({self.auto_retry_max_attempts} attempts) "
f"for '{text[:20]}...'{recipient}")
if attempt >= total_max:
logger.warning(f"Auto-retry exhausted ({max_direct} direct + {max_flood} flood) "
f"for '{text[:30]}'{recipient}")
def _wait_for_any_ack(self, ack_codes, timeout_seconds, cancel_event):
"""Poll self.acks dict for any of the given ack_codes with timeout."""
@@ -928,33 +942,55 @@ class MeshCLISession:
cancel_event.wait(poll_interval)
return any(code in self.acks for code in ack_codes)
def _extract_ack_from_response(self, stdout):
"""Extract expected_ack from msg command JSON response."""
def _parse_msg_response(self, stdout):
"""Parse msg command response (may be multi-line pretty-printed JSON).
Returns dict with expected_ack, suggested_timeout, etc. or None.
"""
if not stdout or not stdout.strip():
return None
# Try parsing the entire stdout as one JSON object (handles indent=4)
try:
for line in stdout.strip().split('\n'):
line = line.strip()
if not line:
continue
data = json.loads(stdout.strip())
if isinstance(data, dict) and 'expected_ack' in data:
return data
except (json.JSONDecodeError, ValueError):
pass
# Fallback: try each line individually (single-line JSON)
for line in stdout.strip().split('\n'):
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
if isinstance(data, dict) and 'expected_ack' in data:
return data['expected_ack']
except (json.JSONDecodeError, ValueError):
pass
return data
except (json.JSONDecodeError, ValueError):
continue
# Last resort: regex extraction
import re
ack_match = re.search(r'"expected_ack"\s*:\s*"([0-9a-fA-F]+)"', stdout)
timeout_match = re.search(r'"suggested_timeout"\s*:\s*(\d+)', stdout)
if ack_match:
result = {'expected_ack': ack_match.group(1)}
if timeout_match:
result['suggested_timeout'] = int(timeout_match.group(1))
return result
return None
def _extract_ack_from_response(self, stdout):
"""Extract expected_ack from msg command response."""
data = self._parse_msg_response(stdout)
return data.get('expected_ack') if data else None
def _extract_timeout_from_response(self, stdout):
"""Extract suggested_timeout from msg command JSON response."""
try:
for line in stdout.strip().split('\n'):
line = line.strip()
if not line:
continue
data = json.loads(line)
if isinstance(data, dict) and 'suggested_timeout' in data:
return data['suggested_timeout']
except (json.JSONDecodeError, ValueError):
pass
return None
"""Extract suggested_timeout from msg command response."""
data = self._parse_msg_response(stdout)
return data.get('suggested_timeout') if data else None
def _log_advert(self, json_line):
"""Log advert JSON to .jsonl file with timestamp"""
@@ -1673,7 +1709,7 @@ def get_auto_retry_config():
'success': True,
'enabled': meshcli_session.auto_retry_enabled,
'max_attempts': meshcli_session.auto_retry_max_attempts,
'flood_after': meshcli_session.auto_retry_flood_after,
'max_flood': meshcli_session.auto_retry_max_flood,
'active_retries': len(meshcli_session.active_retries),
}), 200
@@ -1693,19 +1729,19 @@ def set_auto_retry_config():
if 'max_attempts' in data:
val = int(data['max_attempts'])
meshcli_session.auto_retry_max_attempts = max(1, min(val, 10))
if 'flood_after' in data:
val = int(data['flood_after'])
meshcli_session.auto_retry_flood_after = max(1, min(val, 10))
if 'max_flood' in data:
val = int(data['max_flood'])
meshcli_session.auto_retry_max_flood = max(0, min(val, 10))
logger.info(f"Auto-retry config updated: enabled={meshcli_session.auto_retry_enabled}, "
f"max_attempts={meshcli_session.auto_retry_max_attempts}, "
f"flood_after={meshcli_session.auto_retry_flood_after}")
f"max_flood={meshcli_session.auto_retry_max_flood}")
return jsonify({
'success': True,
'enabled': meshcli_session.auto_retry_enabled,
'max_attempts': meshcli_session.auto_retry_max_attempts,
'flood_after': meshcli_session.auto_retry_flood_after,
'max_flood': meshcli_session.auto_retry_max_flood,
}), 200