diff --git a/app/database.py b/app/database.py index 7a61e4d..e27a3d4 100644 --- a/app/database.py +++ b/app/database.py @@ -7,6 +7,7 @@ Synchronous wrapper with WAL mode. Thread-safe via connection-per-call pattern. import sqlite3 import shutil import logging +import time from pathlib import Path from contextlib import contextmanager from datetime import datetime, timedelta @@ -355,6 +356,43 @@ class Database: ).fetchone() return dict(row) if row else None + def get_dm_by_id(self, dm_id: int) -> Optional[Dict]: + """Fetch a direct message by its ID.""" + with self._connect() as conn: + row = conn.execute( + "SELECT * FROM direct_messages WHERE id = ?", (dm_id,) + ).fetchone() + return dict(row) if row else None + + def find_dm_duplicate(self, contact_pubkey: str, content: str, + sender_timestamp: int = None, + window_seconds: int = 300) -> Optional[Dict]: + """Check for duplicate incoming DM (for receiver-side dedup). + + If sender_timestamp is provided, matches exact (sender, timestamp, text). + Otherwise falls back to time-window match (same sender + text within window). + """ + contact_pubkey = contact_pubkey.lower() + with self._connect() as conn: + if sender_timestamp is not None: + row = conn.execute( + """SELECT id FROM direct_messages + WHERE contact_pubkey = ? AND direction = 'in' + AND content = ? AND sender_timestamp = ? + LIMIT 1""", + (contact_pubkey, content, sender_timestamp) + ).fetchone() + else: + cutoff = int(time.time()) - window_seconds + row = conn.execute( + """SELECT id FROM direct_messages + WHERE contact_pubkey = ? AND direction = 'in' + AND content = ? AND timestamp > ? + LIMIT 1""", + (contact_pubkey, content, cutoff) + ).fetchone() + return dict(row) if row else None + # ================================================================ # Echoes # ================================================================ diff --git a/app/device_manager.py b/app/device_manager.py index 8c1b1e8..53298c3 100644 --- a/app/device_manager.py +++ b/app/device_manager.py @@ -52,6 +52,8 @@ class DeviceManager: self._max_channels = 8 # updated from device_info at connect self._pending_echo = None # {'timestamp': float, 'channel_idx': int, 'msg_id': int, 'pkt_payload': str|None} self._echo_lock = threading.Lock() + self._pending_acks = {} # {ack_code_hex: dm_id} — maps retry acks to DM + self._retry_tasks = {} # {dm_id: asyncio.Task} — active retry coroutines @property def is_connected(self) -> bool: @@ -391,6 +393,20 @@ class DeviceManager: full_key = contact.get('public_key', '') if full_key: sender_key = full_key + + # Receiver-side dedup: skip duplicate retries + sender_ts = data.get('sender_timestamp') + if sender_key and content: + if sender_ts: + existing = self.db.find_dm_duplicate(sender_key, content, + sender_timestamp=sender_ts) + else: + existing = self.db.find_dm_duplicate(sender_key, content, + window_seconds=300) + if existing: + logger.info(f"DM dedup: skipping retry from {sender_key[:8]}...") + return + if sender_key: # Only upsert with name if we have a real name (not just a prefix) self.db.upsert_contact( @@ -445,25 +461,39 @@ class DeviceManager: """Handle ACK (delivery confirmation for DM).""" try: data = getattr(event, 'payload', {}) - expected_ack = _to_str(data.get('expected_ack')) + # FIX: ACK event payload uses 'code', not 'expected_ack' + ack_code = _to_str(data.get('code', data.get('expected_ack'))) - if expected_ack: - self.db.insert_ack( - expected_ack=expected_ack, - snr=data.get('snr'), - rssi=data.get('rssi'), - route_type=data.get('route_type', ''), - ) + if not ack_code: + return - logger.info(f"ACK received: {expected_ack}") + # Check if this ACK belongs to a pending DM retry + dm_id = self._pending_acks.get(ack_code) - if self.socketio: - self.socketio.emit('ack', { - 'expected_ack': expected_ack, - 'snr': data.get('snr'), - 'rssi': data.get('rssi'), - 'route_type': data.get('route_type', ''), - }, namespace='/chat') + # Only store if not already stored (retry task may have handled it) + existing = self.db.get_ack_for_dm(ack_code) + if existing: + return + + self.db.insert_ack( + expected_ack=ack_code, + snr=data.get('snr'), + rssi=data.get('rssi'), + route_type=data.get('route_type', ''), + dm_id=dm_id, + ) + + logger.info(f"ACK received: {ack_code}" + + (f" (dm_id={dm_id})" if dm_id else "")) + + if self.socketio: + self.socketio.emit('ack', { + 'expected_ack': ack_code, + 'dm_id': dm_id, + 'snr': data.get('snr'), + 'rssi': data.get('rssi'), + 'route_type': data.get('route_type', ''), + }, namespace='/chat') except Exception as e: logger.error(f"Error handling ACK: {e}") @@ -526,19 +556,47 @@ class DeviceManager: logger.error(f"Error handling advertisement: {e}") async def _on_path_update(self, event): - """Handle path update for a contact.""" + """Handle path update for a contact. + + Also serves as backup delivery confirmation: when firmware sends + piggybacked ACK via flood, it fires both ACK and PATH_UPDATE events. + If the ACK event was missed, PATH_UPDATE can confirm delivery. + """ try: data = getattr(event, 'payload', {}) pubkey = data.get('public_key', '') - if pubkey: - self.db.insert_path( - contact_pubkey=pubkey, - path=data.get('path', ''), - snr=data.get('snr'), - path_len=data.get('path_len'), - ) - logger.debug(f"Path update for {pubkey[:8]}...") + if not pubkey: + return + + # Store path record (existing behavior) + self.db.insert_path( + contact_pubkey=pubkey, + path=data.get('path', ''), + snr=data.get('snr'), + path_len=data.get('path_len'), + ) + logger.debug(f"Path update for {pubkey[:8]}...") + + # Backup: check for pending DM to this contact + for ack_code, dm_id in list(self._pending_acks.items()): + dm = self.db.get_dm_by_id(dm_id) + if dm and dm.get('contact_pubkey') == pubkey and dm.get('direction') == 'out': + existing_ack = self.db.get_ack_for_dm(ack_code) + if not existing_ack: + self.db.insert_ack( + expected_ack=ack_code, + route_type='PATH_FLOOD', + dm_id=dm_id, + ) + logger.info(f"PATH delivery confirmed for dm_id={dm_id}") + if self.socketio: + self.socketio.emit('ack', { + 'expected_ack': ack_code, + 'dm_id': dm_id, + 'route_type': 'PATH_FLOOD', + }, namespace='/chat') + break # Only confirm the most recent pending DM to this contact except Exception as e: logger.error(f"Error handling path update: {e}") @@ -759,7 +817,7 @@ class DeviceManager: return {'success': False, 'error': str(e)} def send_dm(self, recipient_pubkey: str, text: str) -> Dict: - """Send a direct message. Returns result dict.""" + """Send a direct message with background retry. Returns result dict.""" if not self.is_connected: return {'success': False, 'error': 'Device not connected'} @@ -772,21 +830,47 @@ class DeviceManager: if not contact: return {'success': False, 'error': f'Contact not found: {recipient_pubkey}'} - event = self.execute(self.mc.commands.send_msg(contact, text)) + # Generate timestamp once — same for all retries (enables receiver dedup) + timestamp = int(time.time()) - # Store sent DM in database - ts = int(time.time()) + event = self.execute( + self.mc.commands.send_msg(contact, text, + timestamp=timestamp, attempt=0) + ) + + from meshcore.events import EventType event_data = getattr(event, 'payload', {}) + + if event.type == EventType.ERROR: + return {'success': False, 'error': 'Device error sending DM'} + ack = _to_str(event_data.get('expected_ack')) + suggested_timeout = event_data.get('suggested_timeout', 15000) + + # Store sent DM in database (single record, not per-retry) dm_id = self.db.insert_direct_message( contact_pubkey=recipient_pubkey.lower(), direction='out', content=text, - timestamp=ts, + timestamp=timestamp, expected_ack=ack or None, pkt_payload=_to_str(event_data.get('pkt_payload')) or None, ) + # Register ack → dm_id mapping for _on_ack handler + if ack: + self._pending_acks[ack] = dm_id + + # Launch background retry task + task = asyncio.run_coroutine_threadsafe( + self._dm_retry_task( + dm_id, contact, text, timestamp, + ack, suggested_timeout + ), + self._loop + ) + self._retry_tasks[dm_id] = task + return { 'success': True, 'message': 'DM sent', @@ -798,6 +882,100 @@ class DeviceManager: logger.error(f"Failed to send DM: {e}") return {'success': False, 'error': str(e)} + async def _dm_retry_task(self, dm_id: int, contact, text: str, + timestamp: int, initial_ack: str, + suggested_timeout: int, max_attempts: int = 3): + """Background retry with same timestamp for dedup on receiver.""" + from meshcore.events import EventType + + wait_s = max(suggested_timeout / 1000 * 1.2, 5.0) + + # Wait for ACK on initial send + if initial_ack: + ack_event = await self.mc.dispatcher.wait_for_event( + EventType.ACK, + attribute_filters={"code": initial_ack}, + timeout=wait_s + ) + if ack_event: + self._confirm_delivery(dm_id, initial_ack, ack_event) + return + + # Retry with same timestamp, incrementing attempt + for attempt in range(1, max_attempts): + # After 2 failed direct attempts, reset path to flood + if attempt >= 2: + try: + await self.mc.commands.reset_path(contact) + logger.info(f"DM retry {attempt}: reset path to flood") + except Exception: + pass + + try: + result = await self.mc.commands.send_msg( + contact, text, timestamp=timestamp, attempt=attempt + ) + except Exception as e: + logger.warning(f"DM retry {attempt}/{max_attempts}: send error: {e}") + continue + + if result.type == EventType.ERROR: + logger.warning(f"DM retry {attempt}/{max_attempts}: device error") + continue + + retry_ack = _to_str(result.payload.get('expected_ack')) + if retry_ack: + self._pending_acks[retry_ack] = dm_id + new_timeout = result.payload.get('suggested_timeout', suggested_timeout) + wait_s = max(new_timeout / 1000 * 1.2, 5.0) + + if retry_ack: + ack_event = await self.mc.dispatcher.wait_for_event( + EventType.ACK, + attribute_filters={"code": retry_ack}, + timeout=wait_s + ) + if ack_event: + self._confirm_delivery(dm_id, retry_ack, ack_event) + return + + logger.warning(f"DM retry exhausted ({max_attempts} attempts) for dm_id={dm_id}") + # Cleanup stale pending acks for this DM + stale = [k for k, v in self._pending_acks.items() if v == dm_id] + for k in stale: + self._pending_acks.pop(k, None) + self._retry_tasks.pop(dm_id, None) + + def _confirm_delivery(self, dm_id: int, ack_code: str, ack_event): + """Store ACK and notify frontend.""" + data = getattr(ack_event, 'payload', {}) + + # Only store if not already stored by _on_ack handler + existing = self.db.get_ack_for_dm(ack_code) + if not existing: + self.db.insert_ack( + expected_ack=ack_code, + snr=data.get('snr'), + rssi=data.get('rssi'), + route_type=data.get('route_type', ''), + dm_id=dm_id, + ) + + logger.info(f"DM delivery confirmed: dm_id={dm_id}, ack={ack_code}") + + if self.socketio: + self.socketio.emit('ack', { + 'expected_ack': ack_code, + 'dm_id': dm_id, + 'snr': data.get('snr'), + }, namespace='/chat') + + # Cleanup pending acks for this DM + stale = [k for k, v in self._pending_acks.items() if v == dm_id] + for k in stale: + self._pending_acks.pop(k, None) + self._retry_tasks.pop(dm_id, None) + def get_contacts_from_device(self) -> List[Dict]: """Refresh contacts from device and return the list.""" if not self.is_connected: diff --git a/app/meshcore/cli.py b/app/meshcore/cli.py index 4595c40..3d8ddea 100644 --- a/app/meshcore/cli.py +++ b/app/meshcore/cli.py @@ -367,12 +367,12 @@ def floodadv() -> Tuple[bool, str]: # Direct Messages # ============================================================================= -def send_dm(recipient: str, text: str) -> Tuple[bool, str]: - """Send a direct message.""" +def send_dm(recipient: str, text: str) -> Tuple[bool, Dict]: + """Send a direct message. Returns (success, result_dict).""" if not recipient or not recipient.strip(): - return False, "Recipient is required" + return False, {'error': "Recipient is required"} if not text or not text.strip(): - return False, "Message text is required" + return False, {'error': "Message text is required"} try: dm = _get_dm() @@ -387,9 +387,9 @@ def send_dm(recipient: str, text: str) -> Tuple[bool, str]: pubkey = recipient.strip() result = dm.send_dm(pubkey, text.strip()) - return result['success'], result.get('message', result.get('error', '')) + return result['success'], result except Exception as e: - return False, str(e) + return False, {'error': str(e)} def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]: diff --git a/app/routes/api.py b/app/routes/api.py index 0a6c303..e0213e6 100644 --- a/app/routes/api.py +++ b/app/routes/api.py @@ -1894,7 +1894,7 @@ def get_dm_messages(): ack_info = acks[ack_code] msg['status'] = 'delivered' msg['delivery_snr'] = ack_info.get('snr') - msg['delivery_route'] = ack_info.get('route') + msg['delivery_route'] = ack_info.get('route_type', ack_info.get('route')) except Exception as e: logger.debug(f"ACK status fetch failed (non-critical): {e}") @@ -1965,19 +1965,21 @@ def send_dm_message(): }), 400 # Send via CLI - success, message = cli.send_dm(recipient, text) + success, result = cli.send_dm(recipient, text) if success: return jsonify({ 'success': True, 'message': 'DM sent', 'recipient': recipient, - 'status': 'pending' + 'status': 'pending', + 'dm_id': result.get('id'), + 'expected_ack': result.get('expected_ack'), }), 200 else: return jsonify({ 'success': False, - 'error': message + 'error': result.get('error', 'Send failed') }), 500 except Exception as e: