Files
mc-webui/app/meshcore/cli.py
MarekWo 33a71bed17 refactor(ui): rename contact type label CLI to COM (companion)
The MeshCore community uses "companion" not "client" for type 1 nodes.
Rename the CLI label to COM across all UI, API, JS, and docs to align
with official terminology. Includes cache migration for old CLI entries.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 14:37:30 +01:00

488 lines
16 KiB
Python

"""
MeshCore CLI wrapper — v2: delegates to DeviceManager (no bridge).
Function signatures preserved for backward compatibility with api.py.
"""
import logging
from typing import Tuple, Optional, List, Dict
from app.config import config
logger = logging.getLogger(__name__)
class MeshCLIError(Exception):
"""Custom exception for meshcli command failures"""
pass
def _get_dm():
"""Get the DeviceManager instance — try Flask app context first, then module global."""
try:
from flask import current_app
dm = getattr(current_app, 'device_manager', None)
if dm is not None:
return dm
except RuntimeError:
pass # Outside of Flask request context
from app.main import device_manager
if device_manager is None:
raise MeshCLIError("DeviceManager not initialized")
return device_manager
def _get_db():
"""Get Database instance — try Flask app context first, then DeviceManager."""
try:
from flask import current_app
db = getattr(current_app, 'db', None)
if db is not None:
return db
except RuntimeError:
pass
try:
dm = _get_dm()
return dm.db
except Exception:
return None
# =============================================================================
# Messages
# =============================================================================
def recv_messages() -> Tuple[bool, str]:
"""
In v2, messages arrive via events (auto-fetching).
This is a no-op — kept for backward compatibility.
"""
return True, "Messages are received automatically via events"
def send_message(text: str, reply_to: Optional[str] = None, channel_index: int = 0) -> Tuple[bool, str]:
"""Send a message to a channel."""
if reply_to:
text = f"@[{reply_to}] {text}"
try:
dm = _get_dm()
result = dm.send_channel_message(channel_index, text)
if result['success']:
return True, result.get('message', 'Message sent')
return False, result.get('error', 'Failed to send message')
except Exception as e:
logger.error(f"send_message error: {e}")
return False, str(e)
# =============================================================================
# Contacts
# =============================================================================
def get_contacts() -> Tuple[bool, str]:
"""Get contacts list as formatted text."""
try:
dm = _get_dm()
contacts = dm.get_contacts_from_device()
if not contacts:
return True, "No contacts"
lines = []
for c in contacts:
name = c.get('name', '?')
pk = c.get('public_key', '')[:12]
lines.append(f"{name} {pk}")
return True, "\n".join(lines)
except Exception as e:
logger.error(f"get_contacts error: {e}")
return False, str(e)
def parse_contacts(output: str, filter_types: Optional[List[str]] = None) -> List[str]:
"""Parse contacts output to extract names. In v2, reads from DB."""
try:
dm = _get_dm()
contacts = dm.db.get_contacts()
return [c['name'] for c in contacts if c.get('name')]
except Exception:
return []
def get_contacts_list() -> Tuple[bool, List[str], str]:
"""Get parsed list of contact names."""
try:
dm = _get_dm()
contacts = dm.db.get_contacts()
names = [c['name'] for c in contacts if c.get('name')]
return True, names, ""
except Exception as e:
return False, [], str(e)
def get_all_contacts_detailed() -> Tuple[bool, List[Dict], int, str]:
"""Get detailed list of all contacts from DB."""
try:
dm = _get_dm()
contacts = dm.db.get_contacts()
result = []
for c in contacts:
pk = c.get('public_key', '')
result.append({
'name': c.get('name', ''),
'public_key_prefix': pk[:12] if len(pk) >= 12 else pk,
'type_label': {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}.get(c.get('type', 1), 'UNKNOWN'),
'path_or_mode': c.get('out_path', '') or 'Flood',
'raw_line': '',
})
return True, result, len(result), ""
except Exception as e:
return False, [], 0, str(e)
def _parse_last_advert(value) -> int:
"""Convert last_advert from DB (Unix timestamp string or ISO string) to Unix int."""
if not value:
return 0
# Try as Unix timestamp first
try:
ts = int(value)
if ts > 0:
return ts
except (ValueError, TypeError):
pass
# Try as ISO datetime string
try:
from datetime import datetime
dt = datetime.fromisoformat(str(value))
return int(dt.timestamp())
except (ValueError, TypeError):
pass
return 0
def get_contacts_with_last_seen() -> Tuple[bool, Dict[str, Dict], str]:
"""Get contacts actually on the device firmware (from mc.contacts)."""
try:
dm = _get_dm()
if not dm.mc or not dm.mc.contacts:
return True, {}, ""
contacts_dict = {}
for pk, contact in dm.mc.contacts.items():
last_adv = contact.get('last_advert')
contacts_dict[pk] = {
'public_key': pk,
'type': contact.get('type', 1),
'flags': contact.get('flags', 0),
'out_path_len': contact.get('out_path_len', -1),
'out_path': contact.get('out_path', ''),
'adv_name': contact.get('adv_name', contact.get('name', '')),
'last_advert': int(last_adv) if last_adv and isinstance(last_adv, (int, float)) and last_adv > 0 else 0,
'adv_lat': contact.get('adv_lat', 0.0),
'adv_lon': contact.get('adv_lon', 0.0),
'lastmod': '',
}
return True, contacts_dict, ""
except Exception as e:
return False, {}, str(e)
def get_contacts_json() -> Tuple[bool, Dict[str, Dict], str]:
"""Get all contacts as JSON dict (keyed by public_key)."""
try:
dm = _get_dm()
contacts = dm.db.get_contacts()
contacts_dict = {}
for c in contacts:
pk = c.get('public_key', '')
contacts_dict[pk] = {
'public_key': pk,
'type': c.get('type', 0),
'adv_name': c.get('name', ''),
'flags': c.get('flags', 0),
'out_path_len': c.get('out_path_len', -1),
'out_path': c.get('out_path', ''),
'last_advert': _parse_last_advert(c.get('last_advert')),
'adv_lat': c.get('adv_lat'),
'adv_lon': c.get('adv_lon'),
'lastmod': c.get('lastmod', ''),
}
return True, contacts_dict, ""
except Exception as e:
return False, {}, str(e)
def delete_contact(selector: str) -> Tuple[bool, str]:
"""Delete a contact by public key or name."""
if not selector or not selector.strip():
return False, "Contact selector is required"
selector = selector.strip()
try:
dm = _get_dm()
# Try as public key first
contact = dm.db.get_contact(selector)
if contact:
result = dm.delete_contact(selector)
return result['success'], result.get('message', result.get('error', ''))
# Try to find by name
contacts = dm.db.get_contacts()
for c in contacts:
if c.get('name', '').strip() == selector or c.get('public_key', '').startswith(selector):
result = dm.delete_contact(c['public_key'])
return result['success'], result.get('message', result.get('error', ''))
return False, f"Contact not found: {selector}"
except Exception as e:
return False, str(e)
def delete_cached_contact(public_key: str) -> Tuple[bool, str]:
"""Hard-delete a cache-only contact from the database."""
if not public_key or not public_key.strip():
return False, "Public key is required"
try:
dm = _get_dm()
result = dm.delete_cached_contact(public_key.strip())
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
def clean_inactive_contacts(hours: int = 48) -> Tuple[bool, str]:
"""Remove contacts inactive for specified hours. Simplified in v2."""
# TODO: implement time-based cleanup via database query
return False, "Contact cleanup not yet implemented in v2"
def get_pending_contacts() -> Tuple[bool, List[Dict], str]:
"""Get list of contacts awaiting manual approval."""
try:
dm = _get_dm()
pending = dm.get_pending_contacts()
return True, pending, ""
except Exception as e:
return False, [], str(e)
def approve_pending_contact(public_key: str) -> Tuple[bool, str]:
"""Approve a pending contact."""
try:
dm = _get_dm()
result = dm.approve_contact(public_key)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
# =============================================================================
# Device Info
# =============================================================================
def get_device_info() -> Tuple[bool, str]:
"""Get device information."""
try:
dm = _get_dm()
info = dm.get_device_info()
if info:
lines = [f"{k}: {v}" for k, v in info.items()]
return True, "\n".join(lines)
return False, "No device info available"
except Exception as e:
return False, str(e)
def check_connection() -> bool:
"""Check if device is connected."""
try:
dm = _get_dm()
return dm.is_connected
except Exception:
return False
# =============================================================================
# Channels
# =============================================================================
def get_channels() -> Tuple[bool, List[Dict]]:
"""Get list of configured channels."""
try:
dm = _get_dm()
channels = []
for idx in range(dm._max_channels):
info = dm.get_channel_info(idx)
if info and info.get('name'):
channels.append({
'index': idx,
'name': info.get('name', ''),
'key': info.get('secret', info.get('key', '')),
})
return True, channels
except Exception as e:
logger.error(f"get_channels error: {e}")
return False, []
def add_channel(name: str) -> Tuple[bool, str, Optional[str]]:
"""Add a new channel."""
try:
dm = _get_dm()
# Find first free slot (1+, slot 0 is Public)
for idx in range(1, dm._max_channels):
info = dm.get_channel_info(idx)
if not info or not info.get('name'):
result = dm.set_channel(idx, name)
if result['success']:
return True, f"Channel '{name}' created at slot {idx}", None
return False, result.get('error', 'Failed'), None
return False, "No free channel slots available", None
except Exception as e:
return False, str(e), None
def set_channel(index: int, name: str, key: Optional[str] = None) -> Tuple[bool, str]:
"""Set/join a channel."""
try:
dm = _get_dm()
secret = bytes.fromhex(key) if key else None
result = dm.set_channel(index, name, secret)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
def remove_channel(index: int) -> Tuple[bool, str]:
"""Remove a channel."""
if index == 0:
return False, "Cannot remove Public channel (channel 0)"
try:
dm = _get_dm()
result = dm.remove_channel(index)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
# =============================================================================
# Advertisement
# =============================================================================
def advert() -> Tuple[bool, str]:
"""Send a single advertisement."""
try:
dm = _get_dm()
result = dm.send_advert(flood=False)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
def floodadv() -> Tuple[bool, str]:
"""Send flood advertisement."""
try:
dm = _get_dm()
result = dm.send_advert(flood=True)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
# =============================================================================
# Direct Messages
# =============================================================================
def send_dm(recipient: str, text: str) -> Tuple[bool, Dict]:
"""Send a direct message. Returns (success, result_dict)."""
if not recipient or not recipient.strip():
return False, {'error': "Recipient is required"}
if not text or not text.strip():
return False, {'error': "Message text is required"}
try:
dm = _get_dm()
recipient = recipient.strip()
# Try to find contact by name in mc.contacts (in-memory)
contact = None
if dm.mc:
contact = dm.mc.get_contact_by_name(recipient)
if contact:
pubkey = contact.get('public_key', recipient)
elif len(recipient) >= 12 and all(c in '0123456789abcdef' for c in recipient.lower()):
# Looks like a pubkey/prefix already
pubkey = recipient
else:
# Name not in mc.contacts — try DB lookup
db_contact = dm.db.get_contact_by_name(recipient)
if db_contact:
pubkey = db_contact['public_key']
else:
pubkey = recipient
result = dm.send_dm(pubkey, text.strip())
return result['success'], result
except Exception as e:
return False, {'error': str(e)}
def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]:
"""Check delivery status for DMs by ACK codes."""
try:
dm = _get_dm()
ack_status = {}
for code in ack_codes:
ack = dm.db.get_ack_for_dm(code)
ack_status[code] = ack
return True, ack_status, ""
except Exception as e:
return False, {}, str(e)
# =============================================================================
# Device Settings
# =============================================================================
def get_device_settings() -> Tuple[bool, Dict]:
"""Get persistent device settings from database."""
try:
db = _get_db()
manual = db.get_setting_json('manual_add_contacts', False) if db else False
return True, {'manual_add_contacts': manual}
except Exception as e:
logger.error(f"Failed to read device settings: {e}")
return False, {'manual_add_contacts': False}
def set_manual_add_contacts(enabled: bool) -> Tuple[bool, str]:
"""Enable/disable manual contact approval."""
try:
dm_inst = _get_dm()
result = dm_inst.set_manual_add_contacts(enabled)
if result['success']:
# Persist to database
db = _get_db()
if db:
db.set_setting_json('manual_add_contacts', enabled)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
def fetch_device_name_from_bridge(max_retries: int = 3, retry_delay: float = 2.0) -> Tuple[Optional[str], str]:
"""
v2: Get device name from DeviceManager instead of bridge.
Kept for backward compatibility with any code that still calls this.
"""
try:
dm = _get_dm()
if dm.is_connected:
return dm.device_name, "device"
except Exception:
pass
return config.MC_DEVICE_NAME, "fallback"