diff --git a/app/meshcore/cli.py b/app/meshcore/cli.py index 26fd978..c878e53 100644 --- a/app/meshcore/cli.py +++ b/app/meshcore/cli.py @@ -461,15 +461,15 @@ def get_auto_retry_config() -> Tuple[bool, Dict]: return False, {} -def set_auto_retry_config(enabled=None, max_attempts=None, flood_after=None) -> Tuple[bool, Dict]: +def set_auto_retry_config(enabled=None, max_attempts=None, max_flood=None) -> Tuple[bool, Dict]: """Update auto-retry configuration on bridge.""" payload = {} if enabled is not None: payload['enabled'] = enabled if max_attempts is not None: payload['max_attempts'] = max_attempts - if flood_after is not None: - payload['flood_after'] = flood_after + if max_flood is not None: + payload['max_flood'] = max_flood try: response = requests.post( diff --git a/app/routes/api.py b/app/routes/api.py index d451d3e..73a25e9 100644 --- a/app/routes/api.py +++ b/app/routes/api.py @@ -1826,7 +1826,7 @@ def set_auto_retry_config(): success, result = cli.set_auto_retry_config( enabled=data.get('enabled'), max_attempts=data.get('max_attempts'), - flood_after=data.get('flood_after') + max_flood=data.get('max_flood') ) if success: return jsonify(result), 200 diff --git a/meshcore-bridge/bridge.py b/meshcore-bridge/bridge.py index fbb522f..75fd98f 100644 --- a/meshcore-bridge/bridge.py +++ b/meshcore-bridge/bridge.py @@ -159,8 +159,8 @@ class MeshCLISession: # Auto-retry for DM messages self.auto_retry_enabled = True - self.auto_retry_max_attempts = 3 # total attempts including first send - self.auto_retry_flood_after = 2 # switch to flood after N failed direct attempts + self.auto_retry_max_attempts = 5 # max direct attempts (including first send) + self.auto_retry_max_flood = 3 # max flood attempts (after reset_path) self.retry_ack_codes = set() # expected_ack codes of retry attempts (not first) self.retry_groups = {} # original_ack -> [retry_ack_1, retry_ack_2, ...] self.retry_lock = threading.Lock() @@ -833,61 +833,70 @@ class MeshCLISession: ) thread.start() logger.info(f"Auto-retry started for ack={original_ack}, " - f"max_attempts={self.auto_retry_max_attempts}, " + f"direct={self.auto_retry_max_attempts}, " + f"flood={self.auto_retry_max_flood}, " f"timeout={suggested_timeout}ms") def _retry_send(self, recipient, text, original_ack, suggested_timeout, cancel_event): """Background retry loop for a DM message. - Waits for ACK after initial send, retries up to max_attempts total. - After flood_after failed direct attempts, resets path to force flood routing. + Phase 1: Direct attempts (up to auto_retry_max_attempts) + Phase 2: Flood attempts (up to auto_retry_max_flood) after reset_path """ # Wait timeout in seconds (use suggested_timeout from device, with 1.2x margin) wait_timeout = max(suggested_timeout / 1000 * 1.2, 5.0) - # Attempt 1 was already sent by the caller. Start from attempt 2. - attempt = 1 # 0-indexed, so attempt=1 is the second try - flood_sent = False + max_direct = self.auto_retry_max_attempts # includes the first send + max_flood = self.auto_retry_max_flood + total_max = max_direct + max_flood - while attempt < self.auto_retry_max_attempts: + # Attempt 1 was already sent by the caller. Start from attempt index 1. + attempt = 1 # 0-indexed; attempt 0 was the initial send + flood_mode = False + flood_attempts = 0 + + while attempt < total_max: if cancel_event.is_set(): logger.info(f"Retry cancelled for ack={original_ack}") break - # Wait for ACK of the most recent expected_ack + # Collect all ack codes in the group to check with self.retry_lock: all_acks = [original_ack] + self.retry_groups.get(original_ack, []) - latest_ack = all_acks[-1] - # Poll for ACK (check all ack codes in the group — any ACK means delivered) + # Poll for ACK (check all ack codes — any ACK means delivered) ack_received = self._wait_for_any_ack(all_acks, wait_timeout, cancel_event) if ack_received: - logger.info(f"ACK received for ack={latest_ack} (attempt {attempt}), " + mode = "flood" if flood_mode else "direct" + logger.info(f"ACK received (attempt {attempt}, {mode}), " f"retry done for original={original_ack}") break if cancel_event.is_set(): break - # No ACK received - retry - attempt += 1 - - # Reset path to flood before the configured attempt - if attempt >= self.auto_retry_flood_after and not flood_sent: - logger.info(f"Retry: resetting path for {recipient} (switching to flood)") + # Check limits for current mode + if not flood_mode and attempt >= max_direct: + # Switch to flood mode + logger.info(f"Retry: resetting path for {recipient} (switching to flood, " + f"{max_direct} direct attempts exhausted)") try: self.execute_command(['reset_path', recipient], timeout=5) - flood_sent = True except Exception as e: logger.error(f"Retry: reset_path failed: {e}") + flood_mode = True - logger.info(f"Retry attempt {attempt + 1}/{self.auto_retry_max_attempts} " - f"for '{text[:20]}...' → {recipient}") + if flood_mode and flood_attempts >= max_flood: + break + + # Send retry + mode_str = "flood" if flood_mode else "direct" + logger.info(f"Retry {mode_str} attempt {attempt + 1}/{total_max} " + f"for '{text[:30]}' → {recipient}") try: result = self.execute_command(['msg', recipient, text], timeout=DEFAULT_TIMEOUT) if result.get('success'): - # Parse the response to get new expected_ack new_ack = self._extract_ack_from_response(result.get('stdout', '')) new_timeout = self._extract_timeout_from_response(result.get('stdout', '')) if new_ack: @@ -898,7 +907,8 @@ class MeshCLISession: wait_timeout = max(new_timeout / 1000 * 1.2, 5.0) logger.info(f"Retry sent, new ack={new_ack}") else: - logger.warning(f"Retry: could not parse expected_ack from response") + logger.warning(f"Retry: could not parse expected_ack from response: " + f"{result.get('stdout', '')[:200]}") break else: logger.error(f"Retry: msg command failed: {result.get('stderr', '')}") @@ -907,13 +917,17 @@ class MeshCLISession: logger.error(f"Retry: send failed: {e}") break + attempt += 1 + if flood_mode: + flood_attempts += 1 + # Cleanup active retry tracking with self.retry_lock: self.active_retries.pop(original_ack, None) - if attempt >= self.auto_retry_max_attempts: - logger.warning(f"Auto-retry exhausted ({self.auto_retry_max_attempts} attempts) " - f"for '{text[:20]}...' → {recipient}") + if attempt >= total_max: + logger.warning(f"Auto-retry exhausted ({max_direct} direct + {max_flood} flood) " + f"for '{text[:30]}' → {recipient}") def _wait_for_any_ack(self, ack_codes, timeout_seconds, cancel_event): """Poll self.acks dict for any of the given ack_codes with timeout.""" @@ -928,33 +942,55 @@ class MeshCLISession: cancel_event.wait(poll_interval) return any(code in self.acks for code in ack_codes) - def _extract_ack_from_response(self, stdout): - """Extract expected_ack from msg command JSON response.""" + def _parse_msg_response(self, stdout): + """Parse msg command response (may be multi-line pretty-printed JSON). + + Returns dict with expected_ack, suggested_timeout, etc. or None. + """ + if not stdout or not stdout.strip(): + return None + + # Try parsing the entire stdout as one JSON object (handles indent=4) try: - for line in stdout.strip().split('\n'): - line = line.strip() - if not line: - continue + data = json.loads(stdout.strip()) + if isinstance(data, dict) and 'expected_ack' in data: + return data + except (json.JSONDecodeError, ValueError): + pass + + # Fallback: try each line individually (single-line JSON) + for line in stdout.strip().split('\n'): + line = line.strip() + if not line: + continue + try: data = json.loads(line) if isinstance(data, dict) and 'expected_ack' in data: - return data['expected_ack'] - except (json.JSONDecodeError, ValueError): - pass + return data + except (json.JSONDecodeError, ValueError): + continue + + # Last resort: regex extraction + import re + ack_match = re.search(r'"expected_ack"\s*:\s*"([0-9a-fA-F]+)"', stdout) + timeout_match = re.search(r'"suggested_timeout"\s*:\s*(\d+)', stdout) + if ack_match: + result = {'expected_ack': ack_match.group(1)} + if timeout_match: + result['suggested_timeout'] = int(timeout_match.group(1)) + return result + return None + def _extract_ack_from_response(self, stdout): + """Extract expected_ack from msg command response.""" + data = self._parse_msg_response(stdout) + return data.get('expected_ack') if data else None + def _extract_timeout_from_response(self, stdout): - """Extract suggested_timeout from msg command JSON response.""" - try: - for line in stdout.strip().split('\n'): - line = line.strip() - if not line: - continue - data = json.loads(line) - if isinstance(data, dict) and 'suggested_timeout' in data: - return data['suggested_timeout'] - except (json.JSONDecodeError, ValueError): - pass - return None + """Extract suggested_timeout from msg command response.""" + data = self._parse_msg_response(stdout) + return data.get('suggested_timeout') if data else None def _log_advert(self, json_line): """Log advert JSON to .jsonl file with timestamp""" @@ -1673,7 +1709,7 @@ def get_auto_retry_config(): 'success': True, 'enabled': meshcli_session.auto_retry_enabled, 'max_attempts': meshcli_session.auto_retry_max_attempts, - 'flood_after': meshcli_session.auto_retry_flood_after, + 'max_flood': meshcli_session.auto_retry_max_flood, 'active_retries': len(meshcli_session.active_retries), }), 200 @@ -1693,19 +1729,19 @@ def set_auto_retry_config(): if 'max_attempts' in data: val = int(data['max_attempts']) meshcli_session.auto_retry_max_attempts = max(1, min(val, 10)) - if 'flood_after' in data: - val = int(data['flood_after']) - meshcli_session.auto_retry_flood_after = max(1, min(val, 10)) + if 'max_flood' in data: + val = int(data['max_flood']) + meshcli_session.auto_retry_max_flood = max(0, min(val, 10)) logger.info(f"Auto-retry config updated: enabled={meshcli_session.auto_retry_enabled}, " f"max_attempts={meshcli_session.auto_retry_max_attempts}, " - f"flood_after={meshcli_session.auto_retry_flood_after}") + f"max_flood={meshcli_session.auto_retry_max_flood}") return jsonify({ 'success': True, 'enabled': meshcli_session.auto_retry_enabled, 'max_attempts': meshcli_session.auto_retry_max_attempts, - 'flood_after': meshcli_session.auto_retry_flood_after, + 'max_flood': meshcli_session.auto_retry_max_flood, }), 200