From 66b553a8b56d2bd3db967fac00229008e2c21964 Mon Sep 17 00:00:00 2001 From: MarekWo Date: Fri, 27 Feb 2026 10:58:48 +0100 Subject: [PATCH] fix(dm): Retry flood DM when initial send fails with device error When sending a DM to a no-path contact, meshcli can return "Error sending message" if the device-level flood send fails. Previously this was treated as a terminal failure with no retry. Now detects the error and starts a background flood retry thread that re-attempts up to auto_retry_flood_only times (default 3), waiting between attempts for a PATH_UPDATE to establish the route. If a retry succeeds, registers for normal delivery tracking. Co-Authored-By: Claude Opus 4.6 --- meshcore-bridge/bridge.py | 145 ++++++++++++++++++++++++++++++++++---- 1 file changed, 132 insertions(+), 13 deletions(-) diff --git a/meshcore-bridge/bridge.py b/meshcore-bridge/bridge.py index f961d01..af264fe 100644 --- a/meshcore-bridge/bridge.py +++ b/meshcore-bridge/bridge.py @@ -973,18 +973,19 @@ class MeshCLISession: original_ack = pending['original_ack'] recipient = pending['recipient'] - # Delivery confirmed via PATH — store synthetic ACK - record = { - 'ack_code': original_ack, - 'snr': None, - 'rssi': None, - 'route': 'PATH_FLOOD', - 'path': '', - 'ts': time.time(), - } + # Delivery confirmed via PATH — store synthetic ACK if we have an ack code + if original_ack: + record = { + 'ack_code': original_ack, + 'snr': None, + 'rssi': None, + 'route': 'PATH_FLOOD', + 'path': '', + 'ts': time.time(), + } + self.acks[original_ack] = record + self._save_ack(record) - self.acks[original_ack] = record - self._save_ack(record) logger.info(f"PATH delivery confirmed for '{recipient}', ack={original_ack}") # Signal the retry thread to stop @@ -1017,6 +1018,115 @@ class MeshCLISession: f"flood_only={self.auto_retry_flood_only}, " f"timeout={suggested_timeout}ms") + def _start_flood_retry_on_error(self, recipient, text): + """Start flood retry when initial .msg failed (e.g. no-path contact error). + + Unlike _start_retry, there is no initial ack code. The retry thread + will re-send .msg up to auto_retry_flood_only times, hoping subsequent + attempts succeed after a PATH_UPDATE establishes the route. + """ + if self.auto_retry_flood_only < 2: + logger.debug("Flood retry on error: disabled (flood_only < 2)") + return + + # Check if this is actually a no-path contact + path_len, public_key = self._get_contact_info(recipient) + if path_len >= 0: + # Contact has a path - this shouldn't normally fail. + # Don't start flood retry for path-known contacts. + logger.warning(f"Flood retry on error: '{recipient}' has path " + f"(len={path_len}), skipping flood retry") + return + + thread = threading.Thread( + target=self._retry_flood_on_error, + args=(recipient, text, public_key), + daemon=True, + name=f"flood-err-{recipient[:12]}" + ) + thread.start() + logger.info(f"Flood retry on error started for '{recipient}', " + f"max={self.auto_retry_flood_only}") + + def _retry_flood_on_error(self, recipient, text, public_key): + """Flood retry thread when initial send failed. + + Re-sends .msg up to auto_retry_flood_only times. If any send succeeds + (returns an ack code), registers for PATH_UPDATE delivery confirmation. + """ + max_flood = self.auto_retry_flood_only + cancel_event = threading.Event() + original_ack = None # We don't have one yet + wait_before_retry = 5.0 # Default wait between attempts + + # Register for PATH_UPDATE if we have the public key + if public_key: + self.pending_flood_acks[public_key] = { + 'original_ack': None, # Will be updated when first send succeeds + 'cancel_event': cancel_event, + 'recipient': recipient, + 'timestamp': time.time(), + } + + for attempt in range(max_flood): + # Wait before each attempt (PATH_UPDATE may arrive between retries) + if cancel_event.wait(timeout=wait_before_retry): + logger.info(f"Flood retry on error: PATH delivery confirmed " + f"for '{recipient}' before attempt {attempt + 1}") + break + + # Send .msg + logger.info(f"Flood retry on error: attempt {attempt + 1}/{max_flood} " + f"for '{text[:30]}' -> {recipient}") + try: + result = self.execute_command(['.msg', recipient, text], + timeout=DEFAULT_TIMEOUT) + if result.get('success'): + stdout = result.get('stdout', '') + new_ack = self._extract_ack_from_response(stdout) + new_timeout = self._extract_timeout_from_response(stdout) + if new_ack: + # Send succeeded! Register for normal delivery tracking + if original_ack is None: + original_ack = new_ack + with self.retry_lock: + self.active_retries[original_ack] = cancel_event + self.retry_groups[original_ack] = [] + # Update pending_flood_acks with actual ack code + if public_key and public_key in self.pending_flood_acks: + self.pending_flood_acks[public_key]['original_ack'] = new_ack + else: + with self.retry_lock: + self.retry_ack_codes.add(new_ack) + self.retry_groups.setdefault( + original_ack, []).append(new_ack) + if new_timeout: + wait_before_retry = max(new_timeout / 1000 * 1.2, 5.0) + logger.info(f"Flood retry on error: send succeeded, " + f"ack={new_ack}") + else: + logger.warning(f"Flood retry on error: send succeeded but " + f"no ack in response") + else: + logger.warning(f"Flood retry on error: send failed again " + f"(attempt {attempt + 1}/{max_flood})") + except Exception as e: + logger.warning(f"Flood retry on error: exception: {e}") + else: + # Final wait after last attempt + if not cancel_event.is_set(): + cancel_event.wait(timeout=wait_before_retry) + + # Cleanup + if not cancel_event.is_set(): + logger.warning(f"Flood retry on error: exhausted ({max_flood} attempts) " + f"for '{text[:30]}' -> {recipient}") + if public_key: + self.pending_flood_acks.pop(public_key, None) + if original_ack: + with self.retry_lock: + self.active_retries.pop(original_ack, None) + def _get_contact_info(self, recipient): """Get contact info via .ci command. @@ -1592,8 +1702,8 @@ def execute_cli(): # Execute via persistent session result = meshcli_session.execute_command(args, timeout) - # Auto-retry: after successful msg command, start background retry - if (result.get('success') and args and args[0] in ('.msg', '.m') + # Auto-retry: after msg command, start background retry + if (args and args[0] in ('.msg', '.m') and len(args) >= 3 and meshcli_session.auto_retry_enabled and meshcli_session.auto_retry_max_attempts > 1): stdout = result.get('stdout', '') @@ -1605,6 +1715,15 @@ def execute_cli(): meshcli_session._start_retry( recipient, text, expected_ack, suggested_timeout ) + elif 'Error sending message' in stdout or not result.get('success'): + # Send failed (e.g. no-path contact, device error). + # Start flood retry - subsequent attempts may succeed + # after PATH_UPDATE establishes a route. + recipient = args[1] + text = args[2] + logger.info(f"Auto-retry: initial send failed, starting flood retry " + f"for '{recipient}'") + meshcli_session._start_flood_retry_on_error(recipient, text) else: logger.warning(f"Auto-retry: could not extract ack/timeout from msg response: " f"{stdout[:300]}")