Files
mc-webui/app/meshcore/cli.py
T
MarekWo 1d47c9c0e8 fix(perf): polling-only Socket.IO + channels DB fallback on USB timeout
Werkzeug dev server can't upgrade WebSockets, so every io() upgrade attempt
returned HTTP 500 and clients fell into a polling/upgrade reconnect loop —
visible as 10-15s freezes on app load. Force transports: ['polling'] on
/chat, /console and /logs clients; long-poll keeps real-time pushes
working with ~1-2s latency.

When the MeshCore device briefly stalls, get_channel_info() used to block
on the default 30s timeout per slot, so iterating max_channels slots could
take minutes; in practice only Public answered and the rest timed out,
leaving the UI with just one channel. Drop per-call timeout to 3s, raise
TimeoutError to the caller, and have cli.get_channels() break on first
timeout and merge the remaining slots from the channels table in the DB
(which already mirrors device state via upsert_channel).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-07 07:31:47 +02:00

547 lines
18 KiB
Python

"""
MeshCore CLI wrapper — v2: delegates to DeviceManager (no bridge).
Function signatures preserved for backward compatibility with api.py.
"""
import logging
from concurrent.futures import TimeoutError as FuturesTimeoutError
from typing import Tuple, Optional, List, Dict
from app.config import config
logger = logging.getLogger(__name__)
class MeshCLIError(Exception):
"""Custom exception for meshcli command failures"""
pass
def _get_dm():
"""Get the DeviceManager instance — try Flask app context first, then module global."""
try:
from flask import current_app
dm = getattr(current_app, 'device_manager', None)
if dm is not None:
return dm
except RuntimeError:
pass # Outside of Flask request context
from app.main import device_manager
if device_manager is None:
raise MeshCLIError("DeviceManager not initialized")
return device_manager
def _get_db():
"""Get Database instance — try Flask app context first, then DeviceManager."""
try:
from flask import current_app
db = getattr(current_app, 'db', None)
if db is not None:
return db
except RuntimeError:
pass
try:
dm = _get_dm()
return dm.db
except Exception:
return None
# =============================================================================
# Messages
# =============================================================================
def recv_messages() -> Tuple[bool, str]:
"""
In v2, messages arrive via events (auto-fetching).
This is a no-op — kept for backward compatibility.
"""
return True, "Messages are received automatically via events"
def send_message(text: str, reply_to: Optional[str] = None, channel_index: int = 0) -> Dict:
"""Send a message to a channel. Returns result dict with id and timestamp."""
if reply_to:
text = f"@[{reply_to}] {text}"
try:
dm = _get_dm()
return dm.send_channel_message(channel_index, text)
except Exception as e:
logger.error(f"send_message error: {e}")
return {'success': False, 'error': str(e)}
# =============================================================================
# Contacts
# =============================================================================
def get_contacts() -> Tuple[bool, str]:
"""Get contacts list as formatted text."""
try:
dm = _get_dm()
contacts = dm.get_contacts_from_device()
if not contacts:
return True, "No contacts"
lines = []
for c in contacts:
name = c.get('name', '?')
pk = c.get('public_key', '')[:12]
lines.append(f"{name} {pk}")
return True, "\n".join(lines)
except Exception as e:
logger.error(f"get_contacts error: {e}")
return False, str(e)
def parse_contacts(output: str, filter_types: Optional[List[str]] = None) -> List[str]:
"""Parse contacts output to extract names. In v2, reads from DB."""
try:
dm = _get_dm()
contacts = dm.db.get_contacts()
return [c['name'] for c in contacts if c.get('name')]
except Exception:
return []
def get_contacts_list() -> Tuple[bool, List[str], str]:
"""Get parsed list of contact names."""
try:
dm = _get_dm()
contacts = dm.db.get_contacts()
names = [c['name'] for c in contacts if c.get('name')]
return True, names, ""
except Exception as e:
return False, [], str(e)
def get_all_contacts_detailed() -> Tuple[bool, List[Dict], int, str]:
"""Get detailed list of all contacts from DB."""
try:
dm = _get_dm()
contacts = dm.db.get_contacts()
result = []
for c in contacts:
pk = c.get('public_key', '')
result.append({
'name': c.get('name', ''),
'public_key_prefix': pk[:12] if len(pk) >= 12 else pk,
'type_label': {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}.get(c.get('type', 1), 'UNKNOWN'),
'path_or_mode': c.get('out_path', '') or 'Flood',
'raw_line': '',
})
return True, result, len(result), ""
except Exception as e:
return False, [], 0, str(e)
def _parse_last_advert(value) -> int:
"""Convert last_advert from DB (Unix timestamp string or ISO string) to Unix int."""
if not value:
return 0
# Try as Unix timestamp first
try:
ts = int(value)
if ts > 0:
return ts
except (ValueError, TypeError):
pass
# Try as ISO datetime string
try:
from datetime import datetime
dt = datetime.fromisoformat(str(value))
return int(dt.timestamp())
except (ValueError, TypeError):
pass
return 0
def get_contacts_with_last_seen() -> Tuple[bool, Dict[str, Dict], str]:
"""Get contacts actually on the device firmware (from mc.contacts).
Refreshes from device if contacts_dirty flag is set (e.g., after
receiving adverts that may carry updated names/paths).
"""
try:
dm = _get_dm()
if not dm.mc:
return True, {}, ""
# Refresh contacts from device if dirty (name changes, path updates, etc.)
if dm.mc.contacts_dirty:
dm.execute(dm.mc.ensure_contacts(follow=True))
dm._sync_contacts_to_db()
if not dm.mc.contacts:
return True, {}, ""
contacts_dict = {}
for pk, contact in dm.mc.contacts.items():
last_adv = contact.get('last_advert')
contacts_dict[pk] = {
'public_key': pk,
'type': contact.get('type', 1),
'flags': contact.get('flags', 0),
'out_path_len': contact.get('out_path_len', -1),
'out_path': contact.get('out_path', ''),
'out_path_hash_mode': contact.get('out_path_hash_mode', 0),
'adv_name': contact.get('adv_name', contact.get('name', '')),
'last_advert': int(last_adv) if last_adv and isinstance(last_adv, (int, float)) and last_adv > 0 else 0,
'adv_lat': contact.get('adv_lat', 0.0),
'adv_lon': contact.get('adv_lon', 0.0),
'lastmod': '',
}
return True, contacts_dict, ""
except Exception as e:
return False, {}, str(e)
def get_contacts_json() -> Tuple[bool, Dict[str, Dict], str]:
"""Get all contacts as JSON dict (keyed by public_key)."""
try:
dm = _get_dm()
contacts = dm.db.get_contacts()
contacts_dict = {}
for c in contacts:
pk = c.get('public_key', '')
contacts_dict[pk] = {
'public_key': pk,
'type': c.get('type', 0),
'adv_name': c.get('name', ''),
'flags': c.get('flags', 0),
'out_path_len': c.get('out_path_len', -1),
'out_path': c.get('out_path', ''),
'last_advert': _parse_last_advert(c.get('last_advert')),
'adv_lat': c.get('adv_lat'),
'adv_lon': c.get('adv_lon'),
'lastmod': c.get('lastmod', ''),
}
return True, contacts_dict, ""
except Exception as e:
return False, {}, str(e)
def delete_contact(selector: str) -> Tuple[bool, str]:
"""Delete a contact by public key or name."""
if not selector or not selector.strip():
return False, "Contact selector is required"
selector = selector.strip()
try:
dm = _get_dm()
# Try as public key first
contact = dm.db.get_contact(selector)
if contact:
result = dm.delete_contact(selector)
return result['success'], result.get('message', result.get('error', ''))
# Try to find by name
contacts = dm.db.get_contacts()
for c in contacts:
if c.get('name', '').strip() == selector or c.get('public_key', '').startswith(selector):
result = dm.delete_contact(c['public_key'])
return result['success'], result.get('message', result.get('error', ''))
return False, f"Contact not found: {selector}"
except Exception as e:
return False, str(e)
def delete_cached_contact(public_key: str) -> Tuple[bool, str]:
"""Hard-delete a cache-only contact from the database."""
if not public_key or not public_key.strip():
return False, "Public key is required"
try:
dm = _get_dm()
result = dm.delete_cached_contact(public_key.strip())
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
def clean_inactive_contacts(hours: int = 48) -> Tuple[bool, str]:
"""Remove contacts inactive for specified hours. Simplified in v2."""
# TODO: implement time-based cleanup via database query
return False, "Contact cleanup not yet implemented in v2"
def get_pending_contacts() -> Tuple[bool, List[Dict], str]:
"""Get list of contacts awaiting manual approval."""
try:
dm = _get_dm()
pending = dm.get_pending_contacts()
return True, pending, ""
except Exception as e:
return False, [], str(e)
def approve_pending_contact(public_key: str) -> Tuple[bool, str]:
"""Approve a pending contact."""
try:
dm = _get_dm()
result = dm.approve_contact(public_key)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
# =============================================================================
# Device Info
# =============================================================================
def get_device_info() -> Tuple[bool, str]:
"""Get device information."""
try:
dm = _get_dm()
info = dm.get_device_info()
if info:
lines = [f"{k}: {v}" for k, v in info.items()]
return True, "\n".join(lines)
return False, "No device info available"
except Exception as e:
return False, str(e)
def check_connection() -> bool:
"""Check if device is connected."""
try:
dm = _get_dm()
return dm.is_connected
except Exception:
return False
# =============================================================================
# Channels
# =============================================================================
def get_channels() -> Tuple[bool, List[Dict]]:
"""Get list of configured channels.
When the USB device is briefly unresponsive a single get_channel_info()
times out (3 s) and the rest of the slots would too — so we stop hitting
the device and merge whatever we got with the locally cached channels in
the DB. This guarantees the UI shows all channels instead of just Public.
"""
try:
dm = _get_dm()
channels = []
seen_idx = set()
device_partial = False
for idx in range(dm._max_channels):
try:
info = dm.get_channel_info(idx)
except FuturesTimeoutError:
logger.warning(
f"get_channels: device timeout at slot {idx}"
f"falling back to DB for remaining slots"
)
device_partial = True
break
if info and info.get('name'):
channels.append({
'index': idx,
'name': info.get('name', ''),
'key': info.get('secret', info.get('key', '')),
})
seen_idx.add(idx)
# Keep the DB in sync with what the device just told us
try:
secret_hex = info.get('secret', '') or None
dm.db.upsert_channel(idx, info.get('name', ''), secret_hex)
except Exception as e:
logger.debug(f"upsert_channel({idx}) failed: {e}")
if device_partial:
try:
for row in dm.db.get_channels():
db_idx = row.get('idx')
if db_idx is None or db_idx in seen_idx:
continue
name = row.get('name') or ''
if not name:
continue
channels.append({
'index': db_idx,
'name': name,
'key': row.get('secret', '') or '',
})
channels.sort(key=lambda c: c['index'])
logger.info(
f"get_channels: returned {len(channels)} channels "
f"({len(seen_idx)} from device + DB fallback)"
)
except Exception as e:
logger.error(f"get_channels DB fallback failed: {e}")
return True, channels
except Exception as e:
logger.error(f"get_channels error: {e}")
return False, []
def add_channel(name: str) -> Tuple[bool, str, Optional[str]]:
"""Add a new channel."""
try:
dm = _get_dm()
# Find first free slot (1+, slot 0 is Public)
for idx in range(1, dm._max_channels):
info = dm.get_channel_info(idx)
if not info or not info.get('name'):
result = dm.set_channel(idx, name)
if result['success']:
return True, f"Channel '{name}' created at slot {idx}", None
return False, result.get('error', 'Failed'), None
return False, "No free channel slots available", None
except Exception as e:
return False, str(e), None
def set_channel(index: int, name: str, key: Optional[str] = None) -> Tuple[bool, str]:
"""Set/join a channel."""
try:
dm = _get_dm()
secret = bytes.fromhex(key) if key else None
result = dm.set_channel(index, name, secret)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
def remove_channel(index: int) -> Tuple[bool, str]:
"""Remove a channel."""
if index == 0:
return False, "Cannot remove Public channel (channel 0)"
try:
dm = _get_dm()
result = dm.remove_channel(index)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
# =============================================================================
# Advertisement
# =============================================================================
def advert() -> Tuple[bool, str]:
"""Send a single advertisement."""
try:
dm = _get_dm()
result = dm.send_advert(flood=False)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
def floodadv() -> Tuple[bool, str]:
"""Send flood advertisement."""
try:
dm = _get_dm()
result = dm.send_advert(flood=True)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
# =============================================================================
# Direct Messages
# =============================================================================
def send_dm(recipient: str, text: str) -> Tuple[bool, Dict]:
"""Send a direct message. Returns (success, result_dict)."""
if not recipient or not recipient.strip():
return False, {'error': "Recipient is required"}
if not text or not text.strip():
return False, {'error': "Message text is required"}
try:
dm = _get_dm()
recipient = recipient.strip()
# Try to find contact by name in mc.contacts (in-memory)
contact = None
if dm.mc:
contact = dm.mc.get_contact_by_name(recipient)
if contact:
pubkey = contact.get('public_key', recipient)
elif len(recipient) >= 12 and all(c in '0123456789abcdef' for c in recipient.lower()):
# Looks like a pubkey/prefix already
pubkey = recipient
else:
# Name not in mc.contacts — try DB lookup
db_contact = dm.db.get_contact_by_name(recipient)
if db_contact:
pubkey = db_contact['public_key']
else:
pubkey = recipient
result = dm.send_dm(pubkey, text.strip())
return result['success'], result
except Exception as e:
return False, {'error': str(e)}
def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]:
"""Check delivery status for DMs by ACK codes."""
try:
dm = _get_dm()
ack_status = {}
for code in ack_codes:
ack = dm.db.get_ack_for_dm(code)
ack_status[code] = ack
return True, ack_status, ""
except Exception as e:
return False, {}, str(e)
# =============================================================================
# Device Settings
# =============================================================================
def get_device_settings() -> Tuple[bool, Dict]:
"""Get persistent device settings from database."""
try:
db = _get_db()
manual = db.get_setting_json('manual_add_contacts', False) if db else False
return True, {'manual_add_contacts': manual}
except Exception as e:
logger.error(f"Failed to read device settings: {e}")
return False, {'manual_add_contacts': False}
def set_manual_add_contacts(enabled: bool) -> Tuple[bool, str]:
"""Enable/disable manual contact approval."""
try:
dm_inst = _get_dm()
result = dm_inst.set_manual_add_contacts(enabled)
if result['success']:
# Persist to database
db = _get_db()
if db:
db.set_setting_json('manual_add_contacts', enabled)
return result['success'], result.get('message', result.get('error', ''))
except Exception as e:
return False, str(e)
def fetch_device_name_from_bridge(max_retries: int = 3, retry_delay: float = 2.0) -> Tuple[Optional[str], str]:
"""
v2: Get device name from DeviceManager instead of bridge.
Kept for backward compatibility with any code that still calls this.
"""
try:
dm = _get_dm()
if dm.is_connected:
return dm.device_name, "device"
except Exception:
pass
return config.MC_DEVICE_NAME, "fallback"