feat(dm): add delivery confirmation, retry, and receiver-side dedup

- Fix ACK handler bug: read 'code' field instead of 'expected_ack'
- Add DM retry (up to 3 attempts) with same timestamp for receiver dedup
- Add receiver-side dedup in _on_dm_received() (sender_timestamp or time-window)
- Add PATH_UPDATE as backup delivery signal for flood DMs
- Track pending acks with dm_id for proper ACK→DM linkage
- Return dm_id and expected_ack from POST /dm/messages API
- Add find_dm_duplicate() and get_dm_by_id() database helpers

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-07 07:02:58 +01:00
parent c1b0085710
commit 5b757e9548
4 changed files with 258 additions and 40 deletions

View File

@@ -7,6 +7,7 @@ Synchronous wrapper with WAL mode. Thread-safe via connection-per-call pattern.
import sqlite3
import shutil
import logging
import time
from pathlib import Path
from contextlib import contextmanager
from datetime import datetime, timedelta
@@ -355,6 +356,43 @@ class Database:
).fetchone()
return dict(row) if row else None
def get_dm_by_id(self, dm_id: int) -> Optional[Dict]:
    """Look up one ``direct_messages`` row by its ``id`` column.

    Args:
        dm_id: Value to match against ``direct_messages.id``.

    Returns:
        The matching row converted to a plain dict, or ``None`` when no
        row has that id.
    """
    query = "SELECT * FROM direct_messages WHERE id = ?"
    with self._connect() as conn:
        record = conn.execute(query, (dm_id,)).fetchone()
    if not record:
        return None
    return dict(record)
def find_dm_duplicate(self, contact_pubkey: str, content: str,
                      sender_timestamp: int = None,
                      window_seconds: int = 300) -> Optional[Dict]:
    """Look for an already-stored incoming DM that matches this one.

    Used for receiver-side dedup. Two matching strategies exist:

    * ``sender_timestamp`` given: exact match on
      (lower-cased sender key, content, sender_timestamp).
    * ``sender_timestamp`` is ``None``: same sender and content whose
      ``timestamp`` column is newer than ``now - window_seconds``.

    Only rows with ``direction = 'in'`` are considered.

    Returns:
        ``{'id': <row id>}`` for the first match, or ``None`` if there is none.
    """
    sender = contact_pubkey.lower()

    if sender_timestamp is None:
        # No sender timestamp available: fall back to a recency window.
        sql = """SELECT id FROM direct_messages
WHERE contact_pubkey = ? AND direction = 'in'
AND content = ? AND timestamp > ?
LIMIT 1"""
        params = (sender, content, int(time.time()) - window_seconds)
    else:
        sql = """SELECT id FROM direct_messages
WHERE contact_pubkey = ? AND direction = 'in'
AND content = ? AND sender_timestamp = ?
LIMIT 1"""
        params = (sender, content, sender_timestamp)

    with self._connect() as conn:
        hit = conn.execute(sql, params).fetchone()

    if hit:
        return dict(hit)
    return None
# ================================================================
# Echoes
# ================================================================

View File

@@ -52,6 +52,8 @@ class DeviceManager:
self._max_channels = 8 # updated from device_info at connect
self._pending_echo = None # {'timestamp': float, 'channel_idx': int, 'msg_id': int, 'pkt_payload': str|None}
self._echo_lock = threading.Lock()
self._pending_acks = {} # {ack_code_hex: dm_id} — maps retry acks to DM
self._retry_tasks = {} # {dm_id: asyncio.Task} — active retry coroutines
@property
def is_connected(self) -> bool:
@@ -391,6 +393,20 @@ class DeviceManager:
full_key = contact.get('public_key', '')
if full_key:
sender_key = full_key
# Receiver-side dedup: skip duplicate retries
sender_ts = data.get('sender_timestamp')
if sender_key and content:
if sender_ts:
existing = self.db.find_dm_duplicate(sender_key, content,
sender_timestamp=sender_ts)
else:
existing = self.db.find_dm_duplicate(sender_key, content,
window_seconds=300)
if existing:
logger.info(f"DM dedup: skipping retry from {sender_key[:8]}...")
return
if sender_key:
# Only upsert with name if we have a real name (not just a prefix)
self.db.upsert_contact(
@@ -445,25 +461,39 @@ class DeviceManager:
"""Handle ACK (delivery confirmation for DM)."""
try:
data = getattr(event, 'payload', {})
expected_ack = _to_str(data.get('expected_ack'))
# FIX: ACK event payload uses 'code', not 'expected_ack'
ack_code = _to_str(data.get('code', data.get('expected_ack')))
if expected_ack:
self.db.insert_ack(
expected_ack=expected_ack,
snr=data.get('snr'),
rssi=data.get('rssi'),
route_type=data.get('route_type', ''),
)
if not ack_code:
return
logger.info(f"ACK received: {expected_ack}")
# Check if this ACK belongs to a pending DM retry
dm_id = self._pending_acks.get(ack_code)
if self.socketio:
self.socketio.emit('ack', {
'expected_ack': expected_ack,
'snr': data.get('snr'),
'rssi': data.get('rssi'),
'route_type': data.get('route_type', ''),
}, namespace='/chat')
# Only store if not already stored (retry task may have handled it)
existing = self.db.get_ack_for_dm(ack_code)
if existing:
return
self.db.insert_ack(
expected_ack=ack_code,
snr=data.get('snr'),
rssi=data.get('rssi'),
route_type=data.get('route_type', ''),
dm_id=dm_id,
)
logger.info(f"ACK received: {ack_code}" +
(f" (dm_id={dm_id})" if dm_id else ""))
if self.socketio:
self.socketio.emit('ack', {
'expected_ack': ack_code,
'dm_id': dm_id,
'snr': data.get('snr'),
'rssi': data.get('rssi'),
'route_type': data.get('route_type', ''),
}, namespace='/chat')
except Exception as e:
logger.error(f"Error handling ACK: {e}")
@@ -526,19 +556,47 @@ class DeviceManager:
logger.error(f"Error handling advertisement: {e}")
async def _on_path_update(self, event):
"""Handle path update for a contact."""
"""Handle path update for a contact.
Also serves as backup delivery confirmation: when firmware sends
piggybacked ACK via flood, it fires both ACK and PATH_UPDATE events.
If the ACK event was missed, PATH_UPDATE can confirm delivery.
"""
try:
data = getattr(event, 'payload', {})
pubkey = data.get('public_key', '')
if pubkey:
self.db.insert_path(
contact_pubkey=pubkey,
path=data.get('path', ''),
snr=data.get('snr'),
path_len=data.get('path_len'),
)
logger.debug(f"Path update for {pubkey[:8]}...")
if not pubkey:
return
# Store path record (existing behavior)
self.db.insert_path(
contact_pubkey=pubkey,
path=data.get('path', ''),
snr=data.get('snr'),
path_len=data.get('path_len'),
)
logger.debug(f"Path update for {pubkey[:8]}...")
# Backup: check for pending DM to this contact
for ack_code, dm_id in list(self._pending_acks.items()):
dm = self.db.get_dm_by_id(dm_id)
if dm and dm.get('contact_pubkey') == pubkey and dm.get('direction') == 'out':
existing_ack = self.db.get_ack_for_dm(ack_code)
if not existing_ack:
self.db.insert_ack(
expected_ack=ack_code,
route_type='PATH_FLOOD',
dm_id=dm_id,
)
logger.info(f"PATH delivery confirmed for dm_id={dm_id}")
if self.socketio:
self.socketio.emit('ack', {
'expected_ack': ack_code,
'dm_id': dm_id,
'route_type': 'PATH_FLOOD',
}, namespace='/chat')
break # Only confirm the most recent pending DM to this contact
except Exception as e:
logger.error(f"Error handling path update: {e}")
@@ -759,7 +817,7 @@ class DeviceManager:
return {'success': False, 'error': str(e)}
def send_dm(self, recipient_pubkey: str, text: str) -> Dict:
"""Send a direct message. Returns result dict."""
"""Send a direct message with background retry. Returns result dict."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
@@ -772,21 +830,47 @@ class DeviceManager:
if not contact:
return {'success': False, 'error': f'Contact not found: {recipient_pubkey}'}
event = self.execute(self.mc.commands.send_msg(contact, text))
# Generate timestamp once — same for all retries (enables receiver dedup)
timestamp = int(time.time())
# Store sent DM in database
ts = int(time.time())
event = self.execute(
self.mc.commands.send_msg(contact, text,
timestamp=timestamp, attempt=0)
)
from meshcore.events import EventType
event_data = getattr(event, 'payload', {})
if event.type == EventType.ERROR:
return {'success': False, 'error': 'Device error sending DM'}
ack = _to_str(event_data.get('expected_ack'))
suggested_timeout = event_data.get('suggested_timeout', 15000)
# Store sent DM in database (single record, not per-retry)
dm_id = self.db.insert_direct_message(
contact_pubkey=recipient_pubkey.lower(),
direction='out',
content=text,
timestamp=ts,
timestamp=timestamp,
expected_ack=ack or None,
pkt_payload=_to_str(event_data.get('pkt_payload')) or None,
)
# Register ack → dm_id mapping for _on_ack handler
if ack:
self._pending_acks[ack] = dm_id
# Launch background retry task
task = asyncio.run_coroutine_threadsafe(
self._dm_retry_task(
dm_id, contact, text, timestamp,
ack, suggested_timeout
),
self._loop
)
self._retry_tasks[dm_id] = task
return {
'success': True,
'message': 'DM sent',
@@ -798,6 +882,100 @@ class DeviceManager:
logger.error(f"Failed to send DM: {e}")
return {'success': False, 'error': str(e)}
async def _dm_retry_task(self, dm_id: int, contact, text: str,
                         timestamp: int, initial_ack: str,
                         suggested_timeout: int, max_attempts: int = 3):
    """Wait for the ACK of an already-sent DM and resend it if none arrives.

    Every resend reuses ``timestamp`` (so the receiver can dedup) and passes
    an incrementing ``attempt`` number. From the second retry onwards the
    contact's path is reset before sending.

    Args:
        dm_id: Database id of the stored outgoing DM.
        contact: Contact object handed to ``send_msg`` / ``reset_path``.
        text: Message body to resend.
        timestamp: Timestamp used for the first send; reused unchanged.
        initial_ack: ACK code returned by the first send (may be empty).
        suggested_timeout: Timeout hint for the first send; divided by 1000
            and padded by 20% to get the wait in seconds (minimum 5 s).
        max_attempts: Total number of sends including the initial one.

    The pending-ack and retry-task bookkeeping for ``dm_id`` is always
    removed on exit, whether delivery was confirmed, retries ran out, the
    task was cancelled, or an unexpected error occurred.
    """
    from meshcore.events import EventType

    async def _delivered(ack_code: str, timeout_ms) -> bool:
        """Wait for the ACK matching ``ack_code``; record delivery if it arrives."""
        if not ack_code:
            return False
        wait_s = max(timeout_ms / 1000 * 1.2, 5.0)
        ack_event = await self.mc.dispatcher.wait_for_event(
            EventType.ACK,
            attribute_filters={"code": ack_code},
            timeout=wait_s,
        )
        if not ack_event:
            return False
        self._confirm_delivery(dm_id, ack_code, ack_event)
        return True

    try:
        # Wait for ACK on initial send
        if await _delivered(initial_ack, suggested_timeout):
            return

        # Retry with same timestamp, incrementing attempt
        for attempt in range(1, max_attempts):
            # After 2 failed direct attempts, reset path to flood
            if attempt >= 2:
                try:
                    await self.mc.commands.reset_path(contact)
                    logger.info(f"DM retry {attempt}: reset path to flood")
                except Exception as e:
                    # Best effort: still resend, but do not hide the failure.
                    logger.warning(f"DM retry {attempt}: reset path failed: {e}")

            try:
                result = await self.mc.commands.send_msg(
                    contact, text, timestamp=timestamp, attempt=attempt
                )
            except Exception as e:
                logger.warning(f"DM retry {attempt}/{max_attempts}: send error: {e}")
                continue

            if result.type == EventType.ERROR:
                logger.warning(f"DM retry {attempt}/{max_attempts}: device error")
                continue

            retry_ack = _to_str(result.payload.get('expected_ack'))
            if retry_ack:
                # Each resend has its own ACK code; link it to the same DM.
                self._pending_acks[retry_ack] = dm_id

            new_timeout = result.payload.get('suggested_timeout', suggested_timeout)
            if await _delivered(retry_ack, new_timeout):
                return

        logger.warning(f"DM retry exhausted ({max_attempts} attempts) for dm_id={dm_id}")
    except Exception as e:
        # Nothing visibly reads this task's result, so log before re-raising.
        logger.error(f"DM retry task failed for dm_id={dm_id}: {e}")
        raise
    finally:
        # Cleanup stale pending acks for this DM. Iterate over a snapshot:
        # send_dm() adds entries to this dict from another thread.
        for code, pending_id in list(self._pending_acks.items()):
            if pending_id == dm_id:
                self._pending_acks.pop(code, None)
        self._retry_tasks.pop(dm_id, None)
def _confirm_delivery(self, dm_id: int, ack_code: str, ack_event):
    """Store the ACK for a DM, notify the frontend, and drop its bookkeeping.

    Args:
        dm_id: Database id of the outgoing DM that was acknowledged.
        ack_code: ACK code that was matched.
        ack_event: Event whose ``payload`` may carry ``snr``, ``rssi`` and
            ``route_type``; a missing or empty payload is tolerated.
    """
    data = getattr(ack_event, 'payload', None) or {}
    snr = data.get('snr')
    rssi = data.get('rssi')
    route_type = data.get('route_type', '')

    # Only store if not already stored by _on_ack handler
    if not self.db.get_ack_for_dm(ack_code):
        self.db.insert_ack(
            expected_ack=ack_code,
            snr=snr,
            rssi=rssi,
            route_type=route_type,
            dm_id=dm_id,
        )

    logger.info(f"DM delivery confirmed: dm_id={dm_id}, ack={ack_code}")

    if self.socketio:
        # Carry the same signal fields that are stored with the ACK so
        # every 'ack' event has a consistent shape for the frontend.
        self.socketio.emit('ack', {
            'expected_ack': ack_code,
            'dm_id': dm_id,
            'snr': snr,
            'rssi': rssi,
            'route_type': route_type,
        }, namespace='/chat')

    # Cleanup pending acks for this DM. Iterate over a snapshot: send_dm()
    # adds entries to this dict from another thread.
    for code, pending_id in list(self._pending_acks.items()):
        if pending_id == dm_id:
            self._pending_acks.pop(code, None)
    self._retry_tasks.pop(dm_id, None)
def get_contacts_from_device(self) -> List[Dict]:
"""Refresh contacts from device and return the list."""
if not self.is_connected:

View File

@@ -367,12 +367,12 @@ def floodadv() -> Tuple[bool, str]:
# Direct Messages
# =============================================================================
def send_dm(recipient: str, text: str) -> Tuple[bool, str]:
"""Send a direct message."""
def send_dm(recipient: str, text: str) -> Tuple[bool, Dict]:
"""Send a direct message. Returns (success, result_dict)."""
if not recipient or not recipient.strip():
return False, "Recipient is required"
return False, {'error': "Recipient is required"}
if not text or not text.strip():
return False, "Message text is required"
return False, {'error': "Message text is required"}
try:
dm = _get_dm()
@@ -387,9 +387,9 @@ def send_dm(recipient: str, text: str) -> Tuple[bool, str]:
pubkey = recipient.strip()
result = dm.send_dm(pubkey, text.strip())
return result['success'], result.get('message', result.get('error', ''))
return result['success'], result
except Exception as e:
return False, str(e)
return False, {'error': str(e)}
def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]:

View File

@@ -1894,7 +1894,7 @@ def get_dm_messages():
ack_info = acks[ack_code]
msg['status'] = 'delivered'
msg['delivery_snr'] = ack_info.get('snr')
msg['delivery_route'] = ack_info.get('route')
msg['delivery_route'] = ack_info.get('route_type', ack_info.get('route'))
except Exception as e:
logger.debug(f"ACK status fetch failed (non-critical): {e}")
@@ -1965,19 +1965,21 @@ def send_dm_message():
}), 400
# Send via CLI
success, message = cli.send_dm(recipient, text)
success, result = cli.send_dm(recipient, text)
if success:
return jsonify({
'success': True,
'message': 'DM sent',
'recipient': recipient,
'status': 'pending'
'status': 'pending',
'dm_id': result.get('id'),
'expected_ack': result.get('expected_ack'),
}), 200
else:
return jsonify({
'success': False,
'error': message
'error': result.get('error', 'Send failed')
}), 500
except Exception as e: