mirror of
https://github.com/MarekWo/mc-webui.git
synced 2026-03-28 17:42:45 +01:00
fix(dm): delayed path backfill for FLOOD-delivered messages
When FLOOD delivery is confirmed, the PATH_UPDATE event payload often has empty path data because firmware updates the contact's out_path asynchronously. After 3s delay, read the contact's updated path from the meshcore library's in-memory contacts dict and backfill the DB. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -625,6 +625,13 @@ class DeviceManager:
|
||||
'max_attempts': ctx['max_attempts'],
|
||||
'path': ctx['path'],
|
||||
}, namespace='/chat')
|
||||
# If path is empty (FLOOD delivery), schedule delayed read from device
|
||||
if not ctx['path']:
|
||||
dm_rec = self.db.get_dm_by_id(dm_id)
|
||||
contact_pk = dm_rec.get('contact_pubkey', '') if dm_rec else ''
|
||||
if contact_pk:
|
||||
asyncio.ensure_future(
|
||||
self._delayed_path_backfill(dm_id, contact_pk))
|
||||
|
||||
task = self._retry_tasks.get(dm_id)
|
||||
if task and not task.done():
|
||||
@@ -798,6 +805,10 @@ class DeviceManager:
|
||||
'max_attempts': ctx['max_attempts'],
|
||||
'path': discovered_path,
|
||||
}, namespace='/chat')
|
||||
# If path still empty, schedule delayed read from device contacts
|
||||
if not discovered_path and pubkey:
|
||||
asyncio.ensure_future(
|
||||
self._delayed_path_backfill(dm_id, pubkey))
|
||||
# Cancel retry task — delivery already confirmed
|
||||
task = self._retry_tasks.get(dm_id)
|
||||
if task and not task.done():
|
||||
@@ -812,21 +823,28 @@ class DeviceManager:
|
||||
# Update delivery_path for recently-delivered DMs where _on_ack
|
||||
# stored empty path (FLOOD mode) before PATH_UPDATE could provide it
|
||||
discovered_path = data.get('path', '')
|
||||
if discovered_path and pubkey:
|
||||
recent = self.db.get_recent_delivered_dm_with_empty_path(pubkey)
|
||||
if recent:
|
||||
self.db.update_dm_delivery_info(
|
||||
recent['id'], recent['delivery_attempt'],
|
||||
recent['delivery_max_attempts'], discovered_path)
|
||||
if self.socketio:
|
||||
self.socketio.emit('dm_delivered_info', {
|
||||
'dm_id': recent['id'],
|
||||
'attempt': recent['delivery_attempt'],
|
||||
'max_attempts': recent['delivery_max_attempts'],
|
||||
'path': discovered_path,
|
||||
}, namespace='/chat')
|
||||
logger.debug(f"Updated delivery path for dm_id={recent['id']} "
|
||||
f"with discovered path {discovered_path[:16]}")
|
||||
if pubkey:
|
||||
if discovered_path:
|
||||
recent = self.db.get_recent_delivered_dm_with_empty_path(pubkey)
|
||||
if recent:
|
||||
self.db.update_dm_delivery_info(
|
||||
recent['id'], recent['delivery_attempt'],
|
||||
recent['delivery_max_attempts'], discovered_path)
|
||||
if self.socketio:
|
||||
self.socketio.emit('dm_delivered_info', {
|
||||
'dm_id': recent['id'],
|
||||
'attempt': recent['delivery_attempt'],
|
||||
'max_attempts': recent['delivery_max_attempts'],
|
||||
'path': discovered_path,
|
||||
}, namespace='/chat')
|
||||
logger.debug(f"Updated delivery path for dm_id={recent['id']} "
|
||||
f"with discovered path {discovered_path[:16]}")
|
||||
else:
|
||||
# PATH event had no path data — schedule delayed read from device
|
||||
recent = self.db.get_recent_delivered_dm_with_empty_path(pubkey)
|
||||
if recent:
|
||||
asyncio.ensure_future(
|
||||
self._delayed_path_backfill(recent['id'], pubkey))
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling path update: {e}")
|
||||
@@ -1313,6 +1331,53 @@ class DeviceManager:
|
||||
return (contact_out_path.lower()[:meaningful_len] ==
|
||||
configured_path['path_hex'].lower()[:meaningful_len])
|
||||
|
||||
@staticmethod
|
||||
def _extract_path_hex(out_path: str, out_path_len: int) -> str:
|
||||
"""Extract meaningful hex portion from a device contact path."""
|
||||
if out_path_len <= 0 or not out_path:
|
||||
return ''
|
||||
hop_count = out_path_len & 0x3F
|
||||
hash_size = (out_path_len >> 6) + 1
|
||||
meaningful_len = hop_count * hash_size * 2
|
||||
return out_path[:meaningful_len].lower() if meaningful_len > 0 else ''
|
||||
|
||||
async def _delayed_path_backfill(self, dm_id: int, pubkey: str, delay: float = 3.0):
|
||||
"""After a FLOOD delivery with empty path, wait and read the contact's updated path."""
|
||||
try:
|
||||
await asyncio.sleep(delay)
|
||||
if not self.mc or not self.mc.contacts:
|
||||
return
|
||||
contact = self.mc.contacts.get(pubkey)
|
||||
if not contact:
|
||||
return
|
||||
out_path = contact.get('out_path', '')
|
||||
out_path_len = contact.get('out_path_len', -1)
|
||||
path_hex = self._extract_path_hex(out_path, out_path_len)
|
||||
if not path_hex:
|
||||
logger.debug(f"Delayed path backfill: still no path for dm_id={dm_id}")
|
||||
return
|
||||
# Check DB — only update if delivery_path is still empty
|
||||
dm = self.db.get_dm_by_id(dm_id)
|
||||
if not dm or dm.get('delivery_path'):
|
||||
return # already has a path, skip
|
||||
self.db.update_dm_delivery_info(
|
||||
dm_id,
|
||||
dm.get('delivery_attempt') or 1,
|
||||
dm.get('delivery_max_attempts') or 1,
|
||||
path_hex)
|
||||
if self.socketio:
|
||||
self.socketio.emit('dm_delivered_info', {
|
||||
'dm_id': dm_id,
|
||||
'attempt': dm.get('delivery_attempt') or 1,
|
||||
'max_attempts': dm.get('delivery_max_attempts') or 1,
|
||||
'path': path_hex,
|
||||
}, namespace='/chat')
|
||||
logger.info(f"Delayed path backfill: updated dm_id={dm_id} with path {path_hex[:16]}")
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as e:
|
||||
logger.debug(f"Delayed path backfill failed for dm_id={dm_id}: {e}")
|
||||
|
||||
async def _dm_retry_task(self, dm_id: int, contact, text: str,
|
||||
timestamp: int, initial_ack: str,
|
||||
suggested_timeout: int):
|
||||
@@ -1402,16 +1467,7 @@ class DeviceManager:
|
||||
max_attempts += cfg['flood_max_retries']
|
||||
|
||||
# Track current path hex for delivery info (actual route, not label)
|
||||
def _extract_path_hex(out_path, out_path_len):
|
||||
"""Extract meaningful hex portion from device path."""
|
||||
if out_path_len <= 0 or not out_path:
|
||||
return ''
|
||||
hop_count = out_path_len & 0x3F
|
||||
hash_size = (out_path_len >> 6) + 1
|
||||
meaningful_len = hop_count * hash_size * 2
|
||||
return out_path[:meaningful_len].lower() if meaningful_len > 0 else ''
|
||||
|
||||
path_desc = _extract_path_hex(original_out_path, original_out_path_len) if has_path else ''
|
||||
path_desc = self._extract_path_hex(original_out_path, original_out_path_len) if has_path else ''
|
||||
|
||||
logger.info(f"DM retry task started: dm_id={dm_id}, scenario={scenario}, "
|
||||
f"configured_paths={len(configured_paths)}, no_auto_flood={no_auto_flood}, "
|
||||
|
||||
Reference in New Issue
Block a user