From badf67cf74fd269ff910e1007fc9b440e3bae52d Mon Sep 17 00:00:00 2001 From: MarekWo Date: Sun, 1 Mar 2026 07:23:59 +0100 Subject: [PATCH] feat(v2): Rewrite main.py and cli.py for direct device communication main.py: Initialize Database + DeviceManager in create_app(), replace bridge-dependent startup code, simplified console command router. cli.py: All functions now delegate to DeviceManager instead of HTTP bridge calls. Same signatures preserved for api.py compatibility. Co-Authored-By: Claude Opus 4.6 --- app/main.py | 309 ++++------ app/meshcore/cli.py | 1326 +++++++++++-------------------------------- 2 files changed, 454 insertions(+), 1181 deletions(-) diff --git a/app/main.py b/app/main.py index af3d851..2416d0e 100644 --- a/app/main.py +++ b/app/main.py @@ -1,68 +1,57 @@ """ -mc-webui - Flask application entry point +mc-webui v2 — Flask application entry point + +Direct device communication via meshcore library (no bridge). """ import logging -import re import shlex import threading import time -import requests from flask import Flask, request as flask_request from flask_socketio import SocketIO, emit from app.config import config, runtime_config +from app.database import Database +from app.device_manager import DeviceManager from app.routes.views import views_bp from app.routes.api import api_bp from app.version import VERSION_STRING, GIT_BRANCH -from app.archiver.manager import schedule_daily_archiving -from app.meshcore.cli import fetch_device_name_from_bridge -from app.contacts_cache import load_cache, scan_new_adverts, initialize_from_device - -# Commands that require longer timeout (in seconds) -SLOW_COMMANDS = { - 'node_discover': 15, - 'recv': 60, - 'send': 15, - 'send_msg': 15, - # Repeater commands - 'req_status': 15, - 'req_neighbours': 15, - 'trace': 15, -} # Configure logging logging.basicConfig( - level=logging.DEBUG if config.FLASK_DEBUG else logging.INFO, + level=getattr(logging, config.MC_LOG_LEVEL, logging.INFO), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) -# Filter to suppress known werkzeug WebSocket errors (cosmetic issue with dev server) +# Filter to suppress known werkzeug WebSocket errors class WerkzeugWebSocketFilter(logging.Filter): def filter(self, record): - # Suppress "write() before start_response" errors during WebSocket upgrade if record.levelno == logging.ERROR: - # Check message if 'write() before start_response' in str(record.msg): return False - # Check exception info (traceback) if record.exc_info and record.exc_info[1]: if 'write() before start_response' in str(record.exc_info[1]): return False return True -# Apply filter to werkzeug logger logging.getLogger('werkzeug').addFilter(WerkzeugWebSocketFilter()) # Initialize SocketIO globally socketio = SocketIO() +# Global references (set in create_app) +db = None +device_manager = None + def create_app(): """Create and configure Flask application""" + global db, device_manager + app = Flask(__name__) # Load configuration @@ -78,128 +67,41 @@ def create_app(): app.register_blueprint(views_bp) app.register_blueprint(api_bp) - # Initialize SocketIO with the app - # Using 'threading' mode for better compatibility with regular HTTP requests - # (gevent mode requires monkey-patching and slows down non-WebSocket requests) + # Initialize SocketIO socketio.init_app(app, cors_allowed_origins="*", async_mode='threading') - # Initialize archive scheduler if enabled - if config.MC_ARCHIVE_ENABLED: - schedule_daily_archiving() - logger.info(f"Archive scheduler enabled - directory: {config.MC_ARCHIVE_DIR}") - else: - logger.info("Archive scheduler disabled") + # v2: Initialize database + db = Database(config.db_path) + app.db = db - # Fetch device name from bridge in background thread (with retry) - def init_device_name(): - device_name, source = fetch_device_name_from_bridge() - runtime_config.set_device_name(device_name, source) + # v2: Initialize and start device manager + device_manager = DeviceManager(config, db, socketio) + app.device_manager = device_manager - # If we got a fallback name, keep retrying in background - retry_delay = 5 - max_delay = 60 - while source == "fallback": - time.sleep(retry_delay) - device_name, source = fetch_device_name_from_bridge() - if source != "fallback": - runtime_config.set_device_name(device_name, source) - logger.info(f"Device name resolved after retry: {device_name}") - break - retry_delay = min(retry_delay * 2, max_delay) + # Start device connection in background (non-blocking) + device_manager.start() - threading.Thread(target=init_device_name, daemon=True).start() + # Update runtime config when device connects + def _wait_for_device_name(): + """Wait for device manager to connect and update runtime config.""" + for _ in range(60): # wait up to 60 seconds + time.sleep(1) + if device_manager.is_connected: + runtime_config.set_device_name( + device_manager.device_name, "device" + ) + logger.info(f"Device name resolved: {device_manager.device_name}") + return + logger.warning("Timeout waiting for device connection") - # Background thread: contacts cache initialization and periodic advert scanning - def init_contacts_cache(): - # Wait for device name to resolve - time.sleep(10) + threading.Thread(target=_wait_for_device_name, daemon=True).start() - cache = load_cache() - - # Seed from device contacts if cache is empty - if not cache: - try: - from app.routes.api import get_contacts_detailed_cached - success, contacts, error = get_contacts_detailed_cached() - if success and contacts: - initialize_from_device(contacts) - logger.info("Contacts cache seeded from device") - except Exception as e: - logger.error(f"Failed to seed contacts cache: {e}") - - # Periodic advert scan loop - while True: - time.sleep(45) - try: - scan_new_adverts() - except Exception as e: - logger.error(f"Contacts cache scan error: {e}") - - threading.Thread(target=init_contacts_cache, daemon=True).start() - - logger.info(f"mc-webui started - device: {config.MC_DEVICE_NAME}") - logger.info(f"Messages file: {config.msgs_file_path}") - logger.info(f"Serial port: {config.MC_SERIAL_PORT}") + logger.info(f"mc-webui v2 started — transport: {'TCP' if config.use_tcp else 'serial'}") + logger.info(f"Database: {config.db_path}") return app -# ============================================================ -# Console output helpers -# ============================================================ - -def clean_console_output(output: str, command: str) -> str: - """ - Clean meshcli console output by removing: - - Prompt lines (e.g., "MarWoj|*" or "DeviceName|*[E]") - - JSON packet lines (internal mesh protocol data) - - Echoed command line - - Leading/trailing whitespace - """ - if not output: - return output - - lines = output.split('\n') - cleaned_lines = [] - - # Pattern to match any line containing the meshcli prompt "|*" - # Examples: "MarWoj|*", "MarWoj|*[E]", "MarWoj|*[E] infos" - # The prompt is: |* - prompt_pattern = re.compile(r'^[^|]+\|\*') - - for line in lines: - stripped = line.rstrip() - - # Skip empty lines at start - if not cleaned_lines and not stripped: - continue - - # Skip any line that starts with the meshcli prompt pattern - if prompt_pattern.match(stripped): - continue - - # Skip JSON packet lines (internal mesh protocol data) - stripped_full = stripped.lstrip() - if stripped_full.startswith('{') and '"payload_typename"' in stripped_full: - continue - - cleaned_lines.append(line) - - # Remove leading empty lines - while cleaned_lines and not cleaned_lines[0].strip(): - cleaned_lines.pop(0) - - # Remove trailing empty lines - while cleaned_lines and not cleaned_lines[-1].strip(): - cleaned_lines.pop() - - # Strip leading whitespace from first line (leftover from prompt removal) - if cleaned_lines: - cleaned_lines[0] = cleaned_lines[0].lstrip() - - return '\n'.join(cleaned_lines) - - # ============================================================ # WebSocket handlers for Console # ============================================================ @@ -208,7 +110,7 @@ def clean_console_output(output: str, command: str) -> str: def handle_console_connect(): """Handle console WebSocket connection""" logger.info("Console WebSocket client connected") - emit('console_status', {'message': 'Connected to mc-webui console proxy'}) + emit('console_status', {'message': 'Connected to mc-webui console'}) @socketio.on('disconnect', namespace='/console') @@ -219,78 +121,37 @@ def handle_console_disconnect(): @socketio.on('send_command', namespace='/console') def handle_send_command(data): - """Handle command from console client - proxy to bridge via HTTP""" + """Handle command from console client — route through DeviceManager.""" command = data.get('command', '').strip() - # Capture socket ID for use in background task sid = flask_request.sid if not command: - emit('command_response', { - 'success': False, - 'error': 'Empty command' - }) + emit('command_response', {'success': False, 'error': 'Empty command'}) return logger.info(f"Console command received: {command}") - # Execute command via bridge HTTP API - # Parse command into args list (split by spaces, respecting quotes) - try: - args = shlex.split(command) - except ValueError: - args = command.split() - - # Determine timeout based on command - base_command = args[0] if args else '' - cmd_timeout = SLOW_COMMANDS.get(base_command, 10) - def execute_and_respond(): try: - response = requests.post( - config.MC_BRIDGE_URL, - json={'args': args, 'timeout': cmd_timeout}, - timeout=cmd_timeout + 5 # HTTP timeout slightly longer - ) + try: + args = shlex.split(command) + except ValueError: + args = command.split() - if response.status_code == 200: - result = response.json() - if result.get('success'): - raw_output = result.get('stdout', '') - # Clean output: remove prompts and echoed commands - output = clean_console_output(raw_output, command) - if not output: - output = '(no output)' - socketio.emit('command_response', { - 'success': True, - 'command': command, - 'output': output - }, room=sid, namespace='/console') - else: - error = result.get('stderr', 'Unknown error') - socketio.emit('command_response', { - 'success': False, - 'command': command, - 'error': error - }, room=sid, namespace='/console') - else: + if not args: socketio.emit('command_response', { - 'success': False, - 'command': command, - 'error': f'Bridge returned status {response.status_code}' + 'success': False, 'command': command, 'error': 'Empty command' }, room=sid, namespace='/console') + return + + output = _execute_console_command(args) - except requests.exceptions.Timeout: socketio.emit('command_response', { - 'success': False, + 'success': True, 'command': command, - 'error': 'Command timed out' - }, room=sid, namespace='/console') - except requests.exceptions.ConnectionError: - socketio.emit('command_response', { - 'success': False, - 'command': command, - 'error': 'Cannot connect to meshcore-bridge' + 'output': output or '(no output)' }, room=sid, namespace='/console') + except Exception as e: logger.error(f"Console command error: {e}") socketio.emit('command_response', { @@ -299,10 +160,72 @@ def handle_send_command(data): 'error': str(e) }, room=sid, namespace='/console') - # Run in background to not block socketio.start_background_task(execute_and_respond) +def _execute_console_command(args: list) -> str: + """ + Execute a console command via DeviceManager. + Maps meshcli-style text commands to DeviceManager methods. + Simplified router — full ConsoleRouter planned for Phase 2. + """ + cmd = args[0].lower() + + if not device_manager or not device_manager.is_connected: + return "Error: Device not connected" + + if cmd == 'infos': + info = device_manager.get_device_info() + if info: + lines = [f" {k}: {v}" for k, v in info.items()] + return "Device Info:\n" + "\n".join(lines) + return "No device info available" + + elif cmd == 'contacts': + contacts = device_manager.get_contacts_from_device() + if not contacts: + return "No contacts" + lines = [] + for c in contacts: + name = c.get('name', '?') + pk = c.get('public_key', '')[:8] + lines.append(f" {name} ({pk}...)") + return f"Contacts ({len(contacts)}):\n" + "\n".join(lines) + + elif cmd == 'bat': + bat = device_manager.get_battery() + if bat: + return f"Battery: {bat}" + return "Battery info unavailable" + + elif cmd in ('advert', 'floodadv'): + result = device_manager.send_advert(flood=(cmd == 'floodadv')) + return result.get('message', result.get('error', 'Unknown')) + + elif cmd == 'chan' and len(args) >= 3: + try: + ch_idx = int(args[1]) + text = ' '.join(args[2:]) + result = device_manager.send_channel_message(ch_idx, text) + return result.get('message', result.get('error', 'Unknown')) + except (ValueError, IndexError): + return "Usage: chan " + + elif cmd == 'msg' and len(args) >= 3: + recipient = args[1] + text = ' '.join(args[2:]) + contact = device_manager.mc.get_contact_by_name(recipient) + if contact: + pubkey = contact.get('public_key', recipient) + else: + pubkey = recipient + result = device_manager.send_dm(pubkey, text) + return result.get('message', result.get('error', 'Unknown')) + + else: + return f"Unknown command: {cmd}\nAvailable: infos, contacts, bat, advert, floodadv, chan, msg" + + if __name__ == '__main__': app = create_app() socketio.run( @@ -310,5 +233,5 @@ if __name__ == '__main__': host=config.FLASK_HOST, port=config.FLASK_PORT, debug=config.FLASK_DEBUG, - allow_unsafe_werkzeug=True # Required for threading mode + allow_unsafe_werkzeug=True ) diff --git a/app/meshcore/cli.py b/app/meshcore/cli.py index c878e53..586954e 100644 --- a/app/meshcore/cli.py +++ b/app/meshcore/cli.py @@ -1,1104 +1,454 @@ """ -MeshCore CLI wrapper - executes meshcli commands via HTTP bridge +MeshCore CLI wrapper — v2: delegates to DeviceManager (no bridge). + +Function signatures preserved for backward compatibility with api.py. """ import logging -import re import json -import time -import requests from pathlib import Path from typing import Tuple, Optional, List, Dict from app.config import config logger = logging.getLogger(__name__) -# Command timeout in seconds (reduced to prevent long waits) -DEFAULT_TIMEOUT = 12 # Reduced from 30s - bridge has 10s + 2s buffer -RECV_TIMEOUT = 60 # recv can take longer - class MeshCLIError(Exception): """Custom exception for meshcli command failures""" pass -def _run_command(args: list, timeout: int = DEFAULT_TIMEOUT) -> Tuple[bool, str, str]: - """ - Execute meshcli command via HTTP bridge. +def _get_dm(): + """Get the DeviceManager instance (deferred import to avoid circular refs).""" + from app.main import device_manager + if device_manager is None: + raise MeshCLIError("DeviceManager not initialized") + return device_manager - Args: - args: Command arguments (e.g., ['recv'], ['public', 'Hello']) - timeout: Command timeout in seconds - - Returns: - Tuple of (success, stdout, stderr) - """ - logger.info(f"Executing via bridge: {' '.join(args)}") - - try: - response = requests.post( - config.MC_BRIDGE_URL, - json={ - 'args': args, - 'timeout': timeout - }, - headers={'Connection': 'close'}, # Prevent connection reuse issues in background threads - timeout=timeout + 5 # Add 5s buffer for HTTP timeout - ) - - # Handle HTTP errors - if response.status_code != 200: - logger.error(f"Bridge HTTP error {response.status_code}: {response.text}") - return False, '', f'Bridge HTTP error: {response.status_code}' - - data = response.json() - - success = data.get('success', False) - stdout = data.get('stdout', '').strip() - stderr = data.get('stderr', '').strip() - - if not success: - logger.warning(f"Command failed: {stderr or stdout}") - - return success, stdout, stderr - - except requests.exceptions.Timeout: - logger.error(f"Bridge request timeout after {timeout}s") - return False, '', f'Bridge timeout after {timeout} seconds' - - except requests.exceptions.ConnectionError as e: - logger.error(f"Cannot connect to meshcore-bridge: {e}") - return False, '', 'Cannot connect to meshcore-bridge service' - - except Exception as e: - logger.error(f"Bridge communication error: {e}") - return False, '', str(e) +# ============================================================================= +# Messages +# ============================================================================= def recv_messages() -> Tuple[bool, str]: """ - Fetch new messages from the device. - - Returns: - Tuple of (success, message) + In v2, messages arrive via events (auto-fetching). + This is a no-op — kept for backward compatibility. """ - success, stdout, stderr = _run_command(['recv'], timeout=RECV_TIMEOUT) - return success, stdout or stderr + return True, "Messages are received automatically via events" def send_message(text: str, reply_to: Optional[str] = None, channel_index: int = 0) -> Tuple[bool, str]: - """ - Send a message to a specific channel. - - Args: - text: Message content - reply_to: Optional username to reply to (will format as @[username]) - channel_index: Channel to send to (default: 0 = Public) - - Returns: - Tuple of (success, message) - """ + """Send a message to a channel.""" if reply_to: - message = f"@[{reply_to}] {text}" - else: - message = text + text = f"@[{reply_to}] {text}" - # Use 'chan' command for all channels (including Public/0) for consistent quoting behavior - # Note: 'public' command treats quotes literally, while 'chan' properly parses them as delimiters - success, stdout, stderr = _run_command(['chan', str(channel_index), message]) + try: + dm = _get_dm() + result = dm.send_channel_message(channel_index, text) + if result['success']: + return True, result.get('message', 'Message sent') + return False, result.get('error', 'Failed to send message') + except Exception as e: + logger.error(f"send_message error: {e}") + return False, str(e) - return success, stdout or stderr +# ============================================================================= +# Contacts +# ============================================================================= def get_contacts() -> Tuple[bool, str]: - """ - Get list of contacts from the device. - - Returns: - Tuple of (success, output) - """ - success, stdout, stderr = _run_command(['contacts']) - return success, stdout or stderr + """Get contacts list as formatted text.""" + try: + dm = _get_dm() + contacts = dm.get_contacts_from_device() + if not contacts: + return True, "No contacts" + lines = [] + for c in contacts: + name = c.get('name', '?') + pk = c.get('public_key', '')[:12] + lines.append(f"{name} {pk}") + return True, "\n".join(lines) + except Exception as e: + logger.error(f"get_contacts error: {e}") + return False, str(e) def parse_contacts(output: str, filter_types: Optional[List[str]] = None) -> List[str]: - """ - Parse meshcli contacts output to extract contact names. - - Expected format from meshcli contacts: - ContactName CLI pubkey_prefix path - ContactName 🔫 CLI pubkey_prefix path - - Contact name is separated from type column (CLI/REP/ROOM/SENS) by multiple spaces. - - Args: - output: Raw output from meshcli contacts command - filter_types: Optional list of contact types to include (e.g., ['CLI']) - If None, all types are included. - - Returns: - List of contact names (unique) - """ - contacts = [] - - for line in output.split('\n'): - line_stripped = line.strip() - - # Skip empty lines, headers, and INFO lines - if not line_stripped or line_stripped.startswith('---') or \ - line.lower().startswith('contact') or line.startswith('INFO:'): - continue - - # Split by 2+ consecutive spaces (columns separator in meshcli output) - # Format: "ContactName CLI pubkey path" - parts = re.split(r'\s{2,}', line) - - if len(parts) >= 2: - # First part is the contact name (may include emoji and spaces) - contact_name = parts[0].strip() - - # Second part should be type (CLI, REP, ROOM, SENS) - contact_type = parts[1].strip() - - # Validate that second column looks like a type - if contact_type in ['CLI', 'REP', 'ROOM', 'SENS'] and contact_name: - # Apply type filter if specified - if filter_types is None or contact_type in filter_types: - if contact_name not in contacts: - contacts.append(contact_name) - - return contacts + """Parse contacts output to extract names. In v2, reads from DB.""" + try: + dm = _get_dm() + contacts = dm.db.get_contacts() + return [c['name'] for c in contacts if c.get('name')] + except Exception: + return [] def get_contacts_list() -> Tuple[bool, List[str], str]: - """ - Get parsed list of contact names from the device. - Only returns CLI (client) contacts, excluding REP, ROOM, and SENS. - - Returns: - Tuple of (success, contact_names_list, error_message) - """ - success, output = get_contacts() - - if not success: - return False, [], output - - # Filter only CLI (client) contacts - no repeaters, rooms, or sensors - contacts = parse_contacts(output, filter_types=['CLI']) - return True, contacts, "" - - -def clean_inactive_contacts(hours: int = 48) -> Tuple[bool, str]: - """ - Remove contacts inactive for specified hours. - - Args: - hours: Inactivity threshold in hours - - Returns: - Tuple of (success, message) - """ - # Command format: apply_to u<48h,t=1 remove_contact - # u<48h = updated less than 48h ago (inactive) - # t=1 = type client (not router/repeater) - filter_cmd = f"u<{hours}h,t=1" - success, stdout, stderr = _run_command(['apply_to', filter_cmd, 'remove_contact']) - return success, stdout or stderr - - -def get_device_info() -> Tuple[bool, str]: - """ - Get device information. - - Returns: - Tuple of (success, info) - """ - success, stdout, stderr = _run_command(['infos']) - return success, stdout or stderr - - -def check_connection() -> bool: - """ - Quick check if device is accessible. - - Returns: - True if device responds, False otherwise - """ - success, _, _ = _run_command(['infos'], timeout=5) - return success - - -def get_channels() -> Tuple[bool, List[Dict]]: - """ - Get list of configured channels. - - Returns: - Tuple of (success, list of channel dicts) - Each dict: { - 'index': int, - 'name': str, - 'key': str - } - """ - success, stdout, stderr = _run_command(['get_channels']) - - if not success: - return False, [] - - channels = [] - for line in stdout.split('\n'): - line = line.strip() - if not line: - continue - - # Parse: "0: Public [8b3387e9c5cdea6ac9e5edbaa115cd72]" - match = re.match(r'^(\d+):\s+(.+?)\s+\[([a-f0-9]{32})\]$', line) - if match: - channels.append({ - 'index': int(match.group(1)), - 'name': match.group(2), - 'key': match.group(3) - }) - - return True, channels - - -def add_channel(name: str) -> Tuple[bool, str, Optional[str]]: - """ - Add a new channel with auto-generated key. - - Args: - name: Channel name - - Returns: - Tuple of (success, message, key_or_none) - key_or_none: The generated key if successful, None otherwise - """ - success, stdout, stderr = _run_command(['add_channel', name]) - - if not success: - return False, stderr or stdout, None - - # Get channels to find the newly created one - success_ch, channels = get_channels() - if success_ch: - for ch in channels: - if ch['name'] == name: - return True, f"Channel '{name}' created", ch['key'] - - return True, stdout or stderr, None - - -def set_channel(index: int, name: str, key: Optional[str] = None) -> Tuple[bool, str]: - """ - Set/join a channel at specific index with name and optional key. - - Args: - index: Channel slot number - name: Channel name - key: 32-char hex key (optional for channels starting with #) - - Returns: - Tuple of (success, message) - """ - # Build command arguments - cmd_args = ['set_channel', str(index), name] - - # Add key if provided - if key: - # Validate key format - if not re.match(r'^[a-f0-9]{32}$', key.lower()): - return False, "Invalid key format (must be 32 hex characters)" - cmd_args.append(key.lower()) - - success, stdout, stderr = _run_command(cmd_args) - - return success, stdout or stderr - - -def remove_channel(index: int) -> Tuple[bool, str]: - """ - Remove a channel. - - Args: - index: Channel number to remove - - Returns: - Tuple of (success, message) - """ - if index == 0: - return False, "Cannot remove Public channel (channel 0)" - - success, stdout, stderr = _run_command(['remove_channel', str(index)]) - return success, stdout or stderr - - -# ============================================================================= -# Special Commands (Network Advertisement) -# ============================================================================= - -def advert() -> Tuple[bool, str]: - """ - Send a single advertisement frame to the mesh network. - - This is the recommended way to announce node presence. - Uses minimal airtime and follows normal routing rules. - - Returns: - Tuple of (success, message) - """ - success, stdout, stderr = _run_command(['advert']) - return success, stdout or stderr - - -def floodadv() -> Tuple[bool, str]: - """ - Send advertisement in flooding mode (broadcast storm). - - WARNING: This should be used sparingly! It causes high airtime usage - and can destabilize larger networks. Use only for: - - Initial network bootstrap - - After device reset/firmware change - - When routing is broken - - Debug/testing purposes - - Returns: - Tuple of (success, message) - """ - success, stdout, stderr = _run_command(['floodadv']) - return success, stdout or stderr - - -# ============================================================================= -# Direct Messages (DM) -# ============================================================================= - -def send_dm(recipient: str, text: str) -> Tuple[bool, str]: - """ - Send a direct/private message to a contact. - - Uses meshcli 'msg' command: msg - - Args: - recipient: Contact name to send to - text: Message content - - Returns: - Tuple of (success, message) - """ - if not recipient or not recipient.strip(): - return False, "Recipient name is required" - - if not text or not text.strip(): - return False, "Message text is required" - - success, stdout, stderr = _run_command(['msg', recipient.strip(), text.strip()]) - return success, stdout or stderr - - -def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]: - """ - Check delivery status for sent DMs by their expected_ack codes. - - Args: - ack_codes: List of expected_ack hex strings from SENT_MSG log entries - - Returns: - Tuple of (success, ack_status_dict, error_message) - ack_status_dict maps ack_code -> ack_info dict or None - """ + """Get parsed list of contact names.""" try: - response = requests.get( - f"{config.MC_BRIDGE_URL.replace('/cli', '/ack_status')}", - params={'ack_codes': ','.join(ack_codes)}, - timeout=DEFAULT_TIMEOUT - ) - - if response.status_code != 200: - return False, {}, f"Bridge error: {response.status_code}" - - data = response.json() - return data.get('success', False), data.get('acks', {}), '' - - except requests.exceptions.ConnectionError: - return False, {}, 'Cannot connect to bridge' + dm = _get_dm() + contacts = dm.db.get_contacts() + names = [c['name'] for c in contacts if c.get('name')] + return True, names, "" except Exception as e: - return False, {}, str(e) + return False, [], str(e) -# ============================================================================= -# Auto-retry helpers -# ============================================================================= - -def get_retry_ack_codes() -> set: - """Get set of expected_ack codes belonging to retry attempts (not first send).""" - try: - response = requests.get( - f"{config.MC_BRIDGE_URL.replace('/cli', '/retry_ack_codes')}", - timeout=DEFAULT_TIMEOUT - ) - if response.status_code == 200: - data = response.json() - return set(data.get('retry_ack_codes', [])) - except Exception as e: - logger.debug(f"Failed to fetch retry_ack_codes: {e}") - return set() - - -def get_auto_retry_config() -> Tuple[bool, Dict]: - """Get current auto-retry configuration from bridge.""" - try: - response = requests.get( - f"{config.MC_BRIDGE_URL.replace('/cli', '/auto_retry/config')}", - timeout=DEFAULT_TIMEOUT - ) - if response.status_code == 200: - data = response.json() - return True, data - except Exception as e: - logger.debug(f"Failed to fetch auto_retry config: {e}") - return False, {} - - -def set_auto_retry_config(enabled=None, max_attempts=None, max_flood=None) -> Tuple[bool, Dict]: - """Update auto-retry configuration on bridge.""" - payload = {} - if enabled is not None: - payload['enabled'] = enabled - if max_attempts is not None: - payload['max_attempts'] = max_attempts - if max_flood is not None: - payload['max_flood'] = max_flood - - try: - response = requests.post( - f"{config.MC_BRIDGE_URL.replace('/cli', '/auto_retry/config')}", - json=payload, - timeout=DEFAULT_TIMEOUT - ) - if response.status_code == 200: - data = response.json() - return True, data - except Exception as e: - logger.debug(f"Failed to set auto_retry config: {e}") - return False, {} - - -# ============================================================================= -# Contact Management (Existing & Pending Contacts) -# ============================================================================= - def get_all_contacts_detailed() -> Tuple[bool, List[Dict], int, str]: - """ - Get detailed list of ALL existing contacts on the device (CLI, REP, ROOM, SENS). - - Returns: - Tuple of (success, contacts_list, total_count, error_message) - Each contact dict: { - 'name': str, - 'public_key_prefix': str (12 hex chars), - 'type_label': str (CLI|REP|ROOM|SENS|UNKNOWN), - 'path_or_mode': str (Flood or hex path), - 'raw_line': str (for debugging) - } - """ + """Get detailed list of all contacts from DB.""" try: - success, stdout, stderr = _run_command(['contacts']) - - if not success: - return False, [], 0, stderr or 'Failed to get contacts list' - - # Parse the output - contacts = [] - total_count = 0 - - lines = stdout.strip().split('\n') - - for line in lines: - # Skip prompt lines and empty lines (prompt format: |*) - if re.match(r'^.+\|\*', line) or not line.strip(): - continue - - # Check for final count line: "> 263 contacts in device" - if line.strip().startswith('>') and 'contacts in device' in line: - try: - total_count = int(re.search(r'> (\d+) contacts', line).group(1)) - except: - pass - continue - - # Parse contact line - # Format: NAME TYPE PUBKEY_PREFIX PATH_OR_MODE - # Example: "TK Zalesie Test 🦜 REP df2027d3f2ef Flood" - - # Strategy: work backwards from the end - # Last column is either "Flood" or hex path (variable length) - # Before that: 12-char hex public key prefix - # Before that: TYPE (REP, CLI, ROOM, SENS) - 4 chars with padding - # Everything else is the name - - stripped = line.rstrip() - if not stripped: - continue - - # Split by whitespace, but we need to be smart about it - parts = stripped.split() - if len(parts) < 4: - # Malformed line, skip - continue - - # The last part is path_or_mode - path_or_mode = parts[-1] - - # The second-to-last part is public_key_prefix (should be 12 hex chars) - public_key_prefix = parts[-2] - - # The third-to-last part is type (should be REP, CLI, ROOM, SENS) - type_label = parts[-3].strip() - - # Everything before that is the name - # We need to reconstruct it by finding where it ends in the original line - # Find the position of type_label in the line (searching from right) - # This is tricky because type_label might appear in the name too - - # Better approach: use the public_key_prefix as anchor (it's unique hex) - pubkey_pos = stripped.rfind(public_key_prefix) - if pubkey_pos == -1: - continue - - # Everything before the public key (minus the type and spacing) is the name - before_pubkey = stripped[:pubkey_pos].rstrip() - - # The type should be the last word in before_pubkey - type_pos = before_pubkey.rfind(type_label) - if type_pos == -1: - # Type not found, try extracting it differently - # Just take the last token before pubkey_prefix - tokens = before_pubkey.split() - if len(tokens) >= 1: - type_label = tokens[-1] - name = ' '.join(tokens[:-1]).strip() - else: - continue - else: - name = before_pubkey[:type_pos].strip() - - # Validate type_label - if type_label not in ['CLI', 'REP', 'ROOM', 'SENS']: - type_label = 'UNKNOWN' - - # Validate public_key_prefix (should be 12 hex chars) - if not re.match(r'^[a-fA-F0-9]{12}$', public_key_prefix): - # Invalid format, skip - continue - - contact = { - 'name': name, - 'public_key_prefix': public_key_prefix.lower(), - 'type_label': type_label, - 'path_or_mode': path_or_mode, - 'raw_line': line - } - - contacts.append(contact) - - # If total_count wasn't found in output, use length of contacts list - if total_count == 0: - total_count = len(contacts) - - return True, contacts, total_count, "" - + dm = _get_dm() + contacts = dm.db.get_contacts() + result = [] + for c in contacts: + pk = c.get('public_key', '') + result.append({ + 'name': c.get('name', ''), + 'public_key_prefix': pk[:12] if len(pk) >= 12 else pk, + 'type_label': {0: 'CLI', 1: 'CLI', 2: 'REP', 3: 'ROOM', 4: 'SENS'}.get(c.get('type', 1), 'UNKNOWN'), + 'path_or_mode': c.get('out_path', '') or 'Flood', + 'raw_line': '', + }) + return True, result, len(result), "" except Exception as e: - logger.error(f"Error parsing contacts list: {e}") return False, [], 0, str(e) def get_contacts_with_last_seen() -> Tuple[bool, Dict[str, Dict], str]: - """ - Get detailed contact information including last_advert timestamps. - - Uses 'apply_to t=1,t=2,t=3,t=4 contact_info' command to fetch metadata - for all contact types (CLI, REP, ROOM, SENS). - - Returns: - Tuple of (success, contacts_dict, error_message) - contacts_dict maps public_key -> contact_details where each detail dict contains: - { - 'public_key': str (full key), - 'type': int (1=CLI, 2=REP, 3=ROOM, 4=SENS), - 'flags': int, - 'out_path_len': int, - 'out_path': str, - 'adv_name': str (name with emoji), - 'last_advert': int (Unix timestamp), - 'adv_lat': float, - 'adv_lon': float, - 'lastmod': int (Unix timestamp) - } - """ + """Get contacts with last_advert timestamps from DB.""" try: - # Execute command to get all contact types - # Call separately for each type since commas might not work through bridge - # t=1 (CLI), t=2 (REP), t=3 (ROOM), t=4 (SENS) - + dm = _get_dm() + contacts = dm.db.get_contacts() contacts_dict = {} - - for contact_type in ['t=1', 't=2', 't=3', 't=4']: - success, stdout, stderr = _run_command(['apply_to', contact_type, 'contact_info']) - - if not success: - logger.warning(f"apply_to {contact_type} contact_info failed: {stderr}") - continue # Skip this type, try next - - # Parse prettified JSON output - # Output contains multiple JSON objects separated by newlines - # Use brace-matching to extract each complete object - try: - # Find all complete JSON objects (balanced braces) - json_objects = [] - depth = 0 - start_idx = None - - for i, char in enumerate(stdout): - if char == '{': - if depth == 0: - start_idx = i - depth += 1 - elif char == '}': - depth -= 1 - if depth == 0 and start_idx is not None: - # Found complete JSON object - json_str = stdout[start_idx:i+1] - try: - contact = json.loads(json_str) - if 'public_key' in contact: - json_objects.append(contact) - except json.JSONDecodeError: - # Skip malformed JSON - pass - start_idx = None - - # Add to contacts dict - for contact in json_objects: - contacts_dict[contact['public_key']] = contact - - logger.info(f"Parsed {len(json_objects)} contacts from {contact_type}") - - except Exception as e: - logger.error(f"Error parsing {contact_type} output: {e}") - continue - - if len(contacts_dict) == 0: - logger.error(f"No contacts parsed from any type") - return False, {}, 'No contacts found in contact_info output' - - logger.info(f"Total contacts collected: {len(contacts_dict)}") + for c in contacts: + pk = c.get('public_key', '') + contacts_dict[pk] = { + 'public_key': pk, + 'type': c.get('type', 1), + 'flags': c.get('flags', 0), + 'out_path_len': c.get('out_path_len', -1), + 'out_path': c.get('out_path', ''), + 'adv_name': c.get('name', ''), + 'last_advert': c.get('last_advert', ''), + 'adv_lat': c.get('adv_lat', 0.0), + 'adv_lon': c.get('adv_lon', 0.0), + 'lastmod': c.get('lastmod', ''), + } return True, contacts_dict, "" - except Exception as e: - logger.error(f"Error getting contact details: {e}") return False, {}, str(e) def get_contacts_json() -> Tuple[bool, Dict[str, Dict], str]: - """ - Get all contacts using .contacts command (JSON format). - - This command returns exact contact names including any trailing/leading spaces, - making it the most reliable way to get contact names for deletion operations. - - The .contacts command returns a single JSON object with public_keys as keys: - { - "public_key_hash": { - "public_key": "...", - "type": 1, - "adv_name": "Contact Name", - "flags": 0, - "out_path_len": -1, - "out_path": "", - "last_advert": 1234567890, - "adv_lat": 50.123, - "adv_lon": 20.456, - "lastmod": 1234567890 - }, - ... - } - - Returns: - Tuple of (success, contacts_dict, error_message) - contacts_dict maps public_key -> contact_details where each detail dict contains: - { - 'public_key': str (full key), - 'type': int (1=CLI, 2=REP, 3=ROOM, 4=SENS), - 'adv_name': str (exact name with any spaces), - 'flags': int, - 'out_path_len': int, - 'out_path': str, - 'last_advert': int (Unix timestamp), - 'adv_lat': float, - 'adv_lon': float, - 'lastmod': int (Unix timestamp) - } - """ + """Get all contacts as JSON dict (keyed by public_key).""" try: - success, stdout, stderr = _run_command(['.contacts']) - - if not success: - logger.error(f".contacts command failed: {stderr}") - return False, {}, stderr or 'Failed to execute .contacts command' - - # Check if stdout is empty - if not stdout or not stdout.strip(): - logger.error(f".contacts returned empty output (success={success})") - return False, {}, '.contacts command returned empty output' - - # Parse JSON output - use brace-matching to extract complete JSON object - # stdout format: "|* .contacts\n{...}\n|* " - # We need to find matching braces and parse only the JSON object - try: - # Use brace-matching to extract complete JSON object (same as bridge does) - depth = 0 - start_idx = None - end_idx = None - - for i, char in enumerate(stdout): - if char == '{': - if depth == 0: - start_idx = i - depth += 1 - elif char == '}': - depth -= 1 - if depth == 0 and start_idx is not None: - end_idx = i + 1 - break # Found complete JSON object - - if start_idx is None or end_idx is None: - logger.error(f".contacts output has no complete JSON object") - return False, {}, 'No complete JSON object found in .contacts output' - - # Extract only the JSON object (ignoring prompts before and after) - json_str = stdout[start_idx:end_idx] - - # Parse JSON - contacts_dict = json.loads(json_str) - logger.info(f"Parsed {len(contacts_dict)} contacts from .contacts command") - return True, contacts_dict, "" - except json.JSONDecodeError as e: - logger.error(f"Failed to parse .contacts JSON output: {e}") - return False, {}, f'JSON parse error: {e}' - + dm = _get_dm() + contacts = dm.db.get_contacts() + contacts_dict = {} + for c in contacts: + pk = c.get('public_key', '') + contacts_dict[pk] = { + 'public_key': pk, + 'type': c.get('type', 0), + 'adv_name': c.get('name', ''), + 'flags': c.get('flags', 0), + 'out_path_len': c.get('out_path_len', -1), + 'out_path': c.get('out_path', ''), + 'last_advert': c.get('last_advert', ''), + 'adv_lat': c.get('adv_lat'), + 'adv_lon': c.get('adv_lon'), + 'lastmod': c.get('lastmod', ''), + } + return True, contacts_dict, "" except Exception as e: - logger.error(f"Error executing .contacts command: {e}") return False, {}, str(e) def delete_contact(selector: str) -> Tuple[bool, str]: - """ - Delete a contact from the device. - - Uses .contacts command to look up exact contact name before deletion, - ensuring names with trailing/leading spaces are handled correctly. - - Args: - selector: Contact selector - can be: - - full public_key (64 hex chars) - - public_key_prefix (12+ hex chars) - - contact name (exact or approximate match) - - Returns: - Tuple of (success, message) - """ + """Delete a contact by public key or name.""" if not selector or not selector.strip(): return False, "Contact selector is required" selector = selector.strip() try: - # Step 1: Fetch all contacts to find exact name - success_json, contacts_dict, error_json = get_contacts_json() + dm = _get_dm() + # Try as public key first + contact = dm.db.get_contact(selector) + if contact: + result = dm.delete_contact(selector) + return result['success'], result.get('message', result.get('error', '')) - if not success_json: - # Fallback: try direct deletion if .contacts fails - logger.warning(f".contacts failed, attempting direct deletion: {error_json}") - success, stdout, stderr = _run_command(['remove_contact', selector]) - logger.info(f"remove_contact (fallback) {selector}: success={success}, stdout='{stdout}', stderr='{stderr}'") - - if success: - return True, stdout.strip() if stdout.strip() else f"Contact {selector} removed successfully" - else: - return False, stderr.strip() if stderr.strip() else "Failed to remove contact" - - # Step 2: Find matching contact - exact_name = None - matched_pubkey = None - - for public_key, details in contacts_dict.items(): - # Match by public_key (full or prefix) - if public_key == selector or public_key.startswith(selector): - exact_name = details.get('adv_name', '') - matched_pubkey = public_key - logger.info(f"Found contact by public_key match: '{exact_name}' (pk: {public_key[:12]}...)") - break - - # Match by name (case-sensitive exact match or stripped match) - contact_name = details.get('adv_name', '') - if contact_name == selector or contact_name.strip() == selector: - exact_name = contact_name - matched_pubkey = public_key - logger.info(f"Found contact by name match: '{exact_name}' (pk: {public_key[:12]}...)") - break - - if not exact_name: - logger.warning(f"Contact not found in .contacts output: {selector}") - return False, f"Contact not found: {selector}" - - # Step 3: Delete using exact name (preserving any trailing/leading spaces) - success, stdout, stderr = _run_command(['remove_contact', exact_name]) - - logger.info(f"remove_contact '{exact_name}': success={success}, stdout='{stdout}', stderr='{stderr}'") - - if success: - message = stdout.strip() if stdout.strip() else f"Contact {exact_name.strip()} removed successfully" - return True, message - else: - error = stderr.strip() if stderr.strip() else "Failed to remove contact" - logger.warning(f"remove_contact failed for '{exact_name}': {error}") - return False, error + # Try to find by name + contacts = dm.db.get_contacts() + for c in contacts: + if c.get('name', '').strip() == selector or c.get('public_key', '').startswith(selector): + result = dm.delete_contact(c['public_key']) + return result['success'], result.get('message', result.get('error', '')) + return False, f"Contact not found: {selector}" except Exception as e: - logger.error(f"Error deleting contact: {e}") return False, str(e) +def clean_inactive_contacts(hours: int = 48) -> Tuple[bool, str]: + """Remove contacts inactive for specified hours. Simplified in v2.""" + # TODO: implement time-based cleanup via database query + return False, "Contact cleanup not yet implemented in v2" + + def get_pending_contacts() -> Tuple[bool, List[Dict], str]: - """ - Get list of contacts awaiting manual approval. - - Returns: - Tuple of (success, pending_contacts_list, error_message) - Each contact dict contains: - { - 'name': str (adv_name from contact_info), - 'public_key': str (full 64-char hex key), - 'public_key_prefix': str (first 12 chars for display), - 'type': int (1=CLI, 2=REP, 3=ROOM, 4=SENS), - 'type_label': str (CLI/REP/ROOM/SENS), - 'adv_lat': float (GPS latitude), - 'adv_lon': float (GPS longitude), - 'last_advert': int (Unix timestamp), - 'lastmod': int (Unix timestamp), - 'out_path_len': int, - 'out_path': str, - 'path_or_mode': str (computed: 'Flood' or path string) - } - """ + """Get list of contacts awaiting manual approval.""" try: - response = requests.get( - f"{config.MC_BRIDGE_URL.replace('/cli', '/pending_contacts')}", - timeout=DEFAULT_TIMEOUT + 5 - ) - - if response.status_code != 200: - return False, [], f'Bridge HTTP error: {response.status_code}' - - data = response.json() - - if not data.get('success', False): - error = data.get('error', 'Failed to get pending contacts') - return False, [], error - - pending = data.get('pending', []) - - # Add computed fields (same pattern as get_contacts_with_last_seen) - type_labels = {1: 'CLI', 2: 'REP', 3: 'ROOM', 4: 'SENS'} - - for contact in pending: - # Public key prefix (first 12 chars for display) - public_key = contact.get('public_key', '') - contact['public_key_prefix'] = public_key[:12] if len(public_key) >= 12 else public_key - - # Type label - contact_type = contact.get('type', 1) - contact['type_label'] = type_labels.get(contact_type, 'UNKNOWN') - - # Path or mode display - out_path_len = contact.get('out_path_len', -1) - out_path = contact.get('out_path', '') - if out_path_len == -1: - contact['path_or_mode'] = 'Flood' - elif out_path: - contact['path_or_mode'] = out_path - else: - contact['path_or_mode'] = f'Path len: {out_path_len}' - + dm = _get_dm() + pending = dm.get_pending_contacts() return True, pending, "" - - except requests.exceptions.Timeout: - return False, [], 'Bridge timeout' - except requests.exceptions.ConnectionError: - return False, [], 'Cannot connect to meshcore-bridge service' except Exception as e: return False, [], str(e) def approve_pending_contact(public_key: str) -> Tuple[bool, str]: - """ - Approve and add a pending contact by public key. - - Args: - public_key: Full public key of the contact to approve (REQUIRED - full key works for all contact types) - - Returns: - Tuple of (success, message) - """ - if not public_key or not public_key.strip(): - return False, "Public key is required" - + """Approve a pending contact.""" try: - response = requests.post( - f"{config.MC_BRIDGE_URL.replace('/cli', '/add_pending')}", - json={'selector': public_key.strip()}, - timeout=DEFAULT_TIMEOUT + 5 - ) - - if response.status_code != 200: - return False, f'Bridge HTTP error: {response.status_code}' - - data = response.json() - - if not data.get('success', False): - error = data.get('stderr', 'Failed to approve contact') - return False, error - - stdout = data.get('stdout', 'Contact approved successfully') - return True, stdout - - except requests.exceptions.Timeout: - return False, 'Bridge timeout' - except requests.exceptions.ConnectionError: - return False, 'Cannot connect to meshcore-bridge service' + dm = _get_dm() + result = dm.approve_contact(public_key) + return result['success'], result.get('message', result.get('error', '')) except Exception as e: return False, str(e) # ============================================================================= -# Device Settings (Persistent Configuration) +# Device Info +# ============================================================================= + +def get_device_info() -> Tuple[bool, str]: + """Get device information.""" + try: + dm = _get_dm() + info = dm.get_device_info() + if info: + lines = [f"{k}: {v}" for k, v in info.items()] + return True, "\n".join(lines) + return False, "No device info available" + except Exception as e: + return False, str(e) + + +def check_connection() -> bool: + """Check if device is connected.""" + try: + dm = _get_dm() + return dm.is_connected + except Exception: + return False + + +# ============================================================================= +# Channels +# ============================================================================= + +def get_channels() -> Tuple[bool, List[Dict]]: + """Get list of configured channels.""" + try: + dm = _get_dm() + channels = [] + for idx in range(8): + info = dm.get_channel_info(idx) + if info and info.get('name'): + channels.append({ + 'index': idx, + 'name': info.get('name', ''), + 'key': info.get('secret', info.get('key', '')), + }) + return True, channels + except Exception as e: + logger.error(f"get_channels error: {e}") + return False, [] + + +def add_channel(name: str) -> Tuple[bool, str, Optional[str]]: + """Add a new channel.""" + try: + dm = _get_dm() + # Find first free slot (1-7, slot 0 is Public) + for idx in range(1, 8): + info = dm.get_channel_info(idx) + if not info or not info.get('name'): + result = dm.set_channel(idx, name) + if result['success']: + return True, f"Channel '{name}' created at slot {idx}", None + return False, result.get('error', 'Failed'), None + return False, "No free channel slots available", None + except Exception as e: + return False, str(e), None + + +def set_channel(index: int, name: str, key: Optional[str] = None) -> Tuple[bool, str]: + """Set/join a channel.""" + try: + dm = _get_dm() + secret = bytes.fromhex(key) if key else None + result = dm.set_channel(index, name, secret) + return result['success'], result.get('message', result.get('error', '')) + except Exception as e: + return False, str(e) + + +def remove_channel(index: int) -> Tuple[bool, str]: + """Remove a channel.""" + if index == 0: + return False, "Cannot remove Public channel (channel 0)" + + try: + dm = _get_dm() + result = dm.remove_channel(index) + return result['success'], result.get('message', result.get('error', '')) + except Exception as e: + return False, str(e) + + +# ============================================================================= +# Advertisement +# ============================================================================= + +def advert() -> Tuple[bool, str]: + """Send a single advertisement.""" + try: + dm = _get_dm() + result = dm.send_advert(flood=False) + return result['success'], result.get('message', result.get('error', '')) + except Exception as e: + return False, str(e) + + +def floodadv() -> Tuple[bool, str]: + """Send flood advertisement.""" + try: + dm = _get_dm() + result = dm.send_advert(flood=True) + return result['success'], result.get('message', result.get('error', '')) + except Exception as e: + return False, str(e) + + +# ============================================================================= +# Direct Messages +# ============================================================================= + +def send_dm(recipient: str, text: str) -> Tuple[bool, str]: + """Send a direct message.""" + if not recipient or not recipient.strip(): + return False, "Recipient is required" + if not text or not text.strip(): + return False, "Message text is required" + + try: + dm = _get_dm() + # Try to find contact by name first + contact = None + if dm.mc: + contact = dm.mc.get_contact_by_name(recipient.strip()) + + if contact: + pubkey = contact.get('public_key', recipient) + else: + pubkey = recipient.strip() + + result = dm.send_dm(pubkey, text.strip()) + return result['success'], result.get('message', result.get('error', '')) + except Exception as e: + return False, str(e) + + +def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]: + """Check delivery status for DMs by ACK codes.""" + try: + dm = _get_dm() + ack_status = {} + for code in ack_codes: + ack = dm.db.get_ack_for_dm(code) + ack_status[code] = ack + return True, ack_status, "" + except Exception as e: + return False, {}, str(e) + + +def get_retry_ack_codes() -> set: + """Get retry ACK codes. Simplified in v2.""" + return set() + + +def get_auto_retry_config() -> Tuple[bool, Dict]: + """Get auto-retry config. Using meshcore library's built-in retry.""" + return True, { + 'enabled': True, + 'max_attempts': 3, + 'max_flood': 2, + 'note': 'v2 uses meshcore library built-in retry (send_msg_with_retry)' + } + + +def set_auto_retry_config(enabled=None, max_attempts=None, max_flood=None) -> Tuple[bool, Dict]: + """Set auto-retry config. Stub in v2.""" + return get_auto_retry_config() + + +# ============================================================================= +# Device Settings # ============================================================================= def get_device_settings() -> Tuple[bool, Dict]: - """ - Get persistent device settings from .webui_settings.json. - - Returns: - Tuple of (success, settings_dict) - Settings dict currently contains: - { - 'manual_add_contacts': bool - } - """ + """Get persistent device settings.""" settings_path = Path(config.MC_CONFIG_DIR) / ".webui_settings.json" - try: if not settings_path.exists(): - # Return defaults if file doesn't exist return True, {'manual_add_contacts': False} - with open(settings_path, 'r', encoding='utf-8') as f: settings = json.load(f) - # Ensure manual_add_contacts exists if 'manual_add_contacts' not in settings: settings['manual_add_contacts'] = False return True, settings - except Exception as e: logger.error(f"Failed to read device settings: {e}") return False, {'manual_add_contacts': False} def set_manual_add_contacts(enabled: bool) -> Tuple[bool, str]: - """ - Enable or disable manual contact approval mode. - - This setting is: - 1. Saved to .webui_settings.json for persistence across container restarts - 2. Applied immediately to the running meshcli session via bridge - - Args: - enabled: True to enable manual approval, False for automatic - - Returns: - Tuple of (success, message) - """ + """Enable/disable manual contact approval.""" try: - response = requests.post( - f"{config.MC_BRIDGE_URL.replace('/cli', '/set_manual_add_contacts')}", - json={'enabled': enabled}, - timeout=DEFAULT_TIMEOUT + 5 - ) - - if response.status_code != 200: - return False, f'Bridge HTTP error: {response.status_code}' - - data = response.json() - - if not data.get('success', False): - error = data.get('error', 'Failed to set manual_add_contacts') - return False, error - - message = data.get('message', f"manual_add_contacts set to {'on' if enabled else 'off'}") - return True, message - - except requests.exceptions.Timeout: - return False, 'Bridge timeout' - except requests.exceptions.ConnectionError: - return False, 'Cannot connect to meshcore-bridge service' + dm_inst = _get_dm() + result = dm_inst.set_manual_add_contacts(enabled) + if result['success']: + # Persist to settings file + settings_path = Path(config.MC_CONFIG_DIR) / ".webui_settings.json" + try: + settings = {} + if settings_path.exists(): + with open(settings_path, 'r') as f: + settings = json.load(f) + settings['manual_add_contacts'] = enabled + settings_path.parent.mkdir(parents=True, exist_ok=True) + with open(settings_path, 'w') as f: + json.dump(settings, f) + except Exception as e: + logger.warning(f"Failed to persist settings: {e}") + return result['success'], result.get('message', result.get('error', '')) except Exception as e: return False, str(e) -# ============================================================================= -# Device Name Detection -# ============================================================================= - def fetch_device_name_from_bridge(max_retries: int = 3, retry_delay: float = 2.0) -> Tuple[Optional[str], str]: """ - Fetch detected device name from meshcore-bridge /health endpoint. - - The bridge auto-detects device name from meshcli prompt ("DeviceName|*") - and exposes it via /health endpoint. - - Args: - max_retries: Number of retry attempts if bridge is unavailable - retry_delay: Delay between retries in seconds - - Returns: - Tuple of (device_name, source) - - device_name: Detected name or fallback from config - - source: "detected", "config", or "fallback" + v2: Get device name from DeviceManager instead of bridge. + Kept for backward compatibility with any code that still calls this. """ - bridge_health_url = config.MC_BRIDGE_URL.replace('/cli', '/health') - - for attempt in range(max_retries): - try: - response = requests.get(bridge_health_url, timeout=5) - if response.status_code == 200: - data = response.json() - if data.get('status') == 'healthy' or data.get('device_name_source') == 'detected': - device_name = data.get('device_name') - source = data.get('device_name_source', 'unknown') - if device_name: - logger.info(f"Got device name from bridge: {device_name} (source: {source})") - return device_name, source - except requests.exceptions.ConnectionError: - logger.warning(f"Bridge not reachable, attempt {attempt + 1}/{max_retries}") - except requests.exceptions.Timeout: - logger.warning(f"Bridge timeout, attempt {attempt + 1}/{max_retries}") - except Exception as e: - logger.warning(f"Attempt {attempt + 1}/{max_retries} failed: {e}") - - if attempt < max_retries - 1: - time.sleep(retry_delay) - - logger.warning(f"Using fallback device name: {config.MC_DEVICE_NAME}") + try: + dm = _get_dm() + if dm.is_connected: + return dm.device_name, "device" + except Exception: + pass return config.MC_DEVICE_NAME, "fallback"