""" MeshCore CLI wrapper — v2: delegates to DeviceManager (no bridge). Function signatures preserved for backward compatibility with api.py. """ import logging from typing import Tuple, Optional, List, Dict from app.config import config logger = logging.getLogger(__name__) class MeshCLIError(Exception): """Custom exception for meshcli command failures""" pass def _get_dm(): """Get the DeviceManager instance — try Flask app context first, then module global.""" try: from flask import current_app dm = getattr(current_app, 'device_manager', None) if dm is not None: return dm except RuntimeError: pass # Outside of Flask request context from app.main import device_manager if device_manager is None: raise MeshCLIError("DeviceManager not initialized") return device_manager def _get_db(): """Get Database instance — try Flask app context first, then DeviceManager.""" try: from flask import current_app db = getattr(current_app, 'db', None) if db is not None: return db except RuntimeError: pass try: dm = _get_dm() return dm.db except Exception: return None # ============================================================================= # Messages # ============================================================================= def recv_messages() -> Tuple[bool, str]: """ In v2, messages arrive via events (auto-fetching). This is a no-op — kept for backward compatibility. """ return True, "Messages are received automatically via events" def send_message(text: str, reply_to: Optional[str] = None, channel_index: int = 0) -> Tuple[bool, str]: """Send a message to a channel.""" if reply_to: text = f"@[{reply_to}] {text}" try: dm = _get_dm() result = dm.send_channel_message(channel_index, text) if result['success']: return True, result.get('message', 'Message sent') return False, result.get('error', 'Failed to send message') except Exception as e: logger.error(f"send_message error: {e}") return False, str(e) # ============================================================================= # Contacts # ============================================================================= def get_contacts() -> Tuple[bool, str]: """Get contacts list as formatted text.""" try: dm = _get_dm() contacts = dm.get_contacts_from_device() if not contacts: return True, "No contacts" lines = [] for c in contacts: name = c.get('name', '?') pk = c.get('public_key', '')[:12] lines.append(f"{name} {pk}") return True, "\n".join(lines) except Exception as e: logger.error(f"get_contacts error: {e}") return False, str(e) def parse_contacts(output: str, filter_types: Optional[List[str]] = None) -> List[str]: """Parse contacts output to extract names. In v2, reads from DB.""" try: dm = _get_dm() contacts = dm.db.get_contacts() return [c['name'] for c in contacts if c.get('name')] except Exception: return [] def get_contacts_list() -> Tuple[bool, List[str], str]: """Get parsed list of contact names.""" try: dm = _get_dm() contacts = dm.db.get_contacts() names = [c['name'] for c in contacts if c.get('name')] return True, names, "" except Exception as e: return False, [], str(e) def get_all_contacts_detailed() -> Tuple[bool, List[Dict], int, str]: """Get detailed list of all contacts from DB.""" try: dm = _get_dm() contacts = dm.db.get_contacts() result = [] for c in contacts: pk = c.get('public_key', '') result.append({ 'name': c.get('name', ''), 'public_key_prefix': pk[:12] if len(pk) >= 12 else pk, 'type_label': {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}.get(c.get('type', 1), 'UNKNOWN'), 'path_or_mode': c.get('out_path', '') or 'Flood', 'raw_line': '', }) return True, result, len(result), "" except Exception as e: return False, [], 0, str(e) def _parse_last_advert(value) -> int: """Convert last_advert from DB (Unix timestamp string or ISO string) to Unix int.""" if not value: return 0 # Try as Unix timestamp first try: ts = int(value) if ts > 0: return ts except (ValueError, TypeError): pass # Try as ISO datetime string try: from datetime import datetime dt = datetime.fromisoformat(str(value)) return int(dt.timestamp()) except (ValueError, TypeError): pass return 0 def get_contacts_with_last_seen() -> Tuple[bool, Dict[str, Dict], str]: """Get contacts actually on the device firmware (from mc.contacts).""" try: dm = _get_dm() if not dm.mc or not dm.mc.contacts: return True, {}, "" contacts_dict = {} for pk, contact in dm.mc.contacts.items(): last_adv = contact.get('last_advert') contacts_dict[pk] = { 'public_key': pk, 'type': contact.get('type', 1), 'flags': contact.get('flags', 0), 'out_path_len': contact.get('out_path_len', -1), 'out_path': contact.get('out_path', ''), 'adv_name': contact.get('adv_name', contact.get('name', '')), 'last_advert': int(last_adv) if last_adv and isinstance(last_adv, (int, float)) and last_adv > 0 else 0, 'adv_lat': contact.get('adv_lat', 0.0), 'adv_lon': contact.get('adv_lon', 0.0), 'lastmod': '', } return True, contacts_dict, "" except Exception as e: return False, {}, str(e) def get_contacts_json() -> Tuple[bool, Dict[str, Dict], str]: """Get all contacts as JSON dict (keyed by public_key).""" try: dm = _get_dm() contacts = dm.db.get_contacts() contacts_dict = {} for c in contacts: pk = c.get('public_key', '') contacts_dict[pk] = { 'public_key': pk, 'type': c.get('type', 0), 'adv_name': c.get('name', ''), 'flags': c.get('flags', 0), 'out_path_len': c.get('out_path_len', -1), 'out_path': c.get('out_path', ''), 'last_advert': _parse_last_advert(c.get('last_advert')), 'adv_lat': c.get('adv_lat'), 'adv_lon': c.get('adv_lon'), 'lastmod': c.get('lastmod', ''), } return True, contacts_dict, "" except Exception as e: return False, {}, str(e) def delete_contact(selector: str) -> Tuple[bool, str]: """Delete a contact by public key or name.""" if not selector or not selector.strip(): return False, "Contact selector is required" selector = selector.strip() try: dm = _get_dm() # Try as public key first contact = dm.db.get_contact(selector) if contact: result = dm.delete_contact(selector) return result['success'], result.get('message', result.get('error', '')) # Try to find by name contacts = dm.db.get_contacts() for c in contacts: if c.get('name', '').strip() == selector or c.get('public_key', '').startswith(selector): result = dm.delete_contact(c['public_key']) return result['success'], result.get('message', result.get('error', '')) return False, f"Contact not found: {selector}" except Exception as e: return False, str(e) def delete_cached_contact(public_key: str) -> Tuple[bool, str]: """Hard-delete a cache-only contact from the database.""" if not public_key or not public_key.strip(): return False, "Public key is required" try: dm = _get_dm() result = dm.delete_cached_contact(public_key.strip()) return result['success'], result.get('message', result.get('error', '')) except Exception as e: return False, str(e) def clean_inactive_contacts(hours: int = 48) -> Tuple[bool, str]: """Remove contacts inactive for specified hours. Simplified in v2.""" # TODO: implement time-based cleanup via database query return False, "Contact cleanup not yet implemented in v2" def get_pending_contacts() -> Tuple[bool, List[Dict], str]: """Get list of contacts awaiting manual approval.""" try: dm = _get_dm() pending = dm.get_pending_contacts() return True, pending, "" except Exception as e: return False, [], str(e) def approve_pending_contact(public_key: str) -> Tuple[bool, str]: """Approve a pending contact.""" try: dm = _get_dm() result = dm.approve_contact(public_key) return result['success'], result.get('message', result.get('error', '')) except Exception as e: return False, str(e) # ============================================================================= # Device Info # ============================================================================= def get_device_info() -> Tuple[bool, str]: """Get device information.""" try: dm = _get_dm() info = dm.get_device_info() if info: lines = [f"{k}: {v}" for k, v in info.items()] return True, "\n".join(lines) return False, "No device info available" except Exception as e: return False, str(e) def check_connection() -> bool: """Check if device is connected.""" try: dm = _get_dm() return dm.is_connected except Exception: return False # ============================================================================= # Channels # ============================================================================= def get_channels() -> Tuple[bool, List[Dict]]: """Get list of configured channels.""" try: dm = _get_dm() channels = [] for idx in range(dm._max_channels): info = dm.get_channel_info(idx) if info and info.get('name'): channels.append({ 'index': idx, 'name': info.get('name', ''), 'key': info.get('secret', info.get('key', '')), }) return True, channels except Exception as e: logger.error(f"get_channels error: {e}") return False, [] def add_channel(name: str) -> Tuple[bool, str, Optional[str]]: """Add a new channel.""" try: dm = _get_dm() # Find first free slot (1+, slot 0 is Public) for idx in range(1, dm._max_channels): info = dm.get_channel_info(idx) if not info or not info.get('name'): result = dm.set_channel(idx, name) if result['success']: return True, f"Channel '{name}' created at slot {idx}", None return False, result.get('error', 'Failed'), None return False, "No free channel slots available", None except Exception as e: return False, str(e), None def set_channel(index: int, name: str, key: Optional[str] = None) -> Tuple[bool, str]: """Set/join a channel.""" try: dm = _get_dm() secret = bytes.fromhex(key) if key else None result = dm.set_channel(index, name, secret) return result['success'], result.get('message', result.get('error', '')) except Exception as e: return False, str(e) def remove_channel(index: int) -> Tuple[bool, str]: """Remove a channel.""" if index == 0: return False, "Cannot remove Public channel (channel 0)" try: dm = _get_dm() result = dm.remove_channel(index) return result['success'], result.get('message', result.get('error', '')) except Exception as e: return False, str(e) # ============================================================================= # Advertisement # ============================================================================= def advert() -> Tuple[bool, str]: """Send a single advertisement.""" try: dm = _get_dm() result = dm.send_advert(flood=False) return result['success'], result.get('message', result.get('error', '')) except Exception as e: return False, str(e) def floodadv() -> Tuple[bool, str]: """Send flood advertisement.""" try: dm = _get_dm() result = dm.send_advert(flood=True) return result['success'], result.get('message', result.get('error', '')) except Exception as e: return False, str(e) # ============================================================================= # Direct Messages # ============================================================================= def send_dm(recipient: str, text: str) -> Tuple[bool, Dict]: """Send a direct message. Returns (success, result_dict).""" if not recipient or not recipient.strip(): return False, {'error': "Recipient is required"} if not text or not text.strip(): return False, {'error': "Message text is required"} try: dm = _get_dm() recipient = recipient.strip() # Try to find contact by name in mc.contacts (in-memory) contact = None if dm.mc: contact = dm.mc.get_contact_by_name(recipient) if contact: pubkey = contact.get('public_key', recipient) elif len(recipient) >= 12 and all(c in '0123456789abcdef' for c in recipient.lower()): # Looks like a pubkey/prefix already pubkey = recipient else: # Name not in mc.contacts — try DB lookup db_contact = dm.db.get_contact_by_name(recipient) if db_contact: pubkey = db_contact['public_key'] else: pubkey = recipient result = dm.send_dm(pubkey, text.strip()) return result['success'], result except Exception as e: return False, {'error': str(e)} def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]: """Check delivery status for DMs by ACK codes.""" try: dm = _get_dm() ack_status = {} for code in ack_codes: ack = dm.db.get_ack_for_dm(code) ack_status[code] = ack return True, ack_status, "" except Exception as e: return False, {}, str(e) # ============================================================================= # Device Settings # ============================================================================= def get_device_settings() -> Tuple[bool, Dict]: """Get persistent device settings from database.""" try: db = _get_db() manual = db.get_setting_json('manual_add_contacts', False) if db else False return True, {'manual_add_contacts': manual} except Exception as e: logger.error(f"Failed to read device settings: {e}") return False, {'manual_add_contacts': False} def set_manual_add_contacts(enabled: bool) -> Tuple[bool, str]: """Enable/disable manual contact approval.""" try: dm_inst = _get_dm() result = dm_inst.set_manual_add_contacts(enabled) if result['success']: # Persist to database db = _get_db() if db: db.set_setting_json('manual_add_contacts', enabled) return result['success'], result.get('message', result.get('error', '')) except Exception as e: return False, str(e) def fetch_device_name_from_bridge(max_retries: int = 3, retry_delay: float = 2.0) -> Tuple[Optional[str], str]: """ v2: Get device name from DeviceManager instead of bridge. Kept for backward compatibility with any code that still calls this. """ try: dm = _get_dm() if dm.is_connected: return dm.device_name, "device" except Exception: pass return config.MC_DEVICE_NAME, "fallback"