fix(dm): Retry flood DM when initial send fails with device error

When sending a DM to a no-path contact, meshcli can return
"Error sending message" if the device-level flood send fails.
Previously this was treated as a terminal failure with no retry.

Now detects the error and starts a background flood retry thread
that re-attempts up to auto_retry_flood_only times (default 3),
waiting between attempts for a PATH_UPDATE to establish the route.
If a retry succeeds, registers for normal delivery tracking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-02-27 10:58:48 +01:00
parent 2764e1c551
commit 66b553a8b5

View File

@@ -973,18 +973,19 @@ class MeshCLISession:
original_ack = pending['original_ack']
recipient = pending['recipient']
# Delivery confirmed via PATH — store synthetic ACK
record = {
'ack_code': original_ack,
'snr': None,
'rssi': None,
'route': 'PATH_FLOOD',
'path': '',
'ts': time.time(),
}
# Delivery confirmed via PATH — store synthetic ACK if we have an ack code
if original_ack:
record = {
'ack_code': original_ack,
'snr': None,
'rssi': None,
'route': 'PATH_FLOOD',
'path': '',
'ts': time.time(),
}
self.acks[original_ack] = record
self._save_ack(record)
self.acks[original_ack] = record
self._save_ack(record)
logger.info(f"PATH delivery confirmed for '{recipient}', ack={original_ack}")
# Signal the retry thread to stop
@@ -1017,6 +1018,115 @@ class MeshCLISession:
f"flood_only={self.auto_retry_flood_only}, "
f"timeout={suggested_timeout}ms")
def _start_flood_retry_on_error(self, recipient, text):
    """Launch a background flood retry after the initial .msg send failed.

    No ack code exists at this point (in contrast to _start_retry), so the
    worker thread simply re-sends .msg up to auto_retry_flood_only times in
    the hope that a PATH_UPDATE establishes a route in the meantime.

    Nothing is started when auto_retry_flood_only is below 2, or when the
    contact lookup reports a non-negative path length.

    Args:
        recipient: Contact name the message is addressed to.
        text: Message body to re-send.
    """
    if self.auto_retry_flood_only < 2:
        logger.debug("Flood retry on error: disabled (flood_only < 2)")
        return

    # Only contacts without a known path qualify for flood retry.
    path_len, public_key = self._get_contact_info(recipient)
    if path_len >= 0:
        # A contact that already has a path is not expected to fail this
        # way, so flood retry is deliberately not used for it.
        skip_message = (f"Flood retry on error: '{recipient}' has path "
                        f"(len={path_len}), skipping flood retry")
        logger.warning(skip_message)
        return

    worker = threading.Thread(
        name=f"flood-err-{recipient[:12]}",
        target=self._retry_flood_on_error,
        args=(recipient, text, public_key),
        daemon=True,
    )
    worker.start()

    started_message = (f"Flood retry on error started for '{recipient}', "
                       f"max={self.auto_retry_flood_only}")
    logger.info(started_message)
def _retry_flood_on_error(self, recipient, text, public_key):
    """Flood retry thread body, used when the initial send failed.

    Re-sends .msg up to auto_retry_flood_only times, waiting before every
    attempt so a PATH_UPDATE can arrive in between. The first send that
    returns an ack code is registered in active_retries / retry_groups and
    written into the pending_flood_acks entry; ack codes from later sends
    are added to retry_ack_codes and appended to the same retry group.

    The registrations made here are removed in a ``finally`` block, so they
    are released even if the loop terminates with an unexpected exception.

    Args:
        recipient: Contact name passed to the .msg command.
        text: Message body passed to the .msg command.
        public_key: Key under which the pending flood entry is stored in
            pending_flood_acks; when falsy, no entry is registered.
    """
    max_flood = self.auto_retry_flood_only
    cancel_event = threading.Event()
    original_ack = None  # No ack code exists until a send succeeds
    wait_before_retry = 5.0  # Default wait (seconds) between attempts

    # Register for PATH_UPDATE if we have the public key
    if public_key:
        self.pending_flood_acks[public_key] = {
            'original_ack': None,  # Updated when the first send succeeds
            'cancel_event': cancel_event,
            'recipient': recipient,
            'timestamp': time.time(),
        }

    try:
        for attempt in range(max_flood):
            # Wait before each attempt (PATH_UPDATE may arrive between retries)
            if cancel_event.wait(timeout=wait_before_retry):
                logger.info(f"Flood retry on error: PATH delivery confirmed "
                            f"for '{recipient}' before attempt {attempt + 1}")
                break

            # Send .msg
            logger.info(f"Flood retry on error: attempt {attempt + 1}/{max_flood} "
                        f"for '{text[:30]}' -> {recipient}")
            try:
                result = self.execute_command(['.msg', recipient, text],
                                              timeout=DEFAULT_TIMEOUT)
                if result.get('success'):
                    stdout = result.get('stdout', '')
                    new_ack = self._extract_ack_from_response(stdout)
                    new_timeout = self._extract_timeout_from_response(stdout)
                    if new_ack:
                        # Send succeeded! Register for normal delivery tracking
                        if original_ack is None:
                            original_ack = new_ack
                            with self.retry_lock:
                                self.active_retries[original_ack] = cancel_event
                                self.retry_groups[original_ack] = []
                            # Update pending_flood_acks with actual ack code
                            if public_key and public_key in self.pending_flood_acks:
                                self.pending_flood_acks[public_key]['original_ack'] = new_ack
                        else:
                            with self.retry_lock:
                                self.retry_ack_codes.add(new_ack)
                                self.retry_groups.setdefault(
                                    original_ack, []).append(new_ack)
                        if new_timeout:
                            # Suggested timeout is in ms; pad by 20% and
                            # never wait less than the 5 s default.
                            wait_before_retry = max(new_timeout / 1000 * 1.2, 5.0)
                        logger.info(f"Flood retry on error: send succeeded, "
                                    f"ack={new_ack}")
                    else:
                        logger.warning("Flood retry on error: send succeeded but "
                                       "no ack in response")
                else:
                    logger.warning(f"Flood retry on error: send failed again "
                                   f"(attempt {attempt + 1}/{max_flood})")
            except Exception as e:
                # Best effort: a failed attempt must not end the retry loop.
                logger.warning(f"Flood retry on error: exception: {e}")
        else:
            # Final wait after last attempt (loop ended without a break)
            if not cancel_event.is_set():
                cancel_event.wait(timeout=wait_before_retry)

        if not cancel_event.is_set():
            logger.warning(f"Flood retry on error: exhausted ({max_flood} attempts) "
                           f"for '{text[:30]}' -> {recipient}")
    finally:
        # Cleanup: always release the registrations made by this thread.
        if public_key:
            self.pending_flood_acks.pop(public_key, None)
        if original_ack:
            with self.retry_lock:
                self.active_retries.pop(original_ack, None)
def _get_contact_info(self, recipient):
"""Get contact info via .ci command.
@@ -1592,8 +1702,8 @@ def execute_cli():
# Execute via persistent session
result = meshcli_session.execute_command(args, timeout)
# Auto-retry: after successful msg command, start background retry
if (result.get('success') and args and args[0] in ('.msg', '.m')
# Auto-retry: after msg command, start background retry
if (args and args[0] in ('.msg', '.m')
and len(args) >= 3 and meshcli_session.auto_retry_enabled
and meshcli_session.auto_retry_max_attempts > 1):
stdout = result.get('stdout', '')
@@ -1605,6 +1715,15 @@ def execute_cli():
meshcli_session._start_retry(
recipient, text, expected_ack, suggested_timeout
)
elif 'Error sending message' in stdout or not result.get('success'):
# Send failed (e.g. no-path contact, device error).
# Start flood retry - subsequent attempts may succeed
# after PATH_UPDATE establishes a route.
recipient = args[1]
text = args[2]
logger.info(f"Auto-retry: initial send failed, starting flood retry "
f"for '{recipient}'")
meshcli_session._start_flood_retry_on_error(recipient, text)
else:
logger.warning(f"Auto-retry: could not extract ack/timeout from msg response: "
f"{stdout[:300]}")