""" REST API endpoints for mc-webui """ import hashlib import hmac as hmac_mod import logging import json import re import base64 import struct import threading import time import requests from Crypto.Cipher import AES from datetime import datetime from io import BytesIO from pathlib import Path from flask import Blueprint, jsonify, request, send_file, current_app from app.meshcore import cli, parser from app.meshcore.regions import derive_scope_key_hex, is_valid_region_name from app.config import config, runtime_config from app.device_manager import decode_path_len, LETSMESH_ANALYZER_URL_TEMPLATE from app.archiver import manager as archive_manager from app.contacts_cache import get_all_names, get_all_contacts logger = logging.getLogger(__name__) def _get_db(): """Get Database instance from app context.""" return getattr(current_app, 'db', None) def _get_dm(): """Get DeviceManager instance from app context.""" return getattr(current_app, 'device_manager', None) api_bp = Blueprint('api', __name__, url_prefix='/api') # Simple cache for get_channels() to reduce USB/meshcli calls # Channels don't change frequently, so caching for 30s is safe _channels_cache = None _channels_cache_timestamp = 0 CHANNELS_CACHE_TTL = 30 # seconds # Serializes channel create/join to avoid duplicates from concurrent requests. # Without this, two near-simultaneous POSTs both find the same "first free" # slot (or adjacent slots) and each succeeds, producing duplicate entries. _channel_write_lock = threading.Lock() # Cache for contacts/detailed to reduce USB calls (4 calls per request!) # Contacts change infrequently, 60s cache is safe _contacts_detailed_cache = None _contacts_detailed_cache_timestamp = 0 CONTACTS_DETAILED_CACHE_TTL = 60 # seconds GRP_TXT_TYPE_BYTE = 0x05 ANALYZER_PLACEHOLDER = '{packetHash}' def compute_packet_hash(pkt_payload): """Compute MeshCore Analyzer packet hash (16 uppercase hex chars) from a hex-encoded pkt_payload.""" try: raw = bytes([GRP_TXT_TYPE_BYTE]) + bytes.fromhex(pkt_payload) return hashlib.sha256(raw).hexdigest()[:16].upper() except (ValueError, TypeError): return None def compute_pkt_payload(channel_secret_hex, sender_timestamp, txt_type, text, attempt=0): """Compute pkt_payload from message data + channel secret. Reconstructs the encrypted GRP_TXT payload: channel_hash(1) + HMAC-MAC(2) + AES-128-ECB(plaintext) where plaintext = timestamp(4 LE) + flags(1) + text(UTF-8) [+ null + zero-pad]. Firmware omits null+padding when header+text exactly fills an AES block boundary. """ secret = bytes.fromhex(channel_secret_hex) flags = ((txt_type & 0x3F) << 2) | (attempt & 0x03) core = struct.pack(' list: """ Get list of protected contact public keys from database. Returns: List of public_key strings (64 hex chars, lowercase) """ db = _get_db() if db: return list(db.get_protected_keys()) return [] # ============================================================================= # Cleanup Settings Management # ============================================================================= CLEANUP_DEFAULTS = { 'enabled': False, 'types': [1, 2, 3, 4], 'date_field': 'last_advert', 'days': 30, 'name_filter': '', 'hour': 1 } RETENTION_DEFAULTS = { 'enabled': True, 'days': 90, # channel_messages 'days_dms': 90, # direct_messages 'days_adverts': 60, # advertisements 'days_diagnostics': 30, # echoes, paths, acks (high-volume debug data) 'include_dms': True, 'include_adverts': True, 'include_diagnostics': True, 'hour': 3 } def get_cleanup_settings() -> dict: """Get auto-cleanup settings from database.""" db = _get_db() if db: saved = db.get_setting_json('cleanup_settings', {}) return {**CLEANUP_DEFAULTS, **saved} return dict(CLEANUP_DEFAULTS) def save_cleanup_settings(cleanup_settings: dict) -> bool: """Save auto-cleanup settings to database.""" db = _get_db() if not db: return False try: db.set_setting_json('cleanup_settings', cleanup_settings) return True except Exception as e: logger.error(f"Failed to save cleanup settings: {e}") return False def get_retention_settings() -> dict: """Get message retention settings from database.""" db = _get_db() if db: saved = db.get_setting_json('retention_settings', {}) return {**RETENTION_DEFAULTS, **saved} return dict(RETENTION_DEFAULTS) def save_retention_settings(retention_settings: dict) -> bool: """Save message retention settings to database.""" db = _get_db() if not db: return False try: db.set_setting_json('retention_settings', retention_settings) return True except Exception as e: logger.error(f"Failed to save retention settings: {e}") return False # ============================================================================= # DM Retry Settings # ============================================================================= DM_RETRY_DEFAULTS = { 'direct_max_retries': 3, # DIRECT retries before switching to FLOOD 'direct_flood_retries': 1, # FLOOD retries after DIRECT exhausted 'flood_max_retries': 3, # FLOOD retries when no path known 'direct_interval': 30, # seconds between DIRECT retries 'flood_interval': 60, # seconds between FLOOD retries 'grace_period': 60, # seconds to wait for late ACKs after exhaustion } CHAT_SETTINGS_DEFAULTS = { 'quote_max_bytes': 20, # max UTF-8 bytes for truncated quote 'path_popup_timeout_sec': 8, # auto-close timeout for route popup (channel + DM) 'path_popup_no_autoclose': False, # when True, route popup stays open until click-outside } # Keys in chat_settings that carry a boolean value CHAT_SETTINGS_BOOL_KEYS = {'path_popup_no_autoclose'} # ============================================================================= # UI Settings (app-wide interface behavior — toast notifications, etc.) # ============================================================================= UI_SETTINGS_DEFAULTS = { 'toast_timeout_sec': 2.0, # auto-hide delay for notification toasts 'toast_no_autoclose': False, # when True, toasts stay until dismissed 'toast_position': 'top-left', # one of TOAST_POSITIONS } TOAST_POSITIONS = {'top-left', 'top-right', 'bottom-left', 'bottom-right', 'center'} def get_dm_retry_settings() -> dict: """Get DM retry settings from database.""" db = _get_db() if db: saved = db.get_setting_json('dm_retry_settings', {}) return {**DM_RETRY_DEFAULTS, **saved} return dict(DM_RETRY_DEFAULTS) def save_dm_retry_settings(settings: dict) -> bool: """Save DM retry settings to database.""" db = _get_db() if not db: return False try: db.set_setting_json('dm_retry_settings', settings) return True except Exception as e: logger.error(f"Failed to save DM retry settings: {e}") return False def get_chat_settings() -> dict: """Get chat settings from database.""" db = _get_db() if db: saved = db.get_setting_json('chat_settings', {}) return {**CHAT_SETTINGS_DEFAULTS, **saved} return dict(CHAT_SETTINGS_DEFAULTS) def save_chat_settings(settings: dict) -> bool: """Save chat settings to database.""" db = _get_db() if not db: return False try: db.set_setting_json('chat_settings', settings) return True except Exception as e: logger.error(f"Failed to save chat settings: {e}") return False def get_ui_settings() -> dict: """Get UI (interface) settings from database.""" db = _get_db() if db: saved = db.get_setting_json('ui_settings', {}) return {**UI_SETTINGS_DEFAULTS, **saved} return dict(UI_SETTINGS_DEFAULTS) def save_ui_settings(settings: dict) -> bool: """Save UI settings to database.""" db = _get_db() if not db: return False try: db.set_setting_json('ui_settings', settings) return True except Exception as e: logger.error(f"Failed to save UI settings: {e}") return False @api_bp.route('/messages', methods=['GET']) def get_messages(): """ Get list of messages from specific channel or archive. Query parameters: limit (int): Maximum number of messages to return offset (int): Number of messages to skip from the end archive_date (str): View archive for specific date (YYYY-MM-DD format) days (int): Show only messages from last N days (live view only) channel_idx (int): Filter by channel index (optional) Returns: JSON with messages list """ try: limit = request.args.get('limit', type=int) offset = request.args.get('offset', default=0, type=int) archive_date = request.args.get('archive_date', type=str) days = request.args.get('days', type=int) channel_idx = request.args.get('channel_idx', type=int) # Validate archive_date format if provided if archive_date: try: datetime.strptime(archive_date, '%Y-%m-%d') except ValueError: return jsonify({ 'success': False, 'error': f'Invalid date format: {archive_date}. Expected YYYY-MM-DD' }), 400 # v2: Read messages from Database db = _get_db() if db: if archive_date: # Archive view: query by specific date db_messages = db.get_channel_messages_by_date( date_str=archive_date, channel_idx=channel_idx, ) else: # Live view: query with limit/offset/days ch = channel_idx if channel_idx is not None else 0 db_messages = db.get_channel_messages( channel_idx=ch, limit=limit or 50, offset=offset, days=days, ) # Build channel secret lookup for pkt_payload computation # Use DB channels (fast) instead of get_channels_cached() which # can block on device communication when cache is cold channel_secrets = {} db_channels = db.get_channels() if db else [] for ch_info in db_channels: ch_key = ch_info.get('secret', ch_info.get('key', '')) ch_idx = ch_info.get('idx', ch_info.get('index')) if ch_key and ch_idx is not None: channel_secrets[ch_idx] = ch_key # Convert DB rows to frontend-compatible format messages = [] for row in db_messages: pkt_payload = row.get('pkt_payload') ch_idx = row.get('channel_idx', 0) sender_ts = row.get('sender_timestamp') txt_type = row.get('txt_type', 0) # Compute pkt_payload if not stored (v2: meshcore doesn't provide it) if not pkt_payload and sender_ts and ch_idx in channel_secrets: # Use original text from raw_json (preserves trailing whitespace) raw_text = None raw_json_str = row.get('raw_json') if raw_json_str: try: raw_text = json.loads(raw_json_str).get('text') except (json.JSONDecodeError, TypeError): pass # Fallback: reconstruct from sender + content if not raw_text: is_own = bool(row.get('is_own', 0)) if is_own: device_name = runtime_config.get_device_name() or '' raw_text = f"{device_name}: {row.get('content', '')}" if device_name else row.get('content', '') else: sender = row.get('sender', '') raw_text = f"{sender}: {row.get('content', '')}" if sender else row.get('content', '') pkt_payload = compute_pkt_payload( channel_secrets[ch_idx], sender_ts, txt_type, raw_text ) # Decode path_len into hop_count and path_hash_size path_len_raw = row.get('path_len') hop_count = None path_hash_size = 1 if path_len_raw is not None: hop_count, path_hash_size, _ = decode_path_len(path_len_raw) msg = { 'id': row.get('id'), 'sender': row.get('sender', ''), 'content': row.get('content', ''), 'timestamp': row.get('timestamp', 0), 'datetime': datetime.fromtimestamp(row['timestamp']).isoformat() if row.get('timestamp') else None, 'is_own': bool(row.get('is_own', 0)), 'snr': row.get('snr'), 'path_len': path_len_raw, 'hop_count': hop_count, 'path_hash_size': path_hash_size, 'channel_idx': ch_idx, 'sender_timestamp': sender_ts, 'txt_type': txt_type, 'raw_text': row.get('content', ''), 'pkt_payload': pkt_payload, } # Enrich with echo data and packet hash (frontend builds analyzer URL) if pkt_payload: msg['packet_hash'] = compute_packet_hash(pkt_payload) echoes = db.get_echoes_for_message(pkt_payload) if echoes: msg['echo_count'] = len(echoes) msg['echo_paths'] = [e.get('path', '') for e in echoes if e.get('path')] msg['echo_snrs'] = [e.get('snr') for e in echoes if e.get('snr') is not None] msg['echo_hash_sizes'] = [e.get('hash_size', 1) for e in echoes if e.get('path')] messages.append(msg) # Filter out blocked contacts' messages blocked_names = db.get_blocked_contact_names() if blocked_names: messages = [m for m in messages if m.get('sender', '') not in blocked_names] else: # Fallback to parser for file-based reads messages = parser.read_messages( limit=limit, offset=offset, archive_date=archive_date, days=days, channel_idx=channel_idx ) return jsonify({ 'success': True, 'count': len(messages), 'messages': messages, 'archive_date': archive_date if archive_date else None, 'channel_idx': channel_idx }), 200 except Exception as e: logger.error(f"Error fetching messages: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/messages//meta', methods=['GET']) def get_message_meta(msg_id): """Return metadata (SNR, hops, route, analyzer URL) for a single channel message.""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'No database'}), 500 row = db.get_channel_message_by_id(msg_id) if not row: return jsonify({'success': False, 'error': 'Not found'}), 404 pkt_payload = row.get('pkt_payload') sender_ts = row.get('sender_timestamp') ch_idx = row.get('channel_idx', 0) txt_type = row.get('txt_type', 0) # Compute pkt_payload if not stored # Use DB channels (fast) to avoid blocking on device communication if not pkt_payload and sender_ts: db_channels = db.get_channels() if db else [] channel_secrets = {} for ch_info in db_channels: ch_key = ch_info.get('secret', ch_info.get('key', '')) ci = ch_info.get('idx', ch_info.get('index')) if ch_key and ci is not None: channel_secrets[ci] = ch_key if ch_idx in channel_secrets: raw_text = None raw_json_str = row.get('raw_json') if raw_json_str: try: raw_text = json.loads(raw_json_str).get('text') except (json.JSONDecodeError, TypeError): pass if not raw_text: is_own = bool(row.get('is_own', 0)) if is_own: device_name = runtime_config.get_device_name() or '' raw_text = f"{device_name}: {row.get('content', '')}" if device_name else row.get('content', '') else: sender = row.get('sender', '') raw_text = f"{sender}: {row.get('content', '')}" if sender else row.get('content', '') pkt_payload = compute_pkt_payload( channel_secrets[ch_idx], sender_ts, txt_type, raw_text ) # Decode path_len path_len_raw = row.get('path_len') hop_count = None path_hash_size = 1 if path_len_raw is not None: hop_count, path_hash_size, _ = decode_path_len(path_len_raw) meta = { 'success': True, 'snr': row.get('snr'), 'path_len': path_len_raw, 'hop_count': hop_count, 'path_hash_size': path_hash_size, 'pkt_payload': pkt_payload, } if pkt_payload: meta['packet_hash'] = compute_packet_hash(pkt_payload) echoes = db.get_echoes_for_message(pkt_payload) if echoes: meta['echo_count'] = len(echoes) meta['echo_paths'] = [e.get('path', '') for e in echoes if e.get('path')] meta['echo_snrs'] = [e.get('snr') for e in echoes if e.get('snr') is not None] meta['echo_hash_sizes'] = [e.get('hash_size', 1) for e in echoes if e.get('path')] return jsonify(meta) except Exception as e: logger.error(f"Error fetching message meta: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/messages//resend', methods=['POST']) def resend_channel_message(msg_id): """Raw re-broadcast of an own channel message via CMD_SEND_RAW_PACKET. Pushes the exact stored wire bytes again so repeaters that already saw the original packet dedupe it (same Mesh::hasSeen hash), while any repeaters that missed it can pick it up. Used for "I never heard echoes back" and "I want better coverage" scenarios. """ try: result = cli.resend_channel_message(msg_id) if result.get('success'): return jsonify(result), 200 err = result.get('error', 'Resend failed') # 404 for missing snapshot or unknown id, 400 for ownership/disconnect, # 500 for unexpected device errors. if 'not found' in err.lower(): return jsonify(result), 404 err_l = err.lower() if ('no raw_packet' in err_l or 'own messages' in err_l or 'not connected' in err_l or 'firmware too old' in err_l): return jsonify(result), 400 return jsonify(result), 500 except Exception as e: logger.error(f"Error resending message #{msg_id}: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/messages', methods=['POST']) def send_message(): """ Send a message to a specific channel. JSON body: text (str): Message content (required) reply_to (str): Username to reply to (optional) channel_idx (int): Channel to send to (optional, default: 0) Returns: JSON with success status """ try: data = request.get_json() if not data or 'text' not in data: return jsonify({ 'success': False, 'error': 'Missing required field: text' }), 400 text = data['text'].strip() if not text: return jsonify({ 'success': False, 'error': 'Message text cannot be empty' }), 400 # MeshCore message length limit (~180-200 bytes for LoRa) # Count UTF-8 bytes, not Unicode characters byte_length = len(text.encode('utf-8')) if byte_length > 200: return jsonify({ 'success': False, 'error': f'Message too long ({byte_length} bytes). Maximum 200 bytes allowed due to LoRa constraints.' }), 400 reply_to = data.get('reply_to') channel_idx = data.get('channel_idx', 0) # Send message via meshcli result = cli.send_message(text, reply_to=reply_to, channel_index=channel_idx) if result['success']: # v2: Echo tracking is handled automatically by DeviceManager events return jsonify({ 'success': True, 'message': 'Message sent successfully', 'channel_idx': channel_idx, 'id': result.get('id'), 'timestamp': result.get('timestamp'), }), 200 else: return jsonify({ 'success': False, 'error': result.get('error', 'Failed to send message') }), 500 except Exception as e: logger.error(f"Error sending message: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/status', methods=['GET']) def get_status(): """ Get device connection status and basic info. Returns: JSON with status information """ try: # Check if device is accessible connected = cli.check_connection() # v2: Get message count from Database db = _get_db() message_count = 0 latest_timestamp = None if db: stats = db.get_stats() message_count = stats.get('channel_messages', 0) + stats.get('direct_messages', 0) # Get latest channel message timestamp recent = db.get_channel_messages(limit=1) if recent: latest_timestamp = recent[0].get('timestamp') else: message_count = parser.count_messages() latest = parser.get_latest_message() latest_timestamp = latest['timestamp'] if latest else None status_data = { 'success': True, 'connected': connected, 'device_name': runtime_config.get_device_name(), 'device_name_source': runtime_config.get_device_name_source(), 'transport_type': config.transport_type, 'serial_port': config.MC_SERIAL_PORT, 'message_count': message_count, 'latest_message_timestamp': latest_timestamp } if config.use_ble: status_data['ble_address'] = config.MC_BLE_ADDRESS # Feature capabilities derived from firmware version. UI uses these # to hide/disable controls that the connected device can't support. try: from flask import current_app dm = getattr(current_app, 'device_manager', None) if dm is not None: status_data['fw_ver_code'] = getattr(dm, '_fw_ver_code', None) status_data['supports_raw_resend'] = bool(getattr(dm, 'supports_raw_resend', False)) status_data['path_hash_mode'] = getattr(dm, '_path_hash_mode', None) status_data['path_hash_size'] = getattr(dm, 'path_hash_size', None) except Exception: pass return jsonify(status_data), 200 except Exception as e: logger.error(f"Error getting status: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/contacts', methods=['GET']) def get_contacts(): """ Get list of contacts from the device. Returns: JSON with list of contact names """ try: success, contacts, error = cli.get_contacts_list() if success: return jsonify({ 'success': True, 'contacts': contacts, 'count': len(contacts) }), 200 else: return jsonify({ 'success': False, 'error': error or 'Failed to get contacts', 'contacts': [] }), 500 except Exception as e: logger.error(f"Error getting contacts: {e}") return jsonify({ 'success': False, 'error': str(e), 'contacts': [] }), 500 @api_bp.route('/contacts/cached', methods=['GET']) def get_cached_contacts(): """ Get all known contacts from persistent cache (superset of device contacts). Includes contacts seen via adverts even after removal from device. Query params: ?format=names - Return just name strings for @mentions (default) ?format=full - Return full cache entries with public_key, timestamps, etc. ?format=count - Return only the count (lightweight) """ try: fmt = request.args.get('format', 'names') # v2: Read from Database (fallback to contacts_cache) db = _get_db() if db: db_contacts = db.get_contacts() if fmt == 'count': return jsonify({ 'success': True, 'count': len(db_contacts) }), 200 elif fmt == 'full': ignored_keys = db.get_ignored_keys() blocked_keys = db.get_blocked_keys() contacts = [] for c in db_contacts: pk = c.get('public_key', '') # Parse last_advert to numeric timestamp la = c.get('last_advert') try: last_advert_ts = int(la) if la else 0 except (ValueError, TypeError): last_advert_ts = 0 contacts.append({ 'public_key': pk, 'public_key_prefix': pk[:12], 'name': c.get('name', ''), 'first_seen': c.get('first_seen', ''), 'last_seen': c.get('last_seen', ''), 'last_advert': last_advert_ts, 'source': c.get('source', ''), 'adv_lat': c.get('adv_lat'), 'adv_lon': c.get('adv_lon'), 'type_label': {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}.get(c.get('type', 1), 'UNKNOWN'), 'is_ignored': pk in ignored_keys, 'is_blocked': pk in blocked_keys, }) return jsonify({ 'success': True, 'contacts': contacts, 'count': len(contacts) }), 200 else: names = sorted(set(c.get('name', '') for c in db_contacts if c.get('name'))) return jsonify({ 'success': True, 'contacts': names, 'count': len(names) }), 200 else: # Fallback to contacts_cache if fmt == 'count': contacts = get_all_contacts() return jsonify({ 'success': True, 'count': len(contacts) }), 200 elif fmt == 'full': contacts = get_all_contacts() for c in contacts: c['public_key_prefix'] = c.get('public_key', '')[:12] return jsonify({ 'success': True, 'contacts': contacts, 'count': len(contacts) }), 200 else: names = get_all_names() return jsonify({ 'success': True, 'contacts': names, 'count': len(names) }), 200 except Exception as e: logger.error(f"Error getting cached contacts: {e}") return jsonify({ 'success': False, 'error': str(e), 'contacts': [] }), 500 def _filter_contacts_by_criteria(contacts: list, criteria: dict) -> list: """ Filter contacts based on cleanup criteria. Args: contacts: List of contact dicts from /api/contacts/detailed criteria: Filter criteria: - name_filter (str): Partial name match (empty = ignore) - types (list[int]): Contact types to include [1,2,3,4] - date_field (str): "last_advert" or "lastmod" - days (int): Days of inactivity (0 = ignore) Returns: List of contacts matching criteria (excludes protected contacts) """ name_filter = criteria.get('name_filter', '').strip().lower() selected_types = criteria.get('types', [1, 2, 3, 4]) date_field = criteria.get('date_field', 'last_advert') days = criteria.get('days', 0) # Calculate timestamp threshold for days filter current_time = int(time.time()) days_threshold = days * 86400 # Convert days to seconds # Get protected contacts (exclude from cleanup) db = _get_db() protected_keys = db.get_protected_keys() if db else set() filtered = [] for contact in contacts: # Skip protected contacts if contact.get('public_key', '').lower() in protected_keys: continue # Filter by type if contact.get('type') not in selected_types: continue # Filter by name (partial match, case-insensitive) if name_filter: contact_name = contact.get('name', '').lower() if name_filter not in contact_name: continue # Filter by date (days of inactivity) if days > 0: timestamp = contact.get(date_field, 0) if timestamp == 0: # No timestamp - consider as inactive pass else: # Check if inactive for more than specified days age_seconds = current_time - timestamp if age_seconds <= days_threshold: # Still active within threshold continue # Contact matches all criteria filtered.append(contact) return filtered @api_bp.route('/contacts/preview-cleanup', methods=['POST']) def preview_cleanup_contacts(): """ Preview contacts that will be deleted based on filter criteria. JSON body: { "name_filter": "", # Partial name match (empty = ignore) "types": [1, 2, 3, 4], # Contact types (1=COM, 2=REP, 3=ROOM, 4=SENS) "date_field": "last_advert", # "last_advert" or "lastmod" "days": 2 # Days of inactivity (0 = ignore) } Returns: JSON with preview of contacts to be deleted: { "success": true, "contacts": [...], "count": 15 } """ try: data = request.get_json() or {} # Validate criteria criteria = { 'name_filter': data.get('name_filter', ''), 'types': data.get('types', [1, 2, 3, 4]), 'date_field': data.get('date_field', 'last_advert'), 'days': data.get('days', 0) } # Validate types if not isinstance(criteria['types'], list) or not all(t in [1, 2, 3, 4] for t in criteria['types']): return jsonify({ 'success': False, 'error': 'Invalid types (must be list of 1, 2, 3, 4)' }), 400 # Validate date_field if criteria['date_field'] not in ['last_advert', 'lastmod']: return jsonify({ 'success': False, 'error': 'Invalid date_field (must be "last_advert" or "lastmod")' }), 400 # Validate numeric fields if not isinstance(criteria['days'], int) or criteria['days'] < 0: return jsonify({ 'success': False, 'error': 'Invalid days (must be non-negative integer)' }), 400 # Get all contacts success_detailed, contacts_detailed, error_detailed = cli.get_contacts_with_last_seen() if not success_detailed: return jsonify({ 'success': False, 'error': error_detailed or 'Failed to get contacts' }), 500 # Convert to list format (same as /api/contacts/detailed) type_labels = {1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'} contacts = [] for public_key, details in contacts_detailed.items(): out_path_len = details.get('out_path_len', -1) contacts.append({ 'public_key': public_key, 'name': details.get('adv_name', ''), 'type': details.get('type'), 'type_label': type_labels.get(details.get('type'), 'UNKNOWN'), 'last_advert': details.get('last_advert'), 'lastmod': details.get('lastmod'), 'out_path_len': out_path_len, 'out_path': details.get('out_path', ''), 'out_path_hash_mode': details.get('out_path_hash_mode', 0), 'adv_lat': details.get('adv_lat'), 'adv_lon': details.get('adv_lon') }) # Filter contacts filtered_contacts = _filter_contacts_by_criteria(contacts, criteria) return jsonify({ 'success': True, 'contacts': filtered_contacts, 'count': len(filtered_contacts) }), 200 except Exception as e: logger.error(f"Error previewing cleanup: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/contacts/cleanup', methods=['POST']) def cleanup_contacts(): """ Clean up contacts based on filter criteria. JSON body: { "name_filter": "", # Partial name match (empty = ignore) "types": [1, 2, 3, 4], # Contact types (1=COM, 2=REP, 3=ROOM, 4=SENS) "date_field": "last_advert", # "last_advert" or "lastmod" "days": 2 # Days of inactivity (0 = ignore) } Returns: JSON with cleanup result: { "success": true, "deleted_count": 15, "failed_count": 2, "failures": [ {"name": "Contact1", "error": "..."}, ... ] } """ try: data = request.get_json() or {} # Validate criteria (same as preview) criteria = { 'name_filter': data.get('name_filter', ''), 'types': data.get('types', [1, 2, 3, 4]), 'date_field': data.get('date_field', 'last_advert'), 'days': data.get('days', 0) } # Validate types if not isinstance(criteria['types'], list) or not all(t in [1, 2, 3, 4] for t in criteria['types']): return jsonify({ 'success': False, 'error': 'Invalid types (must be list of 1, 2, 3, 4)' }), 400 # Validate date_field if criteria['date_field'] not in ['last_advert', 'lastmod']: return jsonify({ 'success': False, 'error': 'Invalid date_field (must be "last_advert" or "lastmod")' }), 400 # Validate numeric fields if not isinstance(criteria['days'], int) or criteria['days'] < 0: return jsonify({ 'success': False, 'error': 'Invalid days (must be non-negative integer)' }), 400 # Get all contacts success_detailed, contacts_detailed, error_detailed = cli.get_contacts_with_last_seen() if not success_detailed: return jsonify({ 'success': False, 'error': error_detailed or 'Failed to get contacts' }), 500 # Convert to list format type_labels = {1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'} contacts = [] for public_key, details in contacts_detailed.items(): out_path_len = details.get('out_path_len', -1) contacts.append({ 'public_key': public_key, 'name': details.get('adv_name', ''), 'type': details.get('type'), 'type_label': type_labels.get(details.get('type'), 'UNKNOWN'), 'last_advert': details.get('last_advert'), 'lastmod': details.get('lastmod'), 'out_path_len': out_path_len }) # Filter contacts to delete filtered_contacts = _filter_contacts_by_criteria(contacts, criteria) if len(filtered_contacts) == 0: return jsonify({ 'success': True, 'message': 'No contacts matched the criteria', 'deleted_count': 0, 'failed_count': 0, 'failures': [] }), 200 # Delete contacts one by one, track failures deleted_count = 0 failed_count = 0 failures = [] for contact in filtered_contacts: contact_name = contact['name'] success, message = cli.delete_contact(contact_name) if success: deleted_count += 1 else: failed_count += 1 failures.append({ 'name': contact_name, 'error': message }) # Invalidate contacts cache after deletions if deleted_count > 0: invalidate_contacts_cache() return jsonify({ 'success': True, 'message': f'Cleanup completed: {deleted_count} deleted, {failed_count} failed', 'deleted_count': deleted_count, 'failed_count': failed_count, 'failures': failures }), 200 except Exception as e: logger.error(f"Error cleaning contacts: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/device/info', methods=['GET']) def get_device_info(): """ Get detailed device information from DeviceManager. """ try: dm = _get_dm() if not dm or not dm.is_connected: return jsonify({'success': False, 'error': 'Device not connected'}), 503 info = dm.get_device_info() if info: return jsonify({ 'success': True, 'info': info, }), 200 else: return jsonify({'success': False, 'error': 'No device info available'}), 503 except Exception as e: logger.error(f"Error getting device info: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/device/config', methods=['GET']) def get_device_config(): """Get device configuration for settings modal.""" try: dm = _get_dm() if not dm or not dm.is_connected: return jsonify({'success': False, 'error': 'Device not connected'}), 503 info = dm.get_device_info() if not info: return jsonify({'success': False, 'error': 'No device info available'}), 503 path_hash_mode = None try: phm_result = dm.get_param('path_hash_mode') if phm_result.get('success'): path_hash_mode = phm_result.get('data', {}).get('path_hash_mode') except Exception as e: logger.warning(f"Could not read path_hash_mode: {e}") return jsonify({ 'success': True, 'config': { 'name': info.get('name', info.get('adv_name', '')), 'lat': info.get('adv_lat', 0), 'lon': info.get('adv_lon', 0), 'advert_loc_policy': info.get('adv_loc_policy', 0), 'path_hash_mode': path_hash_mode, 'radio_freq': info.get('radio_freq', 0), 'radio_bw': info.get('radio_bw', 0), 'radio_sf': info.get('radio_sf', 0), 'radio_cr': info.get('radio_cr', 0), 'tx_power': info.get('tx_power', 0), } }), 200 except Exception as e: logger.error(f"Error getting device config: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/device/config', methods=['POST']) def update_device_config(): """Update device configuration from settings modal.""" try: dm = _get_dm() if not dm or not dm.is_connected: return jsonify({'success': False, 'error': 'Device not connected'}), 503 data = request.get_json() if not data: return jsonify({'success': False, 'error': 'No data provided'}), 400 errors = [] # Name if 'name' in data: result = dm.set_param('name', str(data['name'])) if not result.get('success'): errors.append(f"name: {result.get('error')}") # Coordinates if 'lat' in data or 'lon' in data: lat = float(data.get('lat', 0)) lon = float(data.get('lon', 0)) result = dm.set_param('coords', f"{lat},{lon}") if not result.get('success'): errors.append(f"coords: {result.get('error')}") # Advert location policy if 'advert_loc_policy' in data: val = '1' if data['advert_loc_policy'] else '0' result = dm.set_param('advert_loc_policy', val) if not result.get('success'): errors.append(f"advert_loc_policy: {result.get('error')}") # Path hash mode (0=1B, 1=2B, 2=3B) if 'path_hash_mode' in data and data['path_hash_mode'] is not None: try: phm = int(data['path_hash_mode']) if phm not in (0, 1, 2): errors.append(f"path_hash_mode: must be 0, 1, or 2") else: result = dm.set_param('path_hash_mode', str(phm)) if not result.get('success'): errors.append(f"path_hash_mode: {result.get('error')}") except (TypeError, ValueError): errors.append("path_hash_mode: invalid value") # Radio params (all 4 together) if 'radio_freq' in data: freq = float(data['radio_freq']) bw = float(data.get('radio_bw', 0)) sf = int(data.get('radio_sf', 0)) cr = int(data.get('radio_cr', 0)) result = dm.set_param('radio', f"{freq},{bw},{sf},{cr}") if not result.get('success'): errors.append(f"radio: {result.get('error')}") # TX Power (separate from radio) if 'tx_power' in data: result = dm.set_param('tx', str(int(data['tx_power']))) if not result.get('success'): errors.append(f"tx_power: {result.get('error')}") if errors: return jsonify({'success': False, 'error': '; '.join(errors)}), 500 return jsonify({'success': True, 'message': 'Device config updated'}), 200 except Exception as e: logger.error(f"Error updating device config: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/device/stats', methods=['GET']) def get_device_stats(): """ Get device statistics (uptime, radio, packets). """ try: dm = _get_dm() if not dm or not dm.is_connected: return jsonify({'success': False, 'error': 'Device not connected'}), 503 stats = dm.get_device_stats() bat = dm.get_battery() db = _get_db() db_stats = db.get_stats() if db else {} return jsonify({ 'success': True, 'stats': stats, 'battery': bat, 'db_stats': db_stats, }), 200 except Exception as e: logger.error(f"Error getting device stats: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================================= # Special Commands # ============================================================================= # Registry of available special commands SPECIAL_COMMANDS = { 'advert': { 'function': cli.advert, 'description': 'Send single advertisement (recommended)', }, 'floodadv': { 'function': cli.floodadv, 'description': 'Flood advertisement (use sparingly!)', }, } @api_bp.route('/device/command', methods=['POST']) def execute_special_command(): """ Execute a special device command. JSON body: command (str): Command name (required) - one of: advert, floodadv, node_discover Returns: JSON with command result """ try: data = request.get_json() if not data or 'command' not in data: return jsonify({ 'success': False, 'error': 'Missing required field: command' }), 400 command = data['command'].strip().lower() if command not in SPECIAL_COMMANDS: return jsonify({ 'success': False, 'error': f'Unknown command: {command}. Available commands: {", ".join(SPECIAL_COMMANDS.keys())}' }), 400 # Execute the command cmd_info = SPECIAL_COMMANDS[command] success, message = cmd_info['function']() if success: # Clean up advert message if command == 'advert': clean_message = "Advert sent" else: clean_message = message or f'{command} executed successfully' return jsonify({ 'success': True, 'command': command, 'message': clean_message }), 200 else: return jsonify({ 'success': False, 'command': command, 'error': message }), 500 except Exception as e: logger.error(f"Error executing special command: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/device/commands', methods=['GET']) def list_special_commands(): """ List available special commands. Returns: JSON with list of available commands """ commands = [ {'name': name, 'description': info['description']} for name, info in SPECIAL_COMMANDS.items() ] return jsonify({ 'success': True, 'commands': commands }), 200 @api_bp.route('/sync', methods=['POST']) def sync_messages(): """ Trigger message sync from device. Returns: JSON with sync result """ try: success, message = cli.recv_messages() if success: return jsonify({ 'success': True, 'message': 'Messages synced successfully' }), 200 else: return jsonify({ 'success': False, 'error': message }), 500 except Exception as e: logger.error(f"Error syncing messages: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/archives', methods=['GET']) def get_archives(): """ Get list of available message archives (dates with messages). Returns: JSON with list of archives, each with: - date (str): Date in YYYY-MM-DD format - message_count (int): Number of messages on that date """ try: # v2: Query distinct dates from SQLite db = _get_db() if db: dates = db.get_message_dates() # Exclude today (that's "Live") from datetime import date as date_cls today = date_cls.today().isoformat() archives = [d for d in dates if d['date'] != today] return jsonify({ 'success': True, 'archives': archives, 'count': len(archives) }), 200 # Fallback to file-based archives archives = archive_manager.list_archives() return jsonify({ 'success': True, 'archives': archives, 'count': len(archives) }), 200 except Exception as e: logger.error(f"Error listing archives: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/archive/trigger', methods=['POST']) def trigger_archive(): """ Manually trigger message archiving. JSON body: date (str): Date to archive in YYYY-MM-DD format (optional, defaults to yesterday) Returns: JSON with archive operation result """ try: data = request.get_json() or {} archive_date = data.get('date') # Validate date format if provided if archive_date: try: datetime.strptime(archive_date, '%Y-%m-%d') except ValueError: return jsonify({ 'success': False, 'error': f'Invalid date format: {archive_date}. Expected YYYY-MM-DD' }), 400 # Trigger archiving result = archive_manager.archive_messages(archive_date) if result['success']: return jsonify(result), 200 else: return jsonify(result), 500 except Exception as e: logger.error(f"Error triggering archive: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/channels', methods=['GET']) def get_channels(): """ Get list of configured channels (cached for 30s). Returns: JSON with channels list """ try: success, channels = get_channels_cached() if success: return jsonify({ 'success': True, 'channels': channels, 'count': len(channels) }), 200 else: return jsonify({ 'success': False, 'error': 'Failed to retrieve channels' }), 500 except Exception as e: logger.error(f"Error getting channels: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/channels', methods=['POST']) def create_channel(): """ Create a new channel with auto-generated key. JSON body: name (str): Channel name (required) Returns: JSON with created channel info """ try: data = request.get_json() if not data or 'name' not in data: return jsonify({ 'success': False, 'error': 'Missing required field: name' }), 400 name = data['name'].strip() if not name: return jsonify({ 'success': False, 'error': 'Channel name cannot be empty' }), 400 # Validate name (no special chars that could break CLI) if not re.match(r'^[a-zA-Z0-9_\-]+$', name): return jsonify({ 'success': False, 'error': 'Channel name can only contain letters, numbers, _ and -' }), 400 # Serialize create/join so concurrent requests can't both pick a free # slot for the same name and produce duplicates. with _channel_write_lock: # Idempotency: if a channel with this name already exists, return it # instead of creating a second slot. Compare case-insensitively. invalidate_channels_cache() # force fresh read from device success_ch, existing = get_channels_cached(force_refresh=True) if success_ch: for ch in existing: if ch.get('name', '').lower() == name.lower(): logger.info(f"Channel '{name}' already exists at slot {ch['index']}, returning existing") return jsonify({ 'success': True, 'message': f"Channel '{name}' already exists at slot {ch['index']}", 'channel': { 'index': ch['index'], 'name': ch.get('name', name), 'key': ch.get('key', ''), }, 'already_existed': True, }), 200 success, message, key = cli.add_channel(name) if success: invalidate_channels_cache() # Clear cache to force refresh # Build response response = { 'success': True, 'message': message, 'channel': { 'name': name, 'key': key } } # Check channel count for soft limit warning success_ch, channels = get_channels_cached() if success_ch and len(channels) > 7: response['warning'] = ( f'You now have {len(channels)} channels. ' 'Some devices may only support up to 8 channels. ' 'Check your device specifications if you experience issues.' ) return jsonify(response), 201 else: return jsonify({ 'success': False, 'error': message }), 500 except Exception as e: logger.error(f"Error creating channel: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/channels/join', methods=['POST']) def join_channel(): """ Join an existing channel by setting name and key. JSON body: name (str): Channel name (required) key (str): 32-char hex key (optional for channels starting with #) index (int): Channel slot (optional, auto-detect if not provided) Returns: JSON with result """ try: data = request.get_json() if not data or 'name' not in data: return jsonify({ 'success': False, 'error': 'Missing required field: name' }), 400 name = data['name'].strip() key = data.get('key', '').strip().lower() if 'key' in data else None # Validate: key is optional for channels starting with # if not name.startswith('#') and not key: return jsonify({ 'success': False, 'error': 'Key is required for channels not starting with #' }), 400 # Serialize create/join so concurrent requests can't both pick a free # slot for the same name and produce duplicates. with _channel_write_lock: # Fresh read from device before picking slot or checking duplicates. invalidate_channels_cache() success_ch, channels = get_channels_cached(force_refresh=True) if not success_ch: return jsonify({ 'success': False, 'error': 'Failed to get current channels' }), 500 # Idempotency: if a channel with this name already exists, return # the existing slot instead of creating a duplicate. Skip this check # only when caller explicitly targets an index (explicit overwrite). if 'index' not in data: for ch in channels: if ch.get('name', '').lower() == name.lower(): logger.info(f"Channel '{name}' already exists at slot {ch['index']}, returning existing") return jsonify({ 'success': True, 'message': f"Already joined channel \"{name}\" at slot {ch['index']}", 'channel': { 'index': ch['index'], 'name': ch.get('name', name), 'key': ch.get('key', '') or 'auto-generated', }, 'already_existed': True, }), 200 # Auto-detect free slot if not provided if 'index' in data: index = int(data['index']) used_indices = {ch['index'] for ch in channels} else: # Find first free slot (1-40, skip 0 which is Public) # Hard limit: 40 channels (most LoRa devices support up to 40) # Soft limit: 7 channels (some devices may have lower limits) used_indices = {ch['index'] for ch in channels} index = None for i in range(1, 41): # Max 40 channels (hard limit) if i not in used_indices: index = i break if index is None: return jsonify({ 'success': False, 'error': 'No free channel slots available (max 40 channels)' }), 400 success, message = cli.set_channel(index, name, key) if success: invalidate_channels_cache() # Clear cache to force refresh # Build response response = { 'success': True, 'message': f'Joined channel "{name}" at slot {index}', 'channel': { 'index': index, 'name': name, 'key': key if key else 'auto-generated' } } # Add warning if exceeding soft limit (7 channels) # Some older/smaller devices may only support 8 channels total channel_count = len(used_indices) + 1 # +1 for newly added channel if channel_count > 7: response['warning'] = ( f'You now have {channel_count} channels. ' 'Some devices may only support up to 8 channels. ' 'Check your device specifications if you experience issues.' ) return jsonify(response), 200 else: return jsonify({ 'success': False, 'error': message }), 500 except Exception as e: logger.error(f"Error joining channel: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/channels/', methods=['DELETE']) def delete_channel(index): """ Remove a channel and delete all its messages. Args: index: Channel index to remove Returns: JSON with result """ try: # First, delete all messages for this channel db = _get_db() if db: db.delete_channel_messages(index) else: messages_deleted = parser.delete_channel_messages(index) if not messages_deleted: logger.warning(f"Failed to delete messages for channel {index}, continuing with channel removal") # Then remove the channel itself success, message = cli.remove_channel(index) if success: invalidate_channels_cache() # Clear cache to force refresh return jsonify({ 'success': True, 'message': f'Channel {index} removed and messages deleted' }), 200 else: return jsonify({ 'success': False, 'error': message }), 500 except Exception as e: logger.error(f"Error removing channel: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/channels//qr', methods=['GET']) def get_channel_qr(index): """ Generate QR code for channel sharing. Args: index: Channel index Query params: format: 'json' (default) or 'png' Returns: JSON with QR data or PNG image """ try: import qrcode # Get channel info success, channels = cli.get_channels() if not success: return jsonify({ 'success': False, 'error': 'Failed to get channels' }), 500 channel = next((ch for ch in channels if ch['index'] == index), None) if not channel: return jsonify({ 'success': False, 'error': f'Channel {index} not found' }), 404 # Create QR data qr_data = { 'type': 'meshcore_channel', 'name': channel['name'], 'key': channel['key'] } qr_json = json.dumps(qr_data) format_type = request.args.get('format', 'json') if format_type == 'png': # Generate PNG QR code qr = qrcode.QRCode( version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=10, border=4, ) qr.add_data(qr_json) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") # Convert to PNG bytes buf = BytesIO() img.save(buf, format='PNG') buf.seek(0) return send_file(buf, mimetype='image/png') else: # JSON format # Generate base64 data URL for inline display qr = qrcode.QRCode(version=1, box_size=10, border=4) qr.add_data(qr_json) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buf = BytesIO() img.save(buf, format='PNG') buf.seek(0) img_base64 = base64.b64encode(buf.read()).decode() data_url = f"data:image/png;base64,{img_base64}" return jsonify({ 'success': True, 'qr_data': qr_data, 'qr_image': data_url, 'qr_text': qr_json }), 200 except ImportError: return jsonify({ 'success': False, 'error': 'QR code library not available' }), 500 except Exception as e: logger.error(f"Error generating QR code: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 _MENTION_RE = re.compile(r'@\[([^\]]+)\]') def _make_preview(content, max_len=60): """Strip @[name] mention syntax and truncate content for channel list preview.""" if not content: return '' stripped = _MENTION_RE.sub(r'\1', content) if len(stripped) > max_len: return stripped[:max_len] + '…' return stripped @api_bp.route('/messages/updates', methods=['GET']) def get_messages_updates(): """ Check for new messages across all channels without fetching full message content. Used for intelligent refresh mechanism and unread notifications. OPTIMIZED: Reads messages file only ONCE and computes stats for all channels. Previously read the file N*2 times (once per channel, twice if updates). Query parameters: last_seen (str): JSON object with last seen timestamps per channel Format: {"0": 1234567890, "1": 1234567891, ...} Returns: JSON with update information per channel: { "success": true, "channels": [ { "index": 0, "name": "Public", "has_updates": true, "latest_timestamp": 1234567900, "unread_count": 5 }, ... ], "total_unread": 10 } """ try: # Parse last_seen timestamps from query param last_seen_str = request.args.get('last_seen', '{}') try: last_seen = json.loads(last_seen_str) # Convert keys to integers and values to floats last_seen = {int(k): float(v) for k, v in last_seen.items()} except (json.JSONDecodeError, ValueError): last_seen = {} # Get list of channels from DB (fast) to avoid blocking on device db = _get_db() db_channels = db.get_channels() if db else [] # Normalize DB channel format to match expected structure channels = [{'index': ch['idx'], 'name': ch['name']} for ch in db_channels] if not channels: # Fallback: at least include Public channel channels = [{'index': 0, 'name': 'Public'}] # OPTIMIZATION: Read ALL messages ONCE (no channel filter) # Then compute per-channel statistics in memory if db: all_messages = db.get_channel_messages(limit=None, days=7) else: all_messages = parser.read_messages(limit=None, days=7) # Filter out blocked contacts' messages from unread counts if db: blocked_names = db.get_blocked_contact_names() if blocked_names: all_messages = [m for m in all_messages if m.get('sender', '') not in blocked_names] # Group messages by channel and compute stats channel_stats = {} # channel_idx -> {latest_ts, messages_after_last_seen} for msg in all_messages: ch_idx = msg.get('channel_idx', 0) ts = msg.get('timestamp', 0) if ch_idx not in channel_stats: channel_stats[ch_idx] = { 'latest_timestamp': 0, 'unread_count': 0, 'last_message_content': '', 'last_message_sender': '' } # Track latest timestamp per channel (and capture its content/sender) if ts > channel_stats[ch_idx]['latest_timestamp']: channel_stats[ch_idx]['latest_timestamp'] = ts channel_stats[ch_idx]['last_message_content'] = msg.get('content', '') channel_stats[ch_idx]['last_message_sender'] = msg.get('sender', '') # Count unread messages (newer than last_seen) last_seen_ts = last_seen.get(ch_idx, 0) if ts > last_seen_ts: channel_stats[ch_idx]['unread_count'] += 1 # Get muted channels to exclude from total from app import read_status as rs muted_channels = set(rs.get_muted_channels()) favorite_channels = rs.get_favorite_channels() # Build response updates = [] total_unread = 0 for channel in channels: channel_idx = channel['index'] stats = channel_stats.get(channel_idx, { 'latest_timestamp': 0, 'unread_count': 0, 'last_message_content': '', 'last_message_sender': '' }) last_seen_ts = last_seen.get(channel_idx, 0) has_updates = stats['latest_timestamp'] > last_seen_ts unread_count = stats['unread_count'] if has_updates else 0 # Only count unmuted channels toward total if channel_idx not in muted_channels: total_unread += unread_count updates.append({ 'index': channel_idx, 'name': channel['name'], 'has_updates': has_updates, 'latest_timestamp': stats['latest_timestamp'], 'unread_count': unread_count, 'last_message_preview': _make_preview(stats.get('last_message_content', '')), 'last_message_time': stats['latest_timestamp'] }) return jsonify({ 'success': True, 'channels': updates, 'total_unread': total_unread, 'muted_channels': list(muted_channels), 'favorite_channels': favorite_channels }), 200 except Exception as e: logger.error(f"Error checking message updates: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 # ============================================================================= # Message Search # ============================================================================= @api_bp.route('/messages/search', methods=['GET']) def search_messages(): """ Full-text search across all channel and direct messages (FTS5). Query params: q (str): Search query (required) limit (int): Max results (default: 50) Returns: JSON with search results sorted by timestamp descending. """ query = request.args.get('q', '').strip() if not query: return jsonify({'success': False, 'error': 'Missing search query'}), 400 limit = request.args.get('limit', 50, type=int) limit = min(limit, 200) try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 # FTS5 query — wrap in quotes for phrase search if contains spaces # and add * for prefix matching fts_query = query results = db.search_messages(fts_query, limit=limit) # Enrich results with channel names and contact info success, channels_list = get_channels_cached() channel_names = {ch['index']: ch['name'] for ch in channels_list} if success else {} enriched = [] for r in results: item = { 'id': r.get('id'), 'content': r.get('content', ''), 'timestamp': r.get('timestamp', 0), 'source': r.get('msg_source', 'channel'), } if r.get('msg_source') == 'channel': item['sender'] = r.get('sender', '') item['channel_idx'] = r.get('channel_idx') item['channel_name'] = channel_names.get(r.get('channel_idx'), f"Channel {r.get('channel_idx')}") item['is_own'] = bool(r.get('is_own')) else: item['contact_pubkey'] = r.get('contact_pubkey', '') item['direction'] = r.get('direction', '') # Look up contact name contact = db.get_contact(r.get('contact_pubkey', '')) if r.get('contact_pubkey') else None item['contact_name'] = contact.get('name', r.get('contact_pubkey', '')[:12]) if contact else r.get('contact_pubkey', '')[:12] enriched.append(item) return jsonify({ 'success': True, 'results': enriched, 'count': len(enriched), 'query': query }), 200 except Exception as e: logger.error(f"Search error: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================================= # Direct Messages (DM) Endpoints # ============================================================================= @api_bp.route('/dm/conversations', methods=['GET']) def get_dm_conversations(): """ Get list of DM conversations. Query params: days (int): Filter to last N days (default: 7) Returns: JSON with conversations list: { "success": true, "conversations": [ { "conversation_id": "pk_4563b1621b58", "display_name": "daniel5120", "pubkey_prefix": "4563b1621b58", "last_message_timestamp": 1766491173, "last_message_preview": "Hello there...", "unread_count": 0, "message_count": 15 } ], "count": 5 } """ try: days = request.args.get('days', default=7, type=int) # v2: Read from Database db = _get_db() if db: convos = db.get_dm_conversations() # Filter out blocked contacts blocked_keys = db.get_blocked_keys() convos = [c for c in convos if c.get('contact_pubkey', '').lower() not in blocked_keys] # Convert to frontend-compatible format conversations = [] for c in convos: pk = c.get('contact_pubkey', '') conversations.append({ 'conversation_id': f"pk_{pk}" if pk else 'unknown', 'display_name': c.get('display_name', pk[:8] + '...' if pk else 'Unknown'), 'pubkey_prefix': pk[:12] if pk else '', 'last_message_timestamp': c.get('last_message_timestamp', 0), 'last_message_preview': (c.get('last_message_preview', '') or '')[:50], 'unread_count': 0, 'message_count': c.get('message_count', 0), }) else: conversations = parser.get_dm_conversations(days=days) return jsonify({ 'success': True, 'conversations': conversations, 'count': len(conversations) }), 200 except Exception as e: logger.error(f"Error getting DM conversations: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/dm/messages', methods=['GET']) def get_dm_messages(): """ Get DM messages for a specific conversation. Query params: conversation_id (str): Required - conversation identifier (pk_xxx or name_xxx) limit (int): Max messages to return (default: 100) days (int): Filter to last N days (default: 7) Returns: JSON with messages list: { "success": true, "conversation_id": "pk_4563b1621b58", "display_name": "daniel5120", "messages": [...], "count": 25 } """ try: conversation_id = request.args.get('conversation_id', type=str) if not conversation_id: return jsonify({ 'success': False, 'error': 'Missing required parameter: conversation_id' }), 400 limit = request.args.get('limit', default=100, type=int) days = request.args.get('days', default=7, type=int) # v2: Read from Database db = _get_db() pubkey_to_name = {} # Extract pubkey from conversation_id contact_pubkey = '' if conversation_id.startswith('pk_'): contact_pubkey = conversation_id[3:] elif conversation_id.startswith('name_'): # Look up pubkey by name contact_name = conversation_id[5:] if db: contacts = db.get_contacts() for c in contacts: if c.get('name', '').strip() == contact_name: contact_pubkey = c['public_key'] break if db and contact_pubkey: db_msgs = db.get_dm_messages(contact_pubkey, limit=limit) messages = [] for row in db_msgs: # Decode path_len dm_path_len_raw = row.get('path_len') dm_hop_count = None dm_path_hash_size = 1 if dm_path_len_raw is not None: dm_hop_count, dm_path_hash_size, _ = decode_path_len(dm_path_len_raw) messages.append({ 'type': 'dm', 'id': row['id'], 'direction': 'incoming' if row['direction'] == 'in' else 'outgoing', 'sender': row.get('contact_pubkey', ''), 'content': row.get('content', ''), 'timestamp': row.get('timestamp', 0), 'datetime': datetime.fromtimestamp(row['timestamp']).isoformat() if row.get('timestamp') else None, 'is_own': row['direction'] == 'out', 'snr': row.get('snr'), 'path_len': dm_path_len_raw, 'hop_count': dm_hop_count, 'path_hash_size': dm_path_hash_size, 'expected_ack': row.get('expected_ack'), 'delivery_status': row.get('delivery_status'), 'delivery_attempt': row.get('delivery_attempt'), 'delivery_max_attempts': row.get('delivery_max_attempts'), 'delivery_path': row.get('delivery_path'), 'delivery_path_hash_size': row.get('delivery_path_hash_size') or 1, 'conversation_id': conversation_id, }) else: messages, pubkey_to_name = parser.read_dm_messages( limit=limit, conversation_id=conversation_id, days=days ) messages = parser.dedup_retry_messages(messages) # Determine display name display_name = 'Unknown' if conversation_id.startswith('pk_'): pk = conversation_id[3:] # Try DB first if db: contact = db.get_contact(pk) if contact: display_name = contact.get('name', pk[:8] + '...') else: display_name = pubkey_to_name.get(pk, pk[:8] + '...') else: display_name = pubkey_to_name.get(pk, pk[:8] + '...') elif conversation_id.startswith('name_'): display_name = conversation_id[5:] # Also check messages for better name for msg in messages: if msg['direction'] == 'incoming' and msg.get('sender'): display_name = msg['sender'] break elif msg['direction'] == 'outgoing' and msg.get('recipient'): display_name = msg['recipient'] # Set delivery status from DB field (covers both delivered and failed) for msg in messages: if msg.get('direction') == 'outgoing': ds = msg.get('delivery_status') if ds == 'delivered': msg['status'] = 'delivered' elif ds == 'failed': msg['status'] = 'failed' # Merge additional delivery info from ACK tracking ack_codes = [msg['expected_ack'] for msg in messages if msg.get('direction') == 'outgoing' and msg.get('expected_ack')] if ack_codes: try: success_ack, acks, _ = cli.check_dm_delivery(ack_codes) if success_ack: for msg in messages: ack_code = msg.get('expected_ack') if ack_code and acks.get(ack_code): ack_info = acks[ack_code] msg['status'] = 'delivered' msg['delivery_snr'] = ack_info.get('snr') msg['delivery_route'] = ack_info.get('route_type', ack_info.get('route')) except Exception as e: logger.debug(f"ACK status fetch failed (non-critical): {e}") return jsonify({ 'success': True, 'conversation_id': conversation_id, 'display_name': display_name, 'messages': messages, 'count': len(messages) }), 200 except Exception as e: logger.error(f"Error getting DM messages: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/dm/messages', methods=['POST']) def send_dm_message(): """ Send a direct message. JSON body: recipient (str): Contact name (required) text (str): Message content (required) Returns: JSON with send result: { "success": true, "message": "DM sent", "recipient": "daniel5120", "status": "pending" } """ try: data = request.get_json() if not data: return jsonify({ 'success': False, 'error': 'Missing JSON body' }), 400 recipient = data.get('recipient', '').strip() text = data.get('text', '').strip() if not recipient: return jsonify({ 'success': False, 'error': 'Missing required field: recipient' }), 400 if not text: return jsonify({ 'success': False, 'error': 'Missing required field: text' }), 400 # MeshCore message length limit byte_length = len(text.encode('utf-8')) if byte_length > 200: return jsonify({ 'success': False, 'error': f'Message too long ({byte_length} bytes). Maximum 200 bytes allowed.' }), 400 # Send via CLI success, result = cli.send_dm(recipient, text) if success: return jsonify({ 'success': True, 'message': 'DM sent', 'recipient': recipient, 'status': 'pending', 'dm_id': result.get('id'), 'expected_ack': result.get('expected_ack'), }), 200 else: return jsonify({ 'success': False, 'error': result.get('error', 'Send failed') }), 500 except Exception as e: logger.error(f"Error sending DM: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/dm/auto_retry', methods=['GET']) def get_auto_retry_config(): """Get DM retry settings.""" try: return jsonify(get_dm_retry_settings()), 200 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/dm/auto_retry', methods=['POST']) def set_auto_retry_config(): """Update DM retry settings.""" try: data = request.get_json() if not data: return jsonify({'success': False, 'error': 'Missing JSON body'}), 400 # Validate numeric fields valid_keys = set(DM_RETRY_DEFAULTS.keys()) settings = {} for key in valid_keys: if key in data: val = data[key] if not isinstance(val, (int, float)) or val < 0: return jsonify({'success': False, 'error': f'Invalid value for {key}'}), 400 settings[key] = int(val) if not settings: return jsonify({'success': False, 'error': 'No valid settings provided'}), 400 if save_dm_retry_settings(settings): return jsonify({**get_dm_retry_settings(), 'success': True}), 200 return jsonify({'success': False, 'error': 'Failed to save settings'}), 500 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 ## ================================================================ # Contact Paths (user-configured DM routing paths) # ================================================================ @api_bp.route('/contacts//paths', methods=['GET']) def get_contact_paths(pubkey): """List all configured paths for a contact.""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: paths = db.get_contact_paths(pubkey) return jsonify({'success': True, 'paths': paths}), 200 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//paths', methods=['POST']) def add_contact_path(pubkey): """Add a new configured path for a contact.""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: data = request.get_json() if not data: return jsonify({'success': False, 'error': 'Missing JSON body'}), 400 path_hex = data.get('path_hex', '').strip() hash_size = data.get('hash_size', 1) label = data.get('label', '').strip() is_primary = bool(data.get('is_primary', False)) # Validate path_hex if not path_hex: return jsonify({'success': False, 'error': 'path_hex is required'}), 400 if len(path_hex) % 2 != 0: return jsonify({'success': False, 'error': 'path_hex must have even length'}), 400 try: bytes.fromhex(path_hex) except ValueError: return jsonify({'success': False, 'error': 'path_hex must be valid hex'}), 400 # Validate hash_size if hash_size not in (1, 2, 3): return jsonify({'success': False, 'error': 'hash_size must be 1, 2, or 3'}), 400 # Validate hop count hop_count = len(path_hex) // (hash_size * 2) max_hops = {1: 64, 2: 32, 3: 21} if hop_count < 1 or hop_count > max_hops[hash_size]: return jsonify({'success': False, 'error': f'Hop count {hop_count} exceeds max {max_hops[hash_size]} for {hash_size}-byte hash'}), 400 # Validate path_hex length is a multiple of hash_size if len(path_hex) % (hash_size * 2) != 0: return jsonify({'success': False, 'error': f'path_hex length must be a multiple of {hash_size * 2} (hash_size={hash_size})'}), 400 # Validate label length if len(label) > 50: return jsonify({'success': False, 'error': 'Label must be 50 characters or less'}), 400 path_id = db.add_contact_path(pubkey, path_hex, hash_size, label, is_primary) return jsonify({'success': True, 'id': path_id}), 201 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//paths/', methods=['PUT']) def update_contact_path(pubkey, path_id): """Update a configured path.""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: data = request.get_json() if not data: return jsonify({'success': False, 'error': 'Missing JSON body'}), 400 kwargs = {} if 'path_hex' in data: path_hex = data['path_hex'].strip() if len(path_hex) % 2 != 0: return jsonify({'success': False, 'error': 'path_hex must have even length'}), 400 try: bytes.fromhex(path_hex) except ValueError: return jsonify({'success': False, 'error': 'path_hex must be valid hex'}), 400 kwargs['path_hex'] = path_hex if 'hash_size' in data: if data['hash_size'] not in (1, 2, 3): return jsonify({'success': False, 'error': 'hash_size must be 1, 2, or 3'}), 400 kwargs['hash_size'] = data['hash_size'] if 'label' in data: label = data['label'].strip() if len(label) > 50: return jsonify({'success': False, 'error': 'Label must be 50 characters or less'}), 400 kwargs['label'] = label if 'is_primary' in data: kwargs['is_primary'] = 1 if data['is_primary'] else 0 if not kwargs: return jsonify({'success': False, 'error': 'No valid fields to update'}), 400 if db.update_contact_path(path_id, **kwargs): return jsonify({'success': True}), 200 return jsonify({'success': False, 'error': 'Path not found'}), 404 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//paths/', methods=['DELETE']) def delete_contact_path(pubkey, path_id): """Delete a single configured path.""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: if db.delete_contact_path(path_id): return jsonify({'success': True}), 200 return jsonify({'success': False, 'error': 'Path not found'}), 404 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//paths/reorder', methods=['POST']) def reorder_contact_paths(pubkey): """Reorder configured paths. Body: {path_ids: [3, 1, 2]}""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: data = request.get_json() if not data or 'path_ids' not in data: return jsonify({'success': False, 'error': 'path_ids array required'}), 400 path_ids = data['path_ids'] if not isinstance(path_ids, list) or not all(isinstance(i, int) for i in path_ids): return jsonify({'success': False, 'error': 'path_ids must be an array of integers'}), 400 db.reorder_contact_paths(pubkey, path_ids) return jsonify({'success': True}), 200 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//paths//apply', methods=['POST']) def apply_contact_path(pubkey, path_id): """Push a configured path to the device as the active path. Equivalent to running `change_path ` from the console: the configured path's hex + hash_size are sent to the firmware so the next DM goes via that route instead of FLOOD or the prior device path. """ db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 dm = _get_dm() if not dm or not dm.is_connected: return jsonify({'success': False, 'error': 'Device not connected'}), 503 try: target = next((p for p in db.get_contact_paths(pubkey) if p['id'] == path_id), None) if not target: return jsonify({'success': False, 'error': 'Path not found'}), 404 path_hex = (target.get('path_hex') or '').strip() hash_size = target.get('hash_size') or 1 if not path_hex: return jsonify({'success': False, 'error': 'Configured path has no hex data'}), 400 result = dm.change_path(pubkey, path_hex, hash_size=hash_size) if result.get('success'): invalidate_contacts_cache() return jsonify({'success': True}), 200 return jsonify({'success': False, 'error': result.get('error', 'Device change_path failed')}), 500 except Exception as e: logger.error(f"apply_contact_path error: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//paths/reset_flood', methods=['POST']) def reset_contact_to_flood(pubkey): """Reset device path to FLOOD mode (does not delete configured paths).""" try: dm = _get_dm() if not dm or not dm.is_connected: return jsonify({'success': False, 'error': 'Device not connected'}), 503 dev_result = dm.reset_path(pubkey) logger.info(f"reset_path({pubkey[:12]}...) result: {dev_result}") if dev_result.get('success'): invalidate_contacts_cache() return jsonify({'success': True}), 200 return jsonify({'success': False, 'error': dev_result.get('error', 'Device reset failed')}), 500 except Exception as e: logger.error(f"reset_flood error: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//paths/clear', methods=['POST']) def clear_contact_paths(pubkey): """Delete all configured paths for a contact from the database.""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: deleted = db.delete_all_contact_paths(pubkey) return jsonify({'success': True, 'paths_deleted': deleted}), 200 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//no_auto_flood', methods=['GET']) def get_no_auto_flood(pubkey): """Get the no_auto_flood flag for a contact.""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: value = db.get_contact_no_auto_flood(pubkey) return jsonify({'success': True, 'no_auto_flood': value}), 200 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//no_auto_flood', methods=['PUT']) def set_no_auto_flood(pubkey): """Set the no_auto_flood flag for a contact.""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: data = request.get_json() if data is None or 'no_auto_flood' not in data: return jsonify({'success': False, 'error': 'no_auto_flood field required'}), 400 value = bool(data['no_auto_flood']) if db.set_contact_no_auto_flood(pubkey, value): return jsonify({'success': True, 'no_auto_flood': value}), 200 return jsonify({'success': False, 'error': 'Contact not found'}), 404 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts/repeaters', methods=['GET']) def get_repeater_contacts(): """List all repeater contacts (type=1) from DB, including ignored.""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: repeaters = db.get_repeater_contacts() return jsonify({'success': True, 'repeaters': repeaters}), 200 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/chat/settings', methods=['GET']) def get_chat_config(): """Get chat settings.""" try: return jsonify(get_chat_settings()), 200 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/chat/settings', methods=['POST']) def set_chat_config(): """Update chat settings.""" try: data = request.get_json() if not data: return jsonify({'success': False, 'error': 'Missing JSON body'}), 400 valid_keys = set(CHAT_SETTINGS_DEFAULTS.keys()) settings = {} for key in valid_keys: if key not in data: continue val = data[key] if key in CHAT_SETTINGS_BOOL_KEYS: if not isinstance(val, bool): return jsonify({'success': False, 'error': f'Invalid value for {key}'}), 400 settings[key] = val else: if isinstance(val, bool) or not isinstance(val, (int, float)) or val < 1: return jsonify({'success': False, 'error': f'Invalid value for {key}'}), 400 if key == 'path_popup_timeout_sec' and val > 60: return jsonify({'success': False, 'error': f'Invalid value for {key}'}), 400 settings[key] = int(val) if not settings: return jsonify({'success': False, 'error': 'No valid settings provided'}), 400 if save_chat_settings(settings): return jsonify({**get_chat_settings(), 'success': True}), 200 return jsonify({'success': False, 'error': 'Failed to save settings'}), 500 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/ui/settings', methods=['GET']) def get_ui_config(): """Get UI (interface) settings.""" try: return jsonify(get_ui_settings()), 200 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/ui/settings', methods=['POST']) def set_ui_config(): """Update UI (interface) settings.""" try: data = request.get_json() if not data: return jsonify({'success': False, 'error': 'Missing JSON body'}), 400 settings = {} if 'toast_timeout_sec' in data: val = data['toast_timeout_sec'] if not isinstance(val, (int, float)) or isinstance(val, bool) or val < 1 or val > 60: return jsonify({'success': False, 'error': 'Invalid value for toast_timeout_sec'}), 400 settings['toast_timeout_sec'] = float(val) if 'toast_no_autoclose' in data: val = data['toast_no_autoclose'] if not isinstance(val, bool): return jsonify({'success': False, 'error': 'Invalid value for toast_no_autoclose'}), 400 settings['toast_no_autoclose'] = val if 'toast_position' in data: val = data['toast_position'] if val not in TOAST_POSITIONS: return jsonify({'success': False, 'error': 'Invalid value for toast_position'}), 400 settings['toast_position'] = val if not settings: return jsonify({'success': False, 'error': 'No valid settings provided'}), 400 if save_ui_settings(settings): return jsonify({**get_ui_settings(), 'success': True}), 200 return jsonify({'success': False, 'error': 'Failed to save settings'}), 500 except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/dm/updates', methods=['GET']) def get_dm_updates(): """ Check for new DMs across all conversations. Used for notification badge updates. Query params: last_seen (str): JSON object with last seen timestamps per conversation Format: {"pk_xxx": 1234567890, "name_yyy": 1234567891, ...} Returns: JSON with update information: { "success": true, "total_unread": 5, "conversations": [ { "conversation_id": "pk_4563b1621b58", "display_name": "daniel5120", "unread_count": 3, "latest_timestamp": 1766491173 } ] } """ try: # Parse last_seen timestamps last_seen_str = request.args.get('last_seen', '{}') try: last_seen = json.loads(last_seen_str) except json.JSONDecodeError: last_seen = {} # v2: Read from Database db = _get_db() updates = [] total_unread = 0 if db: convos = db.get_dm_conversations() blocked_names = db.get_blocked_contact_names() blocked_keys = db.get_blocked_keys() for c in convos: pk = c.get('contact_pubkey', '') # Skip blocked contacts if pk in blocked_keys: continue conv_id = f"pk_{pk}" if pk else 'unknown' display_name = c.get('display_name', pk[:8] + '...' if pk else 'Unknown') if display_name in blocked_names: continue last_msg_ts = c.get('last_message_timestamp', 0) last_seen_ts = last_seen.get(conv_id, 0) if last_msg_ts > last_seen_ts and pk: db_msgs = db.get_dm_messages(pk, limit=200) unread_count = sum(1 for m in db_msgs if m.get('timestamp', 0) > last_seen_ts) else: unread_count = 0 total_unread += unread_count if unread_count > 0: updates.append({ 'conversation_id': conv_id, 'display_name': display_name, 'unread_count': unread_count, 'latest_timestamp': last_msg_ts }) else: # Fallback to parser conversations = parser.get_dm_conversations(days=7) for conv in conversations: conv_id = conv['conversation_id'] last_seen_ts = last_seen.get(conv_id, 0) if conv['last_message_timestamp'] > last_seen_ts: messages, _ = parser.read_dm_messages(conversation_id=conv_id, days=7) messages = parser.dedup_retry_messages(messages) unread_count = sum(1 for m in messages if m['timestamp'] > last_seen_ts) else: unread_count = 0 total_unread += unread_count if unread_count > 0: updates.append({ 'conversation_id': conv_id, 'display_name': conv['display_name'], 'unread_count': unread_count, 'latest_timestamp': conv['last_message_timestamp'] }) return jsonify({ 'success': True, 'total_unread': total_unread, 'conversations': updates }), 200 except Exception as e: logger.error(f"Error checking DM updates: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 # ============================================================================= # Contact Management (Existing, Pending Contacts & Settings) # ============================================================================= @api_bp.route('/contacts/detailed', methods=['GET']) def get_contacts_detailed_api(): """ Get detailed list of ALL existing contacts on the device (COM, REP, ROOM, SENS). Returns full contact_info data from meshcli including GPS coordinates, paths, etc. Returns: JSON with contacts list: { "success": true, "count": 263, "limit": 350, "contacts": [ { "name": "TK Zalesie Test 🦜", // adv_name "public_key": "df2027d3f2ef...", // Full public key (64 chars) "public_key_prefix": "df2027d3f2ef", // First 12 chars "type": 2, // 1=COM, 2=REP, 3=ROOM, 4=SENS "type_label": "REP", // Human-readable type "flags": 0, "out_path_len": -1, // -1 = Flood, 0 = Direct, >0 = hop count "out_path": "", // Path bytes as hex "out_path_hash_mode": 0, // 0=1B/hop, 1=2B/hop, 2=3B/hop "last_advert": 1735429453, // Unix timestamp "adv_lat": 50.866005, // GPS latitude "adv_lon": 20.669308, // GPS longitude "lastmod": 1715973527 // Last modification timestamp }, ... ] } """ try: # Get detailed contact info from cache (reduces 4 USB calls to 0 on cache hit) force_refresh = request.args.get('refresh', 'false').lower() == 'true' success_detailed, contacts_detailed, error_detailed = get_contacts_detailed_cached(force_refresh) if not success_detailed: return jsonify({ 'success': False, 'error': error_detailed or 'Failed to get contact details', 'contacts': [], 'count': 0, 'limit': 350 }), 500 # Convert dict to list and add computed fields type_labels = {1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'} contacts = [] # Get protected/ignored/blocked contacts for status fields db = _get_db() protected_keys = db.get_protected_keys() if db else set() ignored_keys = db.get_ignored_keys() if db else set() blocked_keys = db.get_blocked_keys() if db else set() for public_key, details in contacts_detailed.items(): # Compute path display string. # meshcore lib 2.x: out_path_len already holds the hop count (6 LSB) # and the hash-size mode is stored separately in out_path_hash_mode. out_path_len = details.get('out_path_len', -1) out_path_raw = details.get('out_path', '') out_path_hash_mode = details.get('out_path_hash_mode', 0) if out_path_len > 0 and out_path_raw: hop_count = out_path_len hash_size = max(1, out_path_hash_mode + 1) if out_path_hash_mode >= 0 else 1 chunk = hash_size * 2 # Truncate to meaningful bytes (firmware buffer may have trailing garbage) meaningful_hex = out_path_raw[:hop_count * chunk] # Format as HEX→HEX→HEX (each hop is hash_size*2 hex chars) hops = [meaningful_hex[i:i+chunk].upper() for i in range(0, len(meaningful_hex), chunk)] path_or_mode = '→'.join(hops) if hops else out_path_raw elif out_path_len == 0: path_or_mode = 'Direct' else: path_or_mode = 'Flood' contact = { # All original fields from contact_info 'public_key': public_key, 'type': details.get('type'), 'flags': details.get('flags'), 'out_path_len': out_path_len, 'out_path': out_path_raw, 'out_path_hash_mode': out_path_hash_mode, 'last_advert': details.get('last_advert'), 'adv_lat': details.get('adv_lat'), 'adv_lon': details.get('adv_lon'), 'lastmod': details.get('lastmod'), # Computed/convenience fields 'name': details.get('adv_name', ''), # Map adv_name to name for compatibility 'public_key_prefix': public_key[:12] if len(public_key) >= 12 else public_key, 'type_label': type_labels.get(details.get('type'), 'UNKNOWN'), 'path_or_mode': path_or_mode, # For UI display 'last_seen': details.get('last_advert'), # Alias for compatibility 'is_protected': public_key.lower() in protected_keys, # Protection status 'is_ignored': public_key.lower() in ignored_keys, 'is_blocked': public_key.lower() in blocked_keys, } contacts.append(contact) return jsonify({ 'success': True, 'contacts': contacts, 'count': len(contacts), 'limit': 350 # MeshCore device limit }), 200 except Exception as e: logger.error(f"Error getting detailed contacts list: {e}") return jsonify({ 'success': False, 'error': str(e), 'contacts': [], 'count': 0, 'limit': 350 }), 500 @api_bp.route('/contacts/delete', methods=['POST']) def delete_contact_api(): """ Delete a contact from the device. JSON body: { "selector": "" } Using public_key_prefix is recommended for reliability. Returns: JSON with deletion result: { "success": true, "message": "Contact removed successfully" } """ try: data = request.get_json() if not data or 'selector' not in data: return jsonify({ 'success': False, 'error': 'Missing required field: selector' }), 400 selector = data['selector'] if not isinstance(selector, str) or not selector.strip(): return jsonify({ 'success': False, 'error': 'selector must be a non-empty string' }), 400 success, message = cli.delete_contact(selector) if success: # Invalidate contacts cache after deletion invalidate_contacts_cache() return jsonify({ 'success': True, 'message': message }), 200 else: return jsonify({ 'success': False, 'error': message }), 500 except Exception as e: logger.error(f"Error deleting contact: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/contacts/cached/delete', methods=['POST']) def delete_cached_contact_api(): """Delete a cache-only contact from the database.""" try: data = request.get_json() if not data or 'public_key' not in data: return jsonify({ 'success': False, 'error': 'Missing required field: public_key' }), 400 public_key = data['public_key'] if not isinstance(public_key, str) or not public_key.strip(): return jsonify({ 'success': False, 'error': 'public_key must be a non-empty string' }), 400 success, message = cli.delete_cached_contact(public_key) if success: return jsonify({'success': True, 'message': message}), 200 else: return jsonify({'success': False, 'error': message}), 500 except Exception as e: logger.error(f"Error deleting cached contact: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts/manual-add', methods=['POST']) def manual_add_contact(): """Add a contact manually via URI or raw parameters (name, public_key, type).""" try: dm = _get_dm() if not dm: return jsonify({'success': False, 'error': 'Device manager unavailable'}), 500 data = request.get_json() or {} # Mode 1: URI (meshcore://contact/add?... or hex blob) uri = data.get('uri', '').strip() if uri: result = dm.import_contact_uri(uri) if result['success']: invalidate_contacts_cache() status = 200 if result['success'] else 400 return jsonify(result), status # Mode 2: Raw parameters name = data.get('name', '').strip() public_key = data.get('public_key', '').strip() contact_type = data.get('type', 1) if not name or not public_key: return jsonify({'success': False, 'error': 'Name and public_key are required'}), 400 try: contact_type = int(contact_type) except (ValueError, TypeError): contact_type = 1 result = dm.add_contact_manual(name, public_key, contact_type) if result['success']: invalidate_contacts_cache() status = 200 if result['success'] else 400 return jsonify(result), status except Exception as e: logger.error(f"Error adding contact manually: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//push-to-device', methods=['POST']) def push_contact_to_device(public_key): """Push a cache-only contact to the device.""" try: dm = _get_dm() if not dm: return jsonify({'success': False, 'error': 'Device manager unavailable'}), 500 result = dm.push_to_device(public_key.strip().lower()) if result['success']: invalidate_contacts_cache() status = 200 if result['success'] else 400 return jsonify(result), status except Exception as e: logger.error(f"Error pushing contact to device: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//move-to-cache', methods=['POST']) def move_contact_to_cache(public_key): """Move a device contact to cache (remove from device, keep in DB).""" try: dm = _get_dm() if not dm: return jsonify({'success': False, 'error': 'Device manager unavailable'}), 500 result = dm.move_to_cache(public_key.strip().lower()) if result['success']: invalidate_contacts_cache() status = 200 if result['success'] else 400 return jsonify(result), status except Exception as e: logger.error(f"Error moving contact to cache: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts/protected', methods=['GET']) def get_protected_contacts_api(): """ Get list of all protected contact public keys. Returns: JSON with protected contacts: { "success": true, "protected_contacts": ["public_key1", "public_key2", ...], "count": 2 } """ try: protected = get_protected_contacts() return jsonify({ 'success': True, 'protected_contacts': protected, 'count': len(protected) }), 200 except Exception as e: logger.error(f"Error getting protected contacts: {e}") return jsonify({ 'success': False, 'error': str(e), 'protected_contacts': [], 'count': 0 }), 500 @api_bp.route('/contacts//protect', methods=['POST']) def toggle_contact_protection(public_key): """ Toggle protection status for a contact. Args: public_key: Full public key (64 hex chars) or prefix (12+ chars) JSON body (optional): { "protected": true/false # If not provided, toggles current state } Returns: JSON with result: { "success": true, "public_key": "full_public_key", "protected": true, "message": "Contact protected" } """ try: # Validate public_key format (at least 12 hex chars) if not public_key or not re.match(r'^[a-fA-F0-9]{12,64}$', public_key): return jsonify({ 'success': False, 'error': 'Invalid public_key format (must be 12-64 hex characters)' }), 400 public_key = public_key.lower() db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database unavailable'}), 500 # Find matching full public_key if prefix provided if len(public_key) < 64: contact = db.get_contact_by_prefix(public_key) if not contact: return jsonify({ 'success': False, 'error': f'Contact not found with public_key prefix: {public_key}' }), 404 public_key = contact['public_key'] else: contact = db.get_contact(public_key) # Check if explicit protected value provided data = request.get_json() or {} if 'protected' in data: should_protect = bool(data['protected']) else: # Toggle current state is_currently_protected = contact['is_protected'] == 1 if contact else False should_protect = not is_currently_protected # Update in database if contact: db.set_contact_protected(public_key, should_protect) else: # Contact not in DB - create minimal record db.upsert_contact(public_key, name='', is_protected=1 if should_protect else 0, source='advert') return jsonify({ 'success': True, 'public_key': public_key, 'protected': should_protect, 'message': 'Contact protected' if should_protect else 'Contact unprotected' }), 200 except Exception as e: logger.error(f"Error toggling contact protection: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/contacts//ignore', methods=['POST']) def toggle_contact_ignore(public_key): """Toggle ignore status for a contact.""" try: if not public_key or not re.match(r'^[a-fA-F0-9]{12,64}$', public_key): return jsonify({'success': False, 'error': 'Invalid public_key format'}), 400 public_key = public_key.lower() # Resolve prefix to full key via DB db = _get_db() if len(public_key) < 64 and db: contact = db.get_contact_by_prefix(public_key) if contact: public_key = contact['public_key'] else: return jsonify({'success': False, 'error': 'Contact not found'}), 404 data = request.get_json() or {} if 'ignored' in data: should_ignore = data['ignored'] else: should_ignore = not db.is_contact_ignored(public_key) # If ignoring a device contact, delete from device first if should_ignore: contact = db.get_contact(public_key) if contact and contact.get('source') == 'device': dm = _get_dm() if dm: dm.delete_contact(public_key) db.set_contact_ignored(public_key, should_ignore) invalidate_contacts_cache() return jsonify({ 'success': True, 'public_key': public_key, 'ignored': should_ignore, 'message': 'Contact ignored' if should_ignore else 'Contact unignored' }), 200 except Exception as e: logger.error(f"Error toggling contact ignore: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts//block', methods=['POST']) def toggle_contact_block(public_key): """Toggle block status for a contact.""" try: if not public_key or not re.match(r'^[a-fA-F0-9]{12,64}$', public_key): return jsonify({'success': False, 'error': 'Invalid public_key format'}), 400 public_key = public_key.lower() db = _get_db() if len(public_key) < 64 and db: contact = db.get_contact_by_prefix(public_key) if contact: public_key = contact['public_key'] else: return jsonify({'success': False, 'error': 'Contact not found'}), 404 data = request.get_json() or {} if 'blocked' in data: should_block = data['blocked'] else: should_block = not db.is_contact_blocked(public_key) # If blocking a device contact, delete from device first if should_block: contact = db.get_contact(public_key) if contact and contact.get('source') == 'device': dm = _get_dm() if dm: dm.delete_contact(public_key) db.set_contact_blocked(public_key, should_block) invalidate_contacts_cache() return jsonify({ 'success': True, 'public_key': public_key, 'blocked': should_block, 'message': 'Contact blocked' if should_block else 'Contact unblocked' }), 200 except Exception as e: logger.error(f"Error toggling contact block: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts/blocked-names', methods=['GET']) def get_blocked_contact_names_api(): """Get list of current names of blocked contacts (for frontend filtering).""" try: db = _get_db() if db: names = list(db.get_blocked_contact_names()) else: names = [] return jsonify({'success': True, 'names': names}), 200 except Exception as e: logger.error(f"Error getting blocked contact names: {e}") return jsonify({'success': False, 'error': str(e), 'names': []}), 500 @api_bp.route('/contacts/block-name', methods=['POST']) def block_name_api(): """Block/unblock a contact by name (for bots without known public_key).""" try: data = request.get_json() if not data or 'name' not in data: return jsonify({'success': False, 'error': 'Missing name parameter'}), 400 name = data['name'].strip() if not name: return jsonify({'success': False, 'error': 'Name cannot be empty'}), 400 blocked = data.get('blocked', True) db = _get_db() if db: db.set_name_blocked(name, blocked) return jsonify({ 'success': True, 'name': name, 'blocked': blocked, 'message': f'Name "{name}" blocked' if blocked else f'Name "{name}" unblocked' }), 200 return jsonify({'success': False, 'error': 'Database not available'}), 500 except Exception as e: logger.error(f"Error blocking name: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts/blocked-names-list', methods=['GET']) def get_blocked_names_list_api(): """Get list of directly blocked names (from blocked_names table).""" try: db = _get_db() if db: entries = db.get_blocked_names_list() else: entries = [] return jsonify({'success': True, 'blocked_names': entries}), 200 except Exception as e: logger.error(f"Error getting blocked names list: {e}") return jsonify({'success': False, 'error': str(e), 'blocked_names': []}), 500 @api_bp.route('/contacts/cleanup-settings', methods=['GET']) def get_cleanup_settings_api(): """ Get auto-cleanup settings. Returns: JSON with cleanup settings: { "success": true, "settings": { "enabled": false, "types": [1, 2, 3, 4], "date_field": "last_advert", "days": 30, "name_filter": "", "hour": 1 }, "timezone": "Europe/Warsaw" } """ try: from app.archiver.manager import get_local_timezone_name settings = get_cleanup_settings() timezone = get_local_timezone_name() return jsonify({ 'success': True, 'settings': settings, 'timezone': timezone }), 200 except Exception as e: logger.error(f"Error getting cleanup settings: {e}") return jsonify({ 'success': False, 'error': str(e), 'settings': { 'enabled': False, 'types': [1, 2, 3, 4], 'date_field': 'last_advert', 'days': 30, 'name_filter': '', 'hour': 1 }, 'timezone': 'local' }), 500 @api_bp.route('/contacts/cleanup-settings', methods=['POST']) def update_cleanup_settings_api(): """ Update auto-cleanup settings. JSON body: { "enabled": true, "types": [1, 2], "date_field": "last_advert", "days": 30, "name_filter": "", "hour": 1 } Returns: JSON with update result: { "success": true, "message": "Cleanup settings updated", "settings": {...} } """ try: data = request.get_json() or {} # Validate fields if 'types' in data: if not isinstance(data['types'], list) or not all(t in [1, 2, 3, 4] for t in data['types']): return jsonify({ 'success': False, 'error': 'Invalid types (must be list of 1, 2, 3, 4)' }), 400 if 'date_field' in data: if data['date_field'] not in ['last_advert', 'lastmod']: return jsonify({ 'success': False, 'error': 'Invalid date_field (must be "last_advert" or "lastmod")' }), 400 if 'days' in data: if not isinstance(data['days'], int) or data['days'] < 0: return jsonify({ 'success': False, 'error': 'Invalid days (must be non-negative integer)' }), 400 if 'enabled' in data: if not isinstance(data['enabled'], bool): return jsonify({ 'success': False, 'error': 'Invalid enabled (must be boolean)' }), 400 if 'hour' in data: if not isinstance(data['hour'], int) or data['hour'] < 0 or data['hour'] > 23: return jsonify({ 'success': False, 'error': 'Invalid hour (must be integer 0-23)' }), 400 # Get current settings and merge with new values current = get_cleanup_settings() updated = {**current, **data} # Save settings if not save_cleanup_settings(updated): return jsonify({ 'success': False, 'error': 'Failed to save cleanup settings' }), 500 # Update scheduler based on enabled state and hour from app.archiver.manager import schedule_cleanup, get_local_timezone_name schedule_cleanup(enabled=updated.get('enabled', False), hour=updated.get('hour', 1)) return jsonify({ 'success': True, 'message': 'Cleanup settings updated', 'settings': updated, 'timezone': get_local_timezone_name() }), 200 except Exception as e: logger.error(f"Error updating cleanup settings: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 # ============================================================================= # Contact Management (Pending Contacts & Settings) # ============================================================================= @api_bp.route('/contacts/pending', methods=['GET']) def get_pending_contacts_api(): """ Get list of contacts awaiting manual approval. Query parameters: types (list[int]): Filter by contact types (optional) Example: ?types=1&types=2 (COM and REP only) Valid values: 1 (COM), 2 (REP), 3 (ROOM), 4 (SENS) If not provided, returns all pending contacts Returns: JSON with pending contacts list with enriched contact data: { "success": true, "pending": [ { "name": "KRK - WD 🔌", "public_key": "2d86b4a747b6565ad1...", "public_key_prefix": "2d86b4a747b6", "type": 2, "type_label": "REP", "adv_lat": 50.02377, "adv_lon": 19.96038, "last_advert": 1715889153, "lastmod": 1716372319, "out_path_len": -1, "out_path": "", "path_or_mode": "Flood" }, ... ], "count": 2 } """ try: # Get type filter from query params types_param = request.args.getlist('types', type=int) # Validate types (must be 1-4) if types_param: invalid_types = [t for t in types_param if t not in [1, 2, 3, 4]] if invalid_types: return jsonify({ 'success': False, 'error': f'Invalid types: {invalid_types}. Valid types: 1 (COM), 2 (REP), 3 (ROOM), 4 (SENS)', 'pending': [] }), 400 success, pending, error = cli.get_pending_contacts() if success: # Filter out ignored/blocked contacts from pending list db = _get_db() if db: ignored_keys = db.get_ignored_keys() blocked_keys = db.get_blocked_keys() excluded = ignored_keys | blocked_keys pending = [c for c in pending if c.get('public_key', '').lower() not in excluded] # Add type_label for frontend display type_labels = {1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'} for c in pending: c['type_label'] = type_labels.get(c.get('type', 0), 'COM') pk = c.get('public_key', '') c['public_key_prefix'] = pk[:12] if len(pk) >= 12 else pk # Filter by types if specified if types_param: pending = [contact for contact in pending if contact.get('type') in types_param] return jsonify({ 'success': True, 'pending': pending, 'count': len(pending) }), 200 else: return jsonify({ 'success': False, 'error': error or 'Failed to get pending contacts', 'pending': [] }), 500 except Exception as e: logger.error(f"Error getting pending contacts: {e}") return jsonify({ 'success': False, 'error': str(e), 'pending': [] }), 500 @api_bp.route('/contacts/pending/approve', methods=['POST']) def approve_pending_contact_api(): """ Approve and add a pending contact. JSON body: { "public_key": "" } IMPORTANT: Always send the full public_key (not name or prefix). Full public key works for all contact types (COM, ROOM, REP, SENS). Returns: JSON with approval result: { "success": true, "message": "Contact approved successfully" } """ try: data = request.get_json() if not data or 'public_key' not in data: return jsonify({ 'success': False, 'error': 'Missing required field: public_key' }), 400 public_key = data['public_key'] if not isinstance(public_key, str) or not public_key.strip(): return jsonify({ 'success': False, 'error': 'public_key must be a non-empty string' }), 400 success, message = cli.approve_pending_contact(public_key) if success: # Invalidate contacts cache after adding new contact invalidate_contacts_cache() return jsonify({ 'success': True, 'message': message or 'Contact approved successfully' }), 200 else: return jsonify({ 'success': False, 'error': message }), 500 except Exception as e: logger.error(f"Error approving pending contact: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/contacts/pending/reject', methods=['POST']) def reject_pending_contact_api(): """Reject a pending contact (remove from pending list without adding).""" try: data = request.get_json() if not data or 'public_key' not in data: return jsonify({'success': False, 'error': 'Missing required field: public_key'}), 400 dm = _get_dm() if not dm: return jsonify({'success': False, 'error': 'Device not connected'}), 503 result = dm.reject_contact(data['public_key']) status = 200 if result['success'] else 500 return jsonify(result), status except Exception as e: logger.error(f"Error rejecting pending contact: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts/pending/clear', methods=['POST']) def clear_pending_contacts_api(): """Clear all pending contacts.""" try: dm = _get_dm() if not dm: return jsonify({'success': False, 'error': 'Device not connected'}), 503 result = dm.clear_pending_contacts() status = 200 if result['success'] else 500 return jsonify(result), status except Exception as e: logger.error(f"Error clearing pending contacts: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/device/settings', methods=['GET']) def get_device_settings_api(): """ Get persistent device settings. Returns: JSON with settings: { "success": true, "settings": { "manual_add_contacts": false } } """ try: success, settings = cli.get_device_settings() if success: return jsonify({ 'success': True, 'settings': settings }), 200 else: return jsonify({ 'success': False, 'error': 'Failed to get device settings', 'settings': {'manual_add_contacts': False} }), 500 except Exception as e: logger.error(f"Error getting device settings: {e}") return jsonify({ 'success': False, 'error': str(e), 'settings': {'manual_add_contacts': False} }), 500 @api_bp.route('/device/settings', methods=['POST']) def update_device_settings_api(): """ Update persistent device settings. JSON body: { "manual_add_contacts": true/false } This setting is: 1. Saved to database for persistence across container restarts 2. Applied immediately to the running meshcli session Returns: JSON with update result: { "success": true, "message": "manual_add_contacts set to on" } """ try: data = request.get_json() if not data or 'manual_add_contacts' not in data: return jsonify({ 'success': False, 'error': 'Missing required field: manual_add_contacts' }), 400 manual_add_contacts = data['manual_add_contacts'] if not isinstance(manual_add_contacts, bool): return jsonify({ 'success': False, 'error': 'manual_add_contacts must be a boolean' }), 400 success, message = cli.set_manual_add_contacts(manual_add_contacts) if success: return jsonify({ 'success': True, 'message': message, 'settings': {'manual_add_contacts': manual_add_contacts} }), 200 else: return jsonify({ 'success': False, 'error': message }), 500 except Exception as e: logger.error(f"Error updating device settings: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/contacts/settings', methods=['GET']) def get_contacts_settings_api(): """Get contact-management UI/behaviour settings. Returns: { "success": true, "settings": { "manual_add_contacts": bool, "suppress_advert_notifications": bool, "auto_ignore_new_adverts": bool } } """ try: success, dev_settings = cli.get_device_settings() manual = bool(dev_settings.get('manual_add_contacts', False)) if success else False db = _get_db() suppress = bool(db.get_setting_json('suppress_advert_notifications', False)) if db else False auto_ignore = bool(db.get_setting_json('auto_ignore_new_adverts', False)) if db else False return jsonify({ 'success': True, 'settings': { 'manual_add_contacts': manual, 'suppress_advert_notifications': suppress, 'auto_ignore_new_adverts': auto_ignore, } }), 200 except Exception as e: logger.error(f"Error getting contacts settings: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/contacts/settings', methods=['POST']) def update_contacts_settings_api(): """Update one or more contact-management settings (partial payload allowed). JSON body keys (all optional): manual_add_contacts: bool -> applied to device + persisted suppress_advert_notifications: bool -> persisted (frontend-only behaviour) auto_ignore_new_adverts: bool -> persisted, used by device_manager """ try: data = request.get_json() or {} if not isinstance(data, dict): return jsonify({'success': False, 'error': 'Invalid payload'}), 400 db = _get_db() if 'manual_add_contacts' in data: val = data['manual_add_contacts'] if not isinstance(val, bool): return jsonify({'success': False, 'error': 'manual_add_contacts must be a boolean'}), 400 success, message = cli.set_manual_add_contacts(val) if not success: return jsonify({'success': False, 'error': message}), 500 if 'suppress_advert_notifications' in data: val = data['suppress_advert_notifications'] if not isinstance(val, bool): return jsonify({'success': False, 'error': 'suppress_advert_notifications must be a boolean'}), 400 if db: db.set_setting_json('suppress_advert_notifications', val) if 'auto_ignore_new_adverts' in data: val = data['auto_ignore_new_adverts'] if not isinstance(val, bool): return jsonify({'success': False, 'error': 'auto_ignore_new_adverts must be a boolean'}), 400 if db: db.set_setting_json('auto_ignore_new_adverts', val) return jsonify({'success': True}), 200 except Exception as e: logger.error(f"Error updating contacts settings: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================================= # Regions (MeshCore flood scopes) — Settings > Channels tab # ============================================================================= @api_bp.route('/regions', methods=['GET']) def list_regions_api(): """List the device's region registry.""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 return jsonify({ 'success': True, 'regions': db.list_regions(), }), 200 except Exception as e: logger.error(f"Error listing regions: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/regions', methods=['POST']) def create_region_api(): """Create a new region. Body: {name: str}. Key is derived from the name.""" try: data = request.get_json() or {} name = (data.get('name') or '').strip() ok, err = is_valid_region_name(name) if not ok: return jsonify({'success': False, 'error': err}), 400 db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 if db.get_region_by_name(name) is not None: return jsonify({'success': False, 'error': f'Region "{name}" already exists'}), 409 import sqlite3 try: rid = db.create_region(name, derive_scope_key_hex(name)) except sqlite3.IntegrityError: return jsonify({'success': False, 'error': f'Region "{name}" already exists'}), 409 return jsonify({'success': True, 'region': db.get_region(rid)}), 201 except Exception as e: logger.error(f"Error creating region: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/regions/', methods=['DELETE']) def delete_region_api(region_id): """Delete a region. Channels mapped to it have their scope cleared (cascade).""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 region = db.get_region(region_id) if region is None: return jsonify({'success': False, 'error': 'Region not found'}), 404 was_default = bool(region.get('is_default')) db.delete_region(region_id) # If we just deleted the firmware default, best-effort clear it on device. warning = None if was_default: dm = _get_dm() if dm and dm.is_connected: try: res = dm.set_default_flood_scope('', '') if not res.get('success'): warning = f"{res.get('error')} Firmware default left as-is." except Exception as e: warning = f"Could not clear firmware default: {e}" out = {'success': True} if warning: out['warning'] = warning return jsonify(out), 200 except Exception as e: logger.error(f"Error deleting region: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/channels/scopes', methods=['GET']) def list_channel_scopes_api(): """Bulk-load the per-channel region mapping for UI rendering. Returns: {"success": true, "scopes": {"0": {region_id, name, key_hex, is_default}, ...}} """ try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 scopes = db.get_all_channel_scopes() # JSON object keys must be strings return jsonify({ 'success': True, 'scopes': {str(k): v for k, v in scopes.items()}, }), 200 except Exception as e: logger.error(f"Error listing channel scopes: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/channels//scope', methods=['PUT']) def set_channel_scope_api(index): """Assign or clear the region scope for a channel. Body: {"region_id": int | null}. null removes the mapping (firmware default applies). """ try: # Use the device-reported max so we don't reject valid slots on # firmwares that expose more than 8 channels (current firmware reports # max_channels=40). Fall back to 8 when the DM isn't reachable. dm = _get_dm() max_channels = getattr(dm, '_max_channels', 8) if dm else 8 if index < 0 or index >= max_channels: return jsonify({ 'success': False, 'error': f'Channel index out of range (0-{max_channels - 1})', }), 400 data = request.get_json() or {} if 'region_id' not in data: return jsonify({'success': False, 'error': 'region_id is required (int or null)'}), 400 region_id = data['region_id'] db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 if region_id is not None: if not isinstance(region_id, int): return jsonify({'success': False, 'error': 'region_id must be an integer or null'}), 400 if db.get_region(region_id) is None: return jsonify({'success': False, 'error': 'Region not found'}), 404 db.set_channel_scope(index, region_id) return jsonify({'success': True, 'scope': db.get_channel_scope(index)}), 200 except Exception as e: logger.error(f"Error setting channel scope: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/regions/default', methods=['DELETE']) def clear_default_region_api(): """Clear the default-region flag in the DB and the firmware default (CMD 63). If the firmware push fails the DB flag is still cleared; we return 200 with a non-blocking `warning` so the UI can toast it. """ try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 had_default = db.get_default_region() is not None db.set_default_region(None) warning = None if had_default: dm = _get_dm() if dm and dm.is_connected: try: res = dm.set_default_flood_scope('', '') if not res.get('success'): warning = f"{res.get('error')} Your choice is saved locally." except Exception as e: warning = f"Could not clear firmware default: {e}. Your choice is saved locally." else: warning = 'Device disconnected — choice saved locally; firmware default not updated.' out = {'success': True} if warning: out['warning'] = warning return jsonify(out), 200 except Exception as e: logger.error(f"Error clearing default region: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/regions//default', methods=['POST']) def set_default_region_api(region_id): """Mark a region as default in the DB AND push it to the firmware (CMD 63). If the firmware push fails the DB flag is still flipped; we return 200 with a non-blocking `warning` so the UI can toast it. """ try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 region = db.get_region(region_id) if region is None: return jsonify({'success': False, 'error': 'Region not found'}), 404 db.set_default_region(region_id) warning = None dm = _get_dm() if dm and dm.is_connected: try: res = dm.set_default_flood_scope(region['name'], region['key_hex']) if not res.get('success'): # device_manager already returns a user-friendly message here. warning = f"{res.get('error')} Your choice is saved locally." except Exception as e: warning = f"Could not push to firmware: {e}. Your choice is saved locally." else: warning = 'Device disconnected — choice saved locally; firmware default not updated.' out = {'success': True, 'region': db.get_region(region_id)} if warning: out['warning'] = warning return jsonify(out), 200 except Exception as e: logger.error(f"Error setting default region: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================================= # Analyzers (user-configured MeshCore Analyzer services) — Settings > Analyzer tab # ============================================================================= def _validate_analyzer_url_template(url_template: str): """Return (ok, error_msg). Validates the template the frontend will substitute.""" if not url_template: return False, 'URL is required' if not (url_template.startswith('http://') or url_template.startswith('https://')): return False, 'URL must start with http:// or https://' if ANALYZER_PLACEHOLDER not in url_template: return False, f'URL must contain the {ANALYZER_PLACEHOLDER} placeholder' return True, None @api_bp.route('/analyzers', methods=['GET']) def list_analyzers_api(): """List user-configured analyzers and the built-in Letsmesh URL template.""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 return jsonify({ 'success': True, 'analyzers': db.list_analyzers(), 'letsmesh_url_template': LETSMESH_ANALYZER_URL_TEMPLATE, }), 200 except Exception as e: logger.error(f"Error listing analyzers: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/analyzers', methods=['POST']) def create_analyzer_api(): """Create a new analyzer. Body: {name, url_template}.""" try: data = request.get_json() or {} name = (data.get('name') or '').strip() url_template = (data.get('url_template') or '').strip() if not name: return jsonify({'success': False, 'error': 'Name is required'}), 400 ok, err = _validate_analyzer_url_template(url_template) if not ok: return jsonify({'success': False, 'error': err}), 400 db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 import sqlite3 try: aid = db.create_analyzer(name, url_template) except sqlite3.IntegrityError: return jsonify({'success': False, 'error': f'Analyzer "{name}" already exists'}), 409 return jsonify({'success': True, 'analyzer': db.get_analyzer(aid)}), 201 except Exception as e: logger.error(f"Error creating analyzer: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/analyzers/', methods=['PUT']) def update_analyzer_api(analyzer_id): """Update an analyzer. Body: {name?, url_template?, is_disabled?}.""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 if db.get_analyzer(analyzer_id) is None: return jsonify({'success': False, 'error': 'Analyzer not found'}), 404 data = request.get_json() or {} name = data.get('name') url_template = data.get('url_template') is_disabled = data.get('is_disabled') if name is not None: name = name.strip() if not name: return jsonify({'success': False, 'error': 'Name cannot be empty'}), 400 if url_template is not None: url_template = url_template.strip() ok, err = _validate_analyzer_url_template(url_template) if not ok: return jsonify({'success': False, 'error': err}), 400 import sqlite3 try: db.update_analyzer(analyzer_id, name=name, url_template=url_template, is_disabled=is_disabled) except sqlite3.IntegrityError: return jsonify({'success': False, 'error': f'Analyzer "{name}" already exists'}), 409 return jsonify({'success': True, 'analyzer': db.get_analyzer(analyzer_id)}), 200 except Exception as e: logger.error(f"Error updating analyzer: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/analyzers/', methods=['DELETE']) def delete_analyzer_api(analyzer_id): """Delete an analyzer.""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 if db.get_analyzer(analyzer_id) is None: return jsonify({'success': False, 'error': 'Analyzer not found'}), 404 db.delete_analyzer(analyzer_id) return jsonify({'success': True}), 200 except Exception as e: logger.error(f"Error deleting analyzer: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/analyzers/default', methods=['DELETE']) def clear_default_analyzer_api(): """Clear the default-analyzer flag.""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 db.set_default_analyzer(None) return jsonify({'success': True}), 200 except Exception as e: logger.error(f"Error clearing default analyzer: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/analyzers//default', methods=['POST']) def set_default_analyzer_api(analyzer_id): """Mark an analyzer as default. Clears any previous default in the same transaction.""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 500 if db.get_analyzer(analyzer_id) is None: return jsonify({'success': False, 'error': 'Analyzer not found'}), 404 db.set_default_analyzer(analyzer_id) return jsonify({'success': True, 'analyzer': db.get_analyzer(analyzer_id)}), 200 except Exception as e: logger.error(f"Error setting default analyzer: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================================= # Message Retention Settings # ============================================================================= @api_bp.route('/retention-settings', methods=['GET']) def get_retention_settings_api(): """Get current message retention settings.""" try: from app.archiver.manager import get_local_timezone_name settings = get_retention_settings() settings['timezone'] = get_local_timezone_name() return jsonify({'success': True, **settings}) except Exception as e: logger.error(f"Error getting retention settings: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/retention-settings', methods=['POST']) def update_retention_settings_api(): """Update message retention settings and reschedule job.""" try: data = request.get_json() if not data: return jsonify({'success': False, 'error': 'No data provided'}), 400 current = get_retention_settings() current['enabled'] = data.get('enabled', current['enabled']) current['days'] = max(1, min(data.get('days', current['days']), 3650)) current['include_dms'] = data.get('include_dms', current['include_dms']) current['include_adverts'] = data.get('include_adverts', current['include_adverts']) current['hour'] = max(0, min(data.get('hour', current['hour']), 23)) if not save_retention_settings(current): return jsonify({'success': False, 'error': 'Failed to save settings'}), 500 from app.archiver.manager import schedule_retention schedule_retention(enabled=current['enabled'], hour=current['hour']) return jsonify({ 'success': True, 'message': f"Retention {'enabled' if current['enabled'] else 'disabled'}", 'settings': current }) except Exception as e: logger.error(f"Error updating retention settings: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # VACUUM runs in a worker thread so a multi-second SQLite rewrite can't be # killed by an upstream reverse proxy's response timeout (~30 s on most # defaults). The UI polls /api/db/vacuum/status until 'running' flips false. _vacuum_state_lock = threading.Lock() _vacuum_state = { 'running': False, 'started_at': None, 'finished_at': None, 'result': None, # dict from db.vacuum() on success 'error': None, # string on failure } def _run_vacuum_in_thread(app, db): with app.app_context(): try: stats = db.vacuum() logger.info( f"Manual VACUUM: {stats['size_before']:,} -> {stats['size_after']:,} " f"bytes (freed {stats['freed']:,} in {stats['elapsed_seconds']}s)" ) with _vacuum_state_lock: _vacuum_state['result'] = stats _vacuum_state['error'] = None except Exception as e: logger.error(f"VACUUM failed: {e}", exc_info=True) with _vacuum_state_lock: _vacuum_state['result'] = None _vacuum_state['error'] = str(e) finally: with _vacuum_state_lock: _vacuum_state['running'] = False _vacuum_state['finished_at'] = time.time() @api_bp.route('/db/vacuum', methods=['POST']) def vacuum_database_api(): """Kick off a SQLite VACUUM in the background. Returns 202 immediately so a slow VACUUM can't be killed by a reverse proxy timeout. Poll /api/db/vacuum/status for the outcome. """ db = _get_db() if db is None: return jsonify({'success': False, 'error': 'Database not available'}), 500 with _vacuum_state_lock: if _vacuum_state['running']: return jsonify({ 'success': False, 'running': True, 'error': 'VACUUM already in progress', }), 409 _vacuum_state['running'] = True _vacuum_state['started_at'] = time.time() _vacuum_state['finished_at'] = None _vacuum_state['result'] = None _vacuum_state['error'] = None app = current_app._get_current_object() threading.Thread( target=_run_vacuum_in_thread, args=(app, db), name='vacuum-worker', daemon=True, ).start() return jsonify({'success': True, 'running': True, 'started': True}), 202 @api_bp.route('/db/vacuum/status', methods=['GET']) def vacuum_status_api(): """Return current VACUUM status. UI polls this until running=false.""" with _vacuum_state_lock: state = dict(_vacuum_state) payload = { 'running': state['running'], 'started_at': state['started_at'], 'finished_at': state['finished_at'], } if state['running'] and state['started_at']: payload['elapsed_seconds'] = round(time.time() - state['started_at'], 1) if state['result'] is not None: payload['success'] = True payload.update(state['result']) elif state['error'] is not None: payload['success'] = False payload['error'] = state['error'] else: # Idle (never run since boot) or running with no result yet payload['success'] = None if state['running'] else True return jsonify(payload) @api_bp.route('/db/size', methods=['GET']) def get_database_size_api(): """Return current database file size in bytes (for the Optimize DB UI).""" try: db = _get_db() if db is None: return jsonify({'success': False, 'error': 'Database not available'}), 500 from pathlib import Path path = Path(db.db_path) size = path.stat().st_size if path.exists() else 0 return jsonify({'success': True, 'size': size, 'path': str(path)}) except Exception as e: logger.error(f"Failed to read DB size: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================================= # Read Status (Server-side message read tracking) # ============================================================================= @api_bp.route('/read_status', methods=['GET']) def get_read_status_api(): """ Get server-side read status for all channels and DM conversations. This replaces localStorage-based tracking to enable cross-device synchronization. Returns: JSON with read status: { "success": true, "channels": { "0": 1735900000, "1": 1735900100 }, "dm": { "name_User1": 1735900200, "pk_abc123": 1735900300 } } """ try: from app import read_status status = read_status.load_read_status() return jsonify({ 'success': True, 'channels': status['channels'], 'dm': status['dm'], 'muted_channels': status.get('muted_channels', []), 'favorite_channels': status.get('favorite_channels', []) }), 200 except Exception as e: logger.error(f"Error getting read status: {e}") return jsonify({ 'success': False, 'error': str(e), 'channels': {}, 'dm': {}, 'muted_channels': [], 'favorite_channels': [] }), 500 @api_bp.route('/version', methods=['GET']) def get_version(): """ Get application version. Returns: JSON with version info: { "success": true, "version": "2025.01.18+576c8ca9", "docker_tag": "2025.01.18-576c8ca9", "branch": "dev" } """ from app.version import VERSION_STRING, DOCKER_TAG, GIT_BRANCH return jsonify({ 'success': True, 'version': VERSION_STRING, 'docker_tag': DOCKER_TAG, 'branch': GIT_BRANCH }), 200 # GitHub repository for update checks GITHUB_REPO = "MarekWo/mc-webui" @api_bp.route('/check-update', methods=['GET']) def check_update(): """ Check if a newer version is available on GitHub. Compares current commit hash with latest commit on GitHub. Uses the branch from frozen version (dev/main) automatically. Query parameters: branch (str): Branch to check (default: from frozen version) Returns: JSON with update status: { "success": true, "update_available": true, "current_version": "2026.01.18+abc1234", "current_commit": "abc1234", "current_branch": "dev", "latest_commit": "def5678", "latest_date": "2026.01.20", "latest_message": "feat: New feature", "github_url": "https://github.com/MarekWo/mc-webui/commits/dev" } """ from app.version import VERSION_STRING, GIT_BRANCH try: # Use branch from frozen version, or allow override via query param branch = request.args.get('branch', GIT_BRANCH) # Extract current commit hash from VERSION_STRING (format: YYYY.MM.DD+hash or YYYY.MM.DD+hash+dirty) current_commit = None if '+' in VERSION_STRING: parts = VERSION_STRING.split('+') if len(parts) >= 2: current_commit = parts[1] # Get hash part (skip date, ignore +dirty) if not current_commit or current_commit == 'unknown': return jsonify({ 'success': False, 'error': 'Cannot determine current version. Run version freeze first.' }), 400 # Fetch latest commit from GitHub API github_api_url = f"https://api.github.com/repos/{GITHUB_REPO}/commits/{branch}" headers = { 'Accept': 'application/vnd.github.v3+json', 'User-Agent': 'mc-webui-update-checker' } response = requests.get(github_api_url, headers=headers, timeout=10) if response.status_code == 403: return jsonify({ 'success': False, 'error': 'GitHub API rate limit exceeded. Try again later.' }), 429 if response.status_code != 200: return jsonify({ 'success': False, 'error': f'GitHub API error: {response.status_code}' }), 502 data = response.json() latest_full_sha = data.get('sha', '') latest_commit = latest_full_sha[:7] # Short hash (7 chars like git default) # Get commit details commit_info = data.get('commit', {}) latest_message = commit_info.get('message', '').split('\n')[0] # First line only commit_date = commit_info.get('committer', {}).get('date', '') # Parse date to YYYY.MM.DD format latest_date = '' if commit_date: try: dt = datetime.fromisoformat(commit_date.replace('Z', '+00:00')) latest_date = dt.strftime('%Y.%m.%d') except ValueError: latest_date = commit_date[:10] # Compare commits (case-insensitive, compare first 7 chars) update_available = current_commit.lower()[:7] != latest_commit.lower()[:7] return jsonify({ 'success': True, 'update_available': update_available, 'current_version': VERSION_STRING, 'current_commit': current_commit[:7], 'current_branch': branch, 'latest_commit': latest_commit, 'latest_date': latest_date, 'latest_message': latest_message, 'github_url': f"https://github.com/{GITHUB_REPO}/commits/{branch}" }), 200 except requests.Timeout: return jsonify({ 'success': False, 'error': 'GitHub API request timed out' }), 504 except requests.RequestException as e: logger.error(f"Error checking for updates: {e}") return jsonify({ 'success': False, 'error': f'Network error: {str(e)}' }), 502 except Exception as e: logger.error(f"Error checking for updates: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 # ============================================================================= # Remote Update (via host webhook) # ============================================================================= # Updater webhook URL - tries multiple addresses for Docker compatibility UPDATER_URLS = [ 'http://host.docker.internal:5050', # Docker Desktop (Mac/Windows) 'http://172.17.0.1:5050', # Docker default bridge gateway (Linux) 'http://127.0.0.1:5050', # Localhost fallback ] def get_updater_url(): """Find working updater webhook URL.""" for url in UPDATER_URLS: try: response = requests.get(f"{url}/health", timeout=2) if response.status_code == 200: return url except requests.RequestException: continue return None @api_bp.route('/updater/status', methods=['GET']) def updater_status(): """ Check if the update webhook is available on the host. Returns: JSON with updater status: { "success": true, "available": true, "url": "http://172.17.0.1:5050", "update_in_progress": false } """ try: url = get_updater_url() if not url: return jsonify({ 'success': True, 'available': False, 'message': 'Update webhook not installed or not running' }), 200 # Get detailed status from webhook response = requests.get(f"{url}/health", timeout=5) data = response.json() return jsonify({ 'success': True, 'available': True, 'url': url, 'update_in_progress': data.get('update_in_progress', False), 'mcwebui_dir': data.get('mcwebui_dir', '') }), 200 except Exception as e: logger.error(f"Error checking updater status: {e}") return jsonify({ 'success': False, 'available': False, 'error': str(e) }), 200 @api_bp.route('/updater/trigger', methods=['POST']) def updater_trigger(): """ Trigger remote update via host webhook. This will: 1. Call the webhook to start update.sh 2. The server will restart (containers rebuilt) 3. Frontend should poll /api/version to detect completion Returns: JSON with result: { "success": true, "message": "Update started" } """ try: url = get_updater_url() if not url: return jsonify({ 'success': False, 'error': 'Update webhook not available. Install it first.' }), 503 # Trigger update response = requests.post(f"{url}/update", timeout=10) data = response.json() if response.status_code == 200 and data.get('success'): return jsonify({ 'success': True, 'message': 'Update started. Server will restart shortly.' }), 200 else: return jsonify({ 'success': False, 'error': data.get('error', 'Unknown error') }), response.status_code except requests.Timeout: # Timeout might mean the update started and server is restarting return jsonify({ 'success': True, 'message': 'Update may have started (request timed out)' }), 200 except Exception as e: logger.error(f"Error triggering update: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/read_status/mark_read', methods=['POST']) def mark_read_api(): """ Mark a channel or DM conversation as read. JSON body (one of the following): {"type": "channel", "channel_idx": 0, "timestamp": 1735900000} {"type": "dm", "conversation_id": "name_User1", "timestamp": 1735900200} Returns: JSON with result: { "success": true, "message": "Channel marked as read" } """ try: from app import read_status data = request.get_json() if not data: return jsonify({ 'success': False, 'error': 'Missing JSON body' }), 400 msg_type = data.get('type') timestamp = data.get('timestamp') if not msg_type or not timestamp: return jsonify({ 'success': False, 'error': 'Missing required fields: type and timestamp' }), 400 if msg_type == 'channel': channel_idx = data.get('channel_idx') if channel_idx is None: return jsonify({ 'success': False, 'error': 'Missing required field: channel_idx' }), 400 success = read_status.mark_channel_read(channel_idx, timestamp) if success: return jsonify({ 'success': True, 'message': f'Channel {channel_idx} marked as read' }), 200 else: return jsonify({ 'success': False, 'error': 'Failed to save read status' }), 500 elif msg_type == 'dm': conversation_id = data.get('conversation_id') if not conversation_id: return jsonify({ 'success': False, 'error': 'Missing required field: conversation_id' }), 400 success = read_status.mark_dm_read(conversation_id, timestamp) if success: return jsonify({ 'success': True, 'message': f'DM conversation {conversation_id} marked as read' }), 200 else: return jsonify({ 'success': False, 'error': 'Failed to save read status' }), 500 else: return jsonify({ 'success': False, 'error': f'Invalid type: {msg_type}. Must be "channel" or "dm"' }), 400 except Exception as e: logger.error(f"Error marking as read: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/read_status/mark_all_read', methods=['POST']) def mark_all_read_api(): """Mark all channels as read in bulk.""" try: from app import read_status data = request.get_json() if not data or 'channels' not in data: return jsonify({'success': False, 'error': 'Missing channels timestamps'}), 400 success = read_status.mark_all_channels_read(data['channels']) if success: return jsonify({'success': True, 'message': 'All channels marked as read'}), 200 else: return jsonify({'success': False, 'error': 'Failed to save'}), 500 except Exception as e: logger.error(f"Error marking all as read: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/channels/muted', methods=['GET']) def get_muted_channels_api(): """Get list of muted channel indices.""" try: from app import read_status muted = read_status.get_muted_channels() return jsonify({'success': True, 'muted_channels': muted}), 200 except Exception as e: logger.error(f"Error getting muted channels: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/channels//mute', methods=['POST']) def set_channel_muted_api(index): """Set mute state for a channel.""" try: from app import read_status data = request.get_json() if data is None or 'muted' not in data: return jsonify({'success': False, 'error': 'Missing muted field'}), 400 success = read_status.set_channel_muted(index, data['muted']) if success: return jsonify({ 'success': True, 'message': f'Channel {index} {"muted" if data["muted"] else "unmuted"}' }), 200 else: return jsonify({'success': False, 'error': 'Failed to save'}), 500 except Exception as e: logger.error(f"Error setting channel mute: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/channels/favorites', methods=['GET']) def get_favorite_channels_api(): """Get list of favorite channel indices.""" try: from app import read_status favorites = read_status.get_favorite_channels() return jsonify({'success': True, 'favorite_channels': favorites}), 200 except Exception as e: logger.error(f"Error getting favorite channels: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/channels//favorite', methods=['POST']) def set_channel_favorite_api(index): """Set favorite state for a channel.""" try: from app import read_status data = request.get_json() if data is None or 'favorite' not in data: return jsonify({'success': False, 'error': 'Missing favorite field'}), 400 success = read_status.set_channel_favorite(index, data['favorite']) if success: return jsonify({ 'success': True, 'message': f'Channel {index} {"favorited" if data["favorite"] else "unfavorited"}' }), 200 else: return jsonify({'success': False, 'error': 'Failed to save'}), 500 except Exception as e: logger.error(f"Error setting channel favorite: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================ # Console History API # ============================================================ CONSOLE_HISTORY_FILE = 'console_history.json' CONSOLE_HISTORY_MAX_SIZE = 50 def _get_console_history_path(): """Get path to console history file""" return Path(config.MC_CONFIG_DIR) / CONSOLE_HISTORY_FILE def _load_console_history(): """Load console history from file""" history_path = _get_console_history_path() try: if history_path.exists(): with open(history_path, 'r', encoding='utf-8') as f: data = json.load(f) return data.get('commands', []) except Exception as e: logger.error(f"Error loading console history: {e}") return [] def _save_console_history(commands): """Save console history to file""" history_path = _get_console_history_path() try: # Ensure directory exists history_path.parent.mkdir(parents=True, exist_ok=True) with open(history_path, 'w', encoding='utf-8') as f: json.dump({'commands': commands}, f, ensure_ascii=False, indent=2) return True except Exception as e: logger.error(f"Error saving console history: {e}") return False @api_bp.route('/console/history', methods=['GET']) def get_console_history(): """Get console command history""" try: commands = _load_console_history() return jsonify({ 'success': True, 'commands': commands }), 200 except Exception as e: logger.error(f"Error getting console history: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/console/history', methods=['POST']) def add_console_history(): """Add command to console history""" try: data = request.get_json() if not data or 'command' not in data: return jsonify({ 'success': False, 'error': 'Missing command field' }), 400 command = data['command'].strip() if not command: return jsonify({ 'success': False, 'error': 'Empty command' }), 400 # Load existing history commands = _load_console_history() # Remove command if already exists (will be moved to end) if command in commands: commands.remove(command) # Add to end commands.append(command) # Limit size if len(commands) > CONSOLE_HISTORY_MAX_SIZE: commands = commands[-CONSOLE_HISTORY_MAX_SIZE:] # Save if _save_console_history(commands): return jsonify({ 'success': True, 'commands': commands }), 200 else: return jsonify({ 'success': False, 'error': 'Failed to save history' }), 500 except Exception as e: logger.error(f"Error adding console history: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 @api_bp.route('/advertisements', methods=['GET']) def get_advertisements(): """Get advertisement history, optionally filtered by public key.""" db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 try: limit = request.args.get('limit', 100, type=int) public_key = request.args.get('pubkey', None) limit = max(1, min(limit, 1000)) adverts = db.get_advertisements(limit=limit, public_key=public_key) # Enrich with contact name lookup names = get_all_names() for adv in adverts: pk = adv.get('public_key', '') adv['contact_name'] = names.get(pk, adv.get('name', '')) return jsonify({ 'success': True, 'advertisements': adverts, 'count': len(adverts) }) except Exception as e: logger.error(f"Error fetching advertisements: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/console/history', methods=['DELETE']) def clear_console_history(): """Clear console command history""" try: if _save_console_history([]): return jsonify({ 'success': True, 'message': 'History cleared' }), 200 else: return jsonify({ 'success': False, 'error': 'Failed to clear history' }), 500 except Exception as e: logger.error(f"Error clearing console history: {e}") return jsonify({ 'success': False, 'error': str(e) }), 500 # ============================================================ # Console Output History API (persistent transcript) # ============================================================ CONSOLE_OUTPUT_FILE = 'console_output_history.json' CONSOLE_OUTPUT_MAX_ENTRIES = 500 CONSOLE_OUTPUT_ALLOWED_TYPES = {'command', 'response', 'error', 'system'} def _get_console_output_path(): return Path(config.MC_CONFIG_DIR) / CONSOLE_OUTPUT_FILE def _load_console_output(): path = _get_console_output_path() try: if path.exists(): with open(path, 'r', encoding='utf-8') as f: data = json.load(f) return data.get('entries', []) except Exception as e: logger.error(f"Error loading console output history: {e}") return [] def _save_console_output(entries): path = _get_console_output_path() try: path.parent.mkdir(parents=True, exist_ok=True) with open(path, 'w', encoding='utf-8') as f: json.dump({'entries': entries}, f, ensure_ascii=False) return True except Exception as e: logger.error(f"Error saving console output history: {e}") return False @api_bp.route('/console/output', methods=['GET']) def get_console_output(): """Get persisted console output transcript.""" try: return jsonify({'success': True, 'entries': _load_console_output()}), 200 except Exception as e: logger.error(f"Error getting console output: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/console/output', methods=['POST']) def add_console_output(): """Append an entry to the console output transcript.""" try: data = request.get_json() or {} entry_type = data.get('type', 'system') text = data.get('text', '') if entry_type not in CONSOLE_OUTPUT_ALLOWED_TYPES: return jsonify({'success': False, 'error': 'Invalid type'}), 400 if not isinstance(text, str) or not text: return jsonify({'success': False, 'error': 'Empty text'}), 400 entries = _load_console_output() entries.append({ 'ts': datetime.now().isoformat(timespec='seconds'), 'type': entry_type, 'text': text, }) if len(entries) > CONSOLE_OUTPUT_MAX_ENTRIES: entries = entries[-CONSOLE_OUTPUT_MAX_ENTRIES:] if _save_console_output(entries): return jsonify({'success': True}), 200 return jsonify({'success': False, 'error': 'Failed to save'}), 500 except Exception as e: logger.error(f"Error adding console output: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/console/output', methods=['DELETE']) def clear_console_output(): """Clear persisted console output transcript.""" try: if _save_console_output([]): return jsonify({'success': True, 'message': 'Output cleared'}), 200 return jsonify({'success': False, 'error': 'Failed to clear'}), 500 except Exception as e: logger.error(f"Error clearing console output: {e}") return jsonify({'success': False, 'error': str(e)}), 500 # ============================================================================= # Backup Endpoints # ============================================================================= @api_bp.route('/backup/list', methods=['GET']) def list_backups(): """List available database backups.""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 backup_dir = Path(config.MC_CONFIG_DIR) / 'backups' backups = db.list_backups(backup_dir) # Format sizes for display for b in backups: size = b.get('size_bytes', 0) if size >= 1024 * 1024: b['size_display'] = f"{size / (1024*1024):.1f} MB" elif size >= 1024: b['size_display'] = f"{size / 1024:.1f} KB" else: b['size_display'] = f"{size} B" return jsonify({ 'success': True, 'backups': backups, 'auto_backup_enabled': config.MC_BACKUP_ENABLED, 'backup_hour': config.MC_BACKUP_HOUR, 'retention_days': config.MC_BACKUP_RETENTION_DAYS, }), 200 except Exception as e: logger.error(f"Error listing backups: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/backup/create', methods=['POST']) def create_backup(): """Create an immediate database backup.""" try: db = _get_db() if not db: return jsonify({'success': False, 'error': 'Database not available'}), 503 backup_dir = Path(config.MC_CONFIG_DIR) / 'backups' backup_path = db.create_backup(backup_dir) return jsonify({ 'success': True, 'message': 'Backup created', 'filename': backup_path.name, 'size_bytes': backup_path.stat().st_size, }), 200 except Exception as e: logger.error(f"Error creating backup: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @api_bp.route('/backup/download', methods=['GET']) def download_backup(): """Download a backup file.""" filename = request.args.get('file', '') if not filename: return jsonify({'success': False, 'error': 'Missing file parameter'}), 400 # Security: prevent path traversal if '/' in filename or '\\' in filename or '..' in filename: return jsonify({'success': False, 'error': 'Invalid filename'}), 400 backup_dir = Path(config.MC_CONFIG_DIR) / 'backups' backup_path = backup_dir / filename if not backup_path.exists(): return jsonify({'success': False, 'error': 'Backup not found'}), 404 return send_file( str(backup_path), mimetype='application/x-sqlite3', as_attachment=True, download_name=filename ) # ============================================================================= # System Log API # ============================================================================= @api_bp.route('/logs', methods=['GET']) def get_logs_api(): """ Get log entries from in-memory ring buffer. Query parameters: level: Minimum log level (DEBUG, INFO, WARNING, ERROR) logger: Logger name prefix filter search: Text search in message (case-insensitive) limit: Max entries to return (default 500) Returns: JSON with log entries and available loggers """ try: log_handler = getattr(current_app, 'log_handler', None) if not log_handler: return jsonify({'success': False, 'error': 'Log handler not available'}), 500 level = request.args.get('level') logger_filter = request.args.get('logger') search = request.args.get('search') limit = request.args.get('limit', 500, type=int) entries = log_handler.get_entries( level=level, logger_filter=logger_filter, search=search, limit=limit ) return jsonify({ 'success': True, 'entries': entries, 'count': len(entries), 'loggers': log_handler.get_loggers(), }), 200 except Exception as e: logger.error(f"Error getting logs: {e}") return jsonify({'success': False, 'error': str(e)}), 500