mirror of
https://github.com/MarekWo/mc-webui.git
synced 2026-03-28 17:42:45 +01:00
The MeshCore community uses "companion" not "client" for type 1 nodes. Rename the CLI label to COM across all UI, API, JS, and docs to align with official terminology. Includes cache migration for old CLI entries. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
488 lines
16 KiB
Python
488 lines
16 KiB
Python
"""
|
|
MeshCore CLI wrapper — v2: delegates to DeviceManager (no bridge).
|
|
|
|
Function signatures preserved for backward compatibility with api.py.
|
|
"""
|
|
|
|
import logging
|
|
from typing import Tuple, Optional, List, Dict
|
|
from app.config import config
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MeshCLIError(Exception):
|
|
"""Custom exception for meshcli command failures"""
|
|
pass
|
|
|
|
|
|
def _get_dm():
|
|
"""Get the DeviceManager instance — try Flask app context first, then module global."""
|
|
try:
|
|
from flask import current_app
|
|
dm = getattr(current_app, 'device_manager', None)
|
|
if dm is not None:
|
|
return dm
|
|
except RuntimeError:
|
|
pass # Outside of Flask request context
|
|
|
|
from app.main import device_manager
|
|
if device_manager is None:
|
|
raise MeshCLIError("DeviceManager not initialized")
|
|
return device_manager
|
|
|
|
|
|
def _get_db():
|
|
"""Get Database instance — try Flask app context first, then DeviceManager."""
|
|
try:
|
|
from flask import current_app
|
|
db = getattr(current_app, 'db', None)
|
|
if db is not None:
|
|
return db
|
|
except RuntimeError:
|
|
pass
|
|
try:
|
|
dm = _get_dm()
|
|
return dm.db
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
# =============================================================================
|
|
# Messages
|
|
# =============================================================================
|
|
|
|
def recv_messages() -> Tuple[bool, str]:
|
|
"""
|
|
In v2, messages arrive via events (auto-fetching).
|
|
This is a no-op — kept for backward compatibility.
|
|
"""
|
|
return True, "Messages are received automatically via events"
|
|
|
|
|
|
def send_message(text: str, reply_to: Optional[str] = None, channel_index: int = 0) -> Tuple[bool, str]:
|
|
"""Send a message to a channel."""
|
|
if reply_to:
|
|
text = f"@[{reply_to}] {text}"
|
|
|
|
try:
|
|
dm = _get_dm()
|
|
result = dm.send_channel_message(channel_index, text)
|
|
if result['success']:
|
|
return True, result.get('message', 'Message sent')
|
|
return False, result.get('error', 'Failed to send message')
|
|
except Exception as e:
|
|
logger.error(f"send_message error: {e}")
|
|
return False, str(e)
|
|
|
|
|
|
# =============================================================================
|
|
# Contacts
|
|
# =============================================================================
|
|
|
|
def get_contacts() -> Tuple[bool, str]:
|
|
"""Get contacts list as formatted text."""
|
|
try:
|
|
dm = _get_dm()
|
|
contacts = dm.get_contacts_from_device()
|
|
if not contacts:
|
|
return True, "No contacts"
|
|
lines = []
|
|
for c in contacts:
|
|
name = c.get('name', '?')
|
|
pk = c.get('public_key', '')[:12]
|
|
lines.append(f"{name} {pk}")
|
|
return True, "\n".join(lines)
|
|
except Exception as e:
|
|
logger.error(f"get_contacts error: {e}")
|
|
return False, str(e)
|
|
|
|
|
|
def parse_contacts(output: str, filter_types: Optional[List[str]] = None) -> List[str]:
|
|
"""Parse contacts output to extract names. In v2, reads from DB."""
|
|
try:
|
|
dm = _get_dm()
|
|
contacts = dm.db.get_contacts()
|
|
return [c['name'] for c in contacts if c.get('name')]
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def get_contacts_list() -> Tuple[bool, List[str], str]:
|
|
"""Get parsed list of contact names."""
|
|
try:
|
|
dm = _get_dm()
|
|
contacts = dm.db.get_contacts()
|
|
names = [c['name'] for c in contacts if c.get('name')]
|
|
return True, names, ""
|
|
except Exception as e:
|
|
return False, [], str(e)
|
|
|
|
|
|
def get_all_contacts_detailed() -> Tuple[bool, List[Dict], int, str]:
|
|
"""Get detailed list of all contacts from DB."""
|
|
try:
|
|
dm = _get_dm()
|
|
contacts = dm.db.get_contacts()
|
|
result = []
|
|
for c in contacts:
|
|
pk = c.get('public_key', '')
|
|
result.append({
|
|
'name': c.get('name', ''),
|
|
'public_key_prefix': pk[:12] if len(pk) >= 12 else pk,
|
|
'type_label': {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}.get(c.get('type', 1), 'UNKNOWN'),
|
|
'path_or_mode': c.get('out_path', '') or 'Flood',
|
|
'raw_line': '',
|
|
})
|
|
return True, result, len(result), ""
|
|
except Exception as e:
|
|
return False, [], 0, str(e)
|
|
|
|
|
|
def _parse_last_advert(value) -> int:
|
|
"""Convert last_advert from DB (Unix timestamp string or ISO string) to Unix int."""
|
|
if not value:
|
|
return 0
|
|
# Try as Unix timestamp first
|
|
try:
|
|
ts = int(value)
|
|
if ts > 0:
|
|
return ts
|
|
except (ValueError, TypeError):
|
|
pass
|
|
# Try as ISO datetime string
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(str(value))
|
|
return int(dt.timestamp())
|
|
except (ValueError, TypeError):
|
|
pass
|
|
return 0
|
|
|
|
|
|
def get_contacts_with_last_seen() -> Tuple[bool, Dict[str, Dict], str]:
|
|
"""Get contacts actually on the device firmware (from mc.contacts)."""
|
|
try:
|
|
dm = _get_dm()
|
|
if not dm.mc or not dm.mc.contacts:
|
|
return True, {}, ""
|
|
contacts_dict = {}
|
|
for pk, contact in dm.mc.contacts.items():
|
|
last_adv = contact.get('last_advert')
|
|
contacts_dict[pk] = {
|
|
'public_key': pk,
|
|
'type': contact.get('type', 1),
|
|
'flags': contact.get('flags', 0),
|
|
'out_path_len': contact.get('out_path_len', -1),
|
|
'out_path': contact.get('out_path', ''),
|
|
'adv_name': contact.get('adv_name', contact.get('name', '')),
|
|
'last_advert': int(last_adv) if last_adv and isinstance(last_adv, (int, float)) and last_adv > 0 else 0,
|
|
'adv_lat': contact.get('adv_lat', 0.0),
|
|
'adv_lon': contact.get('adv_lon', 0.0),
|
|
'lastmod': '',
|
|
}
|
|
return True, contacts_dict, ""
|
|
except Exception as e:
|
|
return False, {}, str(e)
|
|
|
|
|
|
def get_contacts_json() -> Tuple[bool, Dict[str, Dict], str]:
|
|
"""Get all contacts as JSON dict (keyed by public_key)."""
|
|
try:
|
|
dm = _get_dm()
|
|
contacts = dm.db.get_contacts()
|
|
contacts_dict = {}
|
|
for c in contacts:
|
|
pk = c.get('public_key', '')
|
|
contacts_dict[pk] = {
|
|
'public_key': pk,
|
|
'type': c.get('type', 0),
|
|
'adv_name': c.get('name', ''),
|
|
'flags': c.get('flags', 0),
|
|
'out_path_len': c.get('out_path_len', -1),
|
|
'out_path': c.get('out_path', ''),
|
|
'last_advert': _parse_last_advert(c.get('last_advert')),
|
|
'adv_lat': c.get('adv_lat'),
|
|
'adv_lon': c.get('adv_lon'),
|
|
'lastmod': c.get('lastmod', ''),
|
|
}
|
|
return True, contacts_dict, ""
|
|
except Exception as e:
|
|
return False, {}, str(e)
|
|
|
|
|
|
def delete_contact(selector: str) -> Tuple[bool, str]:
|
|
"""Delete a contact by public key or name."""
|
|
if not selector or not selector.strip():
|
|
return False, "Contact selector is required"
|
|
|
|
selector = selector.strip()
|
|
|
|
try:
|
|
dm = _get_dm()
|
|
# Try as public key first
|
|
contact = dm.db.get_contact(selector)
|
|
if contact:
|
|
result = dm.delete_contact(selector)
|
|
return result['success'], result.get('message', result.get('error', ''))
|
|
|
|
# Try to find by name
|
|
contacts = dm.db.get_contacts()
|
|
for c in contacts:
|
|
if c.get('name', '').strip() == selector or c.get('public_key', '').startswith(selector):
|
|
result = dm.delete_contact(c['public_key'])
|
|
return result['success'], result.get('message', result.get('error', ''))
|
|
|
|
return False, f"Contact not found: {selector}"
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
def delete_cached_contact(public_key: str) -> Tuple[bool, str]:
|
|
"""Hard-delete a cache-only contact from the database."""
|
|
if not public_key or not public_key.strip():
|
|
return False, "Public key is required"
|
|
|
|
try:
|
|
dm = _get_dm()
|
|
result = dm.delete_cached_contact(public_key.strip())
|
|
return result['success'], result.get('message', result.get('error', ''))
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
def clean_inactive_contacts(hours: int = 48) -> Tuple[bool, str]:
|
|
"""Remove contacts inactive for specified hours. Simplified in v2."""
|
|
# TODO: implement time-based cleanup via database query
|
|
return False, "Contact cleanup not yet implemented in v2"
|
|
|
|
|
|
def get_pending_contacts() -> Tuple[bool, List[Dict], str]:
|
|
"""Get list of contacts awaiting manual approval."""
|
|
try:
|
|
dm = _get_dm()
|
|
pending = dm.get_pending_contacts()
|
|
return True, pending, ""
|
|
except Exception as e:
|
|
return False, [], str(e)
|
|
|
|
|
|
def approve_pending_contact(public_key: str) -> Tuple[bool, str]:
|
|
"""Approve a pending contact."""
|
|
try:
|
|
dm = _get_dm()
|
|
result = dm.approve_contact(public_key)
|
|
return result['success'], result.get('message', result.get('error', ''))
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
# =============================================================================
|
|
# Device Info
|
|
# =============================================================================
|
|
|
|
def get_device_info() -> Tuple[bool, str]:
|
|
"""Get device information."""
|
|
try:
|
|
dm = _get_dm()
|
|
info = dm.get_device_info()
|
|
if info:
|
|
lines = [f"{k}: {v}" for k, v in info.items()]
|
|
return True, "\n".join(lines)
|
|
return False, "No device info available"
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
def check_connection() -> bool:
|
|
"""Check if device is connected."""
|
|
try:
|
|
dm = _get_dm()
|
|
return dm.is_connected
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
# =============================================================================
|
|
# Channels
|
|
# =============================================================================
|
|
|
|
def get_channels() -> Tuple[bool, List[Dict]]:
|
|
"""Get list of configured channels."""
|
|
try:
|
|
dm = _get_dm()
|
|
channels = []
|
|
for idx in range(dm._max_channels):
|
|
info = dm.get_channel_info(idx)
|
|
if info and info.get('name'):
|
|
channels.append({
|
|
'index': idx,
|
|
'name': info.get('name', ''),
|
|
'key': info.get('secret', info.get('key', '')),
|
|
})
|
|
return True, channels
|
|
except Exception as e:
|
|
logger.error(f"get_channels error: {e}")
|
|
return False, []
|
|
|
|
|
|
def add_channel(name: str) -> Tuple[bool, str, Optional[str]]:
|
|
"""Add a new channel."""
|
|
try:
|
|
dm = _get_dm()
|
|
# Find first free slot (1+, slot 0 is Public)
|
|
for idx in range(1, dm._max_channels):
|
|
info = dm.get_channel_info(idx)
|
|
if not info or not info.get('name'):
|
|
result = dm.set_channel(idx, name)
|
|
if result['success']:
|
|
return True, f"Channel '{name}' created at slot {idx}", None
|
|
return False, result.get('error', 'Failed'), None
|
|
return False, "No free channel slots available", None
|
|
except Exception as e:
|
|
return False, str(e), None
|
|
|
|
|
|
def set_channel(index: int, name: str, key: Optional[str] = None) -> Tuple[bool, str]:
|
|
"""Set/join a channel."""
|
|
try:
|
|
dm = _get_dm()
|
|
secret = bytes.fromhex(key) if key else None
|
|
result = dm.set_channel(index, name, secret)
|
|
return result['success'], result.get('message', result.get('error', ''))
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
def remove_channel(index: int) -> Tuple[bool, str]:
|
|
"""Remove a channel."""
|
|
if index == 0:
|
|
return False, "Cannot remove Public channel (channel 0)"
|
|
|
|
try:
|
|
dm = _get_dm()
|
|
result = dm.remove_channel(index)
|
|
return result['success'], result.get('message', result.get('error', ''))
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
# =============================================================================
|
|
# Advertisement
|
|
# =============================================================================
|
|
|
|
def advert() -> Tuple[bool, str]:
|
|
"""Send a single advertisement."""
|
|
try:
|
|
dm = _get_dm()
|
|
result = dm.send_advert(flood=False)
|
|
return result['success'], result.get('message', result.get('error', ''))
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
def floodadv() -> Tuple[bool, str]:
|
|
"""Send flood advertisement."""
|
|
try:
|
|
dm = _get_dm()
|
|
result = dm.send_advert(flood=True)
|
|
return result['success'], result.get('message', result.get('error', ''))
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
# =============================================================================
|
|
# Direct Messages
|
|
# =============================================================================
|
|
|
|
def send_dm(recipient: str, text: str) -> Tuple[bool, Dict]:
|
|
"""Send a direct message. Returns (success, result_dict)."""
|
|
if not recipient or not recipient.strip():
|
|
return False, {'error': "Recipient is required"}
|
|
if not text or not text.strip():
|
|
return False, {'error': "Message text is required"}
|
|
|
|
try:
|
|
dm = _get_dm()
|
|
recipient = recipient.strip()
|
|
|
|
# Try to find contact by name in mc.contacts (in-memory)
|
|
contact = None
|
|
if dm.mc:
|
|
contact = dm.mc.get_contact_by_name(recipient)
|
|
|
|
if contact:
|
|
pubkey = contact.get('public_key', recipient)
|
|
elif len(recipient) >= 12 and all(c in '0123456789abcdef' for c in recipient.lower()):
|
|
# Looks like a pubkey/prefix already
|
|
pubkey = recipient
|
|
else:
|
|
# Name not in mc.contacts — try DB lookup
|
|
db_contact = dm.db.get_contact_by_name(recipient)
|
|
if db_contact:
|
|
pubkey = db_contact['public_key']
|
|
else:
|
|
pubkey = recipient
|
|
|
|
result = dm.send_dm(pubkey, text.strip())
|
|
return result['success'], result
|
|
except Exception as e:
|
|
return False, {'error': str(e)}
|
|
|
|
|
|
def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]:
|
|
"""Check delivery status for DMs by ACK codes."""
|
|
try:
|
|
dm = _get_dm()
|
|
ack_status = {}
|
|
for code in ack_codes:
|
|
ack = dm.db.get_ack_for_dm(code)
|
|
ack_status[code] = ack
|
|
return True, ack_status, ""
|
|
except Exception as e:
|
|
return False, {}, str(e)
|
|
|
|
|
|
# =============================================================================
|
|
# Device Settings
|
|
# =============================================================================
|
|
|
|
def get_device_settings() -> Tuple[bool, Dict]:
|
|
"""Get persistent device settings from database."""
|
|
try:
|
|
db = _get_db()
|
|
manual = db.get_setting_json('manual_add_contacts', False) if db else False
|
|
return True, {'manual_add_contacts': manual}
|
|
except Exception as e:
|
|
logger.error(f"Failed to read device settings: {e}")
|
|
return False, {'manual_add_contacts': False}
|
|
|
|
|
|
def set_manual_add_contacts(enabled: bool) -> Tuple[bool, str]:
|
|
"""Enable/disable manual contact approval."""
|
|
try:
|
|
dm_inst = _get_dm()
|
|
result = dm_inst.set_manual_add_contacts(enabled)
|
|
if result['success']:
|
|
# Persist to database
|
|
db = _get_db()
|
|
if db:
|
|
db.set_setting_json('manual_add_contacts', enabled)
|
|
return result['success'], result.get('message', result.get('error', ''))
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
def fetch_device_name_from_bridge(max_retries: int = 3, retry_delay: float = 2.0) -> Tuple[Optional[str], str]:
|
|
"""
|
|
v2: Get device name from DeviceManager instead of bridge.
|
|
Kept for backward compatibility with any code that still calls this.
|
|
"""
|
|
try:
|
|
dm = _get_dm()
|
|
if dm.is_connected:
|
|
return dm.device_name, "device"
|
|
except Exception:
|
|
pass
|
|
return config.MC_DEVICE_NAME, "fallback"
|