mirror of
https://github.com/MarekWo/mc-webui.git
synced 2026-03-28 17:42:45 +01:00
fix(dm): Fix PATH_UPDATE race condition and confirm all retry acks
The retry thread was removing pending_flood_acks immediately after exhausting retries. PATH_UPDATE arriving even 1 second later would find no pending entry to match, leaving the message undelivered. Changes: - Don't clean up pending_flood_acks in retry threads, keep entries alive for late PATH_UPDATE arrivals - Add TTL-based cleanup (2 min) in _process_path_update - When PATH_UPDATE confirms delivery, save synthetic ACK for the original ack code AND all retry group ack codes so the frontend ack_status polling finds a match regardless of which code it checks Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -965,6 +965,9 @@ class MeshCLISession:
|
||||
|
||||
logger.info(f"PATH_UPDATE received for contact {public_key[:16]}...")
|
||||
|
||||
# Cleanup stale entries on every PATH_UPDATE event
|
||||
self._cleanup_stale_flood_acks()
|
||||
|
||||
# Check if we have a pending flood DM for this contact
|
||||
pending = self.pending_flood_acks.get(public_key)
|
||||
if not pending:
|
||||
@@ -973,20 +976,31 @@ class MeshCLISession:
|
||||
original_ack = pending['original_ack']
|
||||
recipient = pending['recipient']
|
||||
|
||||
# Delivery confirmed via PATH — store synthetic ACK if we have an ack code
|
||||
# Delivery confirmed via PATH — store synthetic ACK for original + all retry acks
|
||||
now = time.time()
|
||||
ack_codes_to_confirm = []
|
||||
if original_ack:
|
||||
record = {
|
||||
'ack_code': original_ack,
|
||||
'snr': None,
|
||||
'rssi': None,
|
||||
'route': 'PATH_FLOOD',
|
||||
'path': '',
|
||||
'ts': time.time(),
|
||||
}
|
||||
self.acks[original_ack] = record
|
||||
self._save_ack(record)
|
||||
ack_codes_to_confirm.append(original_ack)
|
||||
# Also confirm all retry group ack codes
|
||||
with self.retry_lock:
|
||||
retry_acks = self.retry_groups.get(original_ack, [])
|
||||
ack_codes_to_confirm.extend(retry_acks)
|
||||
|
||||
logger.info(f"PATH delivery confirmed for '{recipient}', ack={original_ack}")
|
||||
for ack_code in ack_codes_to_confirm:
|
||||
if ack_code not in self.acks:
|
||||
record = {
|
||||
'ack_code': ack_code,
|
||||
'snr': None,
|
||||
'rssi': None,
|
||||
'route': 'PATH_FLOOD',
|
||||
'path': '',
|
||||
'ts': now,
|
||||
}
|
||||
self.acks[ack_code] = record
|
||||
self._save_ack(record)
|
||||
|
||||
logger.info(f"PATH delivery confirmed for '{recipient}', "
|
||||
f"ack={original_ack}, confirmed_codes={len(ack_codes_to_confirm)}")
|
||||
|
||||
# Signal the retry thread to stop
|
||||
pending['cancel_event'].set()
|
||||
@@ -994,6 +1008,18 @@ class MeshCLISession:
|
||||
# Cleanup
|
||||
self.pending_flood_acks.pop(public_key, None)
|
||||
|
||||
def _cleanup_stale_flood_acks(self):
|
||||
"""Remove pending_flood_acks entries older than 2 minutes."""
|
||||
cutoff = time.time() - 120
|
||||
stale_keys = [
|
||||
pk for pk, entry in self.pending_flood_acks.items()
|
||||
if entry.get('timestamp', 0) < cutoff
|
||||
]
|
||||
for pk in stale_keys:
|
||||
entry = self.pending_flood_acks.pop(pk)
|
||||
logger.debug(f"Cleaned up stale pending_flood_ack for "
|
||||
f"{entry.get('recipient', 'unknown')}")
|
||||
|
||||
# =========================================================================
|
||||
# Auto-retry for DM messages
|
||||
# =========================================================================
|
||||
@@ -1117,12 +1143,12 @@ class MeshCLISession:
|
||||
if not cancel_event.is_set():
|
||||
cancel_event.wait(timeout=wait_before_retry)
|
||||
|
||||
# Cleanup
|
||||
# Cleanup active_retries but keep pending_flood_acks alive
|
||||
# for late PATH_UPDATE arrivals (cleaned up by TTL in _cleanup_stale_flood_acks)
|
||||
if not cancel_event.is_set():
|
||||
logger.warning(f"Flood retry on error: exhausted ({max_flood} attempts) "
|
||||
f"for '{text[:30]}' -> {recipient}")
|
||||
if public_key:
|
||||
self.pending_flood_acks.pop(public_key, None)
|
||||
f"for '{text[:30]}' -> {recipient}, "
|
||||
f"keeping PATH_UPDATE listener active")
|
||||
if original_ack:
|
||||
with self.retry_lock:
|
||||
self.active_retries.pop(original_ack, None)
|
||||
@@ -1249,12 +1275,12 @@ class MeshCLISession:
|
||||
if not cancel_event.is_set():
|
||||
cancel_event.wait(timeout=wait_timeout)
|
||||
|
||||
# Cleanup
|
||||
# Cleanup active_retries but keep pending_flood_acks alive
|
||||
# for late PATH_UPDATE arrivals (cleaned up by TTL in _cleanup_stale_flood_acks)
|
||||
if not cancel_event.is_set():
|
||||
logger.warning(f"Flood retry exhausted ({max_flood} attempts) "
|
||||
f"for '{text[:30]}' -> {recipient}")
|
||||
if public_key:
|
||||
self.pending_flood_acks.pop(public_key, None)
|
||||
f"for '{text[:30]}' -> {recipient}, "
|
||||
f"keeping PATH_UPDATE listener active")
|
||||
with self.retry_lock:
|
||||
self.active_retries.pop(original_ack, None)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user