From 39f4a715384db27cd6e810ca7af023be67d39a5d Mon Sep 17 00:00:00 2001
From: MarekWo
Date: Fri, 27 Feb 2026 12:02:42 +0100
Subject: [PATCH] fix(dm): Fix PATH_UPDATE race condition and confirm all retry acks

The retry thread was removing pending_flood_acks immediately after
exhausting retries. PATH_UPDATE arriving even 1 second later would
find no pending entry to match, leaving the message undelivered.

Changes:
- Don't clean up pending_flood_acks in retry threads, keep entries
  alive for late PATH_UPDATE arrivals
- Add TTL-based cleanup (2 min) in _process_path_update
- When PATH_UPDATE confirms delivery, save synthetic ACK for the
  original ack code AND all retry group ack codes so the frontend
  ack_status polling finds a match regardless of which code it checks

Co-Authored-By: Claude Opus 4.6
---
 meshcore-bridge/bridge.py | 66 +++++++++++++++++++++++++++------------
 1 file changed, 46 insertions(+), 20 deletions(-)

diff --git a/meshcore-bridge/bridge.py b/meshcore-bridge/bridge.py
index af264fe..d1e8d20 100644
--- a/meshcore-bridge/bridge.py
+++ b/meshcore-bridge/bridge.py
@@ -965,6 +965,9 @@ class MeshCLISession:
 
         logger.info(f"PATH_UPDATE received for contact {public_key[:16]}...")
 
+        # Cleanup stale entries on every PATH_UPDATE event
+        self._cleanup_stale_flood_acks()
+
         # Check if we have a pending flood DM for this contact
         pending = self.pending_flood_acks.get(public_key)
         if not pending:
@@ -973,20 +976,31 @@ class MeshCLISession:
         original_ack = pending['original_ack']
         recipient = pending['recipient']
 
-        # Delivery confirmed via PATH — store synthetic ACK if we have an ack code
+        # Delivery confirmed via PATH — store synthetic ACK for original + all retry acks
+        now = time.time()
+        ack_codes_to_confirm = []
         if original_ack:
-            record = {
-                'ack_code': original_ack,
-                'snr': None,
-                'rssi': None,
-                'route': 'PATH_FLOOD',
-                'path': '',
-                'ts': time.time(),
-            }
-            self.acks[original_ack] = record
-            self._save_ack(record)
+            ack_codes_to_confirm.append(original_ack)
+            # Also confirm all retry group ack codes
+            with self.retry_lock:
+                retry_acks = self.retry_groups.get(original_ack, [])
+                ack_codes_to_confirm.extend(retry_acks)
 
-        logger.info(f"PATH delivery confirmed for '{recipient}', ack={original_ack}")
+        for ack_code in ack_codes_to_confirm:
+            if ack_code not in self.acks:
+                record = {
+                    'ack_code': ack_code,
+                    'snr': None,
+                    'rssi': None,
+                    'route': 'PATH_FLOOD',
+                    'path': '',
+                    'ts': now,
+                }
+                self.acks[ack_code] = record
+                self._save_ack(record)
+
+        logger.info(f"PATH delivery confirmed for '{recipient}', "
+                    f"ack={original_ack}, confirmed_codes={len(ack_codes_to_confirm)}")
 
         # Signal the retry thread to stop
         pending['cancel_event'].set()
@@ -994,6 +1008,18 @@ class MeshCLISession:
         # Cleanup
         self.pending_flood_acks.pop(public_key, None)
 
+    def _cleanup_stale_flood_acks(self):
+        """Remove pending_flood_acks entries older than 2 minutes."""
+        cutoff = time.time() - 120
+        stale_keys = [
+            pk for pk, entry in self.pending_flood_acks.items()
+            if entry.get('timestamp', 0) < cutoff
+        ]
+        for pk in stale_keys:
+            entry = self.pending_flood_acks.pop(pk)
+            logger.debug(f"Cleaned up stale pending_flood_ack for "
+                         f"{entry.get('recipient', 'unknown')}")
+
     # =========================================================================
     # Auto-retry for DM messages
     # =========================================================================
@@ -1117,12 +1143,12 @@ class MeshCLISession:
             if not cancel_event.is_set():
                 cancel_event.wait(timeout=wait_before_retry)
 
-        # Cleanup
+        # Cleanup active_retries but keep pending_flood_acks alive
+        # for late PATH_UPDATE arrivals (cleaned up by TTL in _cleanup_stale_flood_acks)
         if not cancel_event.is_set():
             logger.warning(f"Flood retry on error: exhausted ({max_flood} attempts) "
-                           f"for '{text[:30]}' -> {recipient}")
-            if public_key:
-                self.pending_flood_acks.pop(public_key, None)
+                           f"for '{text[:30]}' -> {recipient}, "
+                           f"keeping PATH_UPDATE listener active")
         if original_ack:
             with self.retry_lock:
                 self.active_retries.pop(original_ack, None)
@@ -1249,12 +1275,12 @@ class MeshCLISession:
             if not cancel_event.is_set():
                 cancel_event.wait(timeout=wait_timeout)
 
-        # Cleanup
+        # Cleanup active_retries but keep pending_flood_acks alive
+        # for late PATH_UPDATE arrivals (cleaned up by TTL in _cleanup_stale_flood_acks)
         if not cancel_event.is_set():
             logger.warning(f"Flood retry exhausted ({max_flood} attempts) "
-                           f"for '{text[:30]}' -> {recipient}")
-            if public_key:
-                self.pending_flood_acks.pop(public_key, None)
+                           f"for '{text[:30]}' -> {recipient}, "
+                           f"keeping PATH_UPDATE listener active")
         with self.retry_lock:
             self.active_retries.pop(original_ack, None)
 