Files
mc-webui/app/meshcore/cli.py
T
MarekWo 67c59cc341 feat(channels): backend resend endpoint via CMD_SEND_RAW_PACKET
PR #3 of 5. Adds POST /api/messages/<msg_id>/resend, which re-broadcasts an
own channel message verbatim using the raw_packet bytes captured at send
time. Pushes the wire bytes directly through companion command 0x41
(CMD_SEND_RAW_PACKET), bypassing the higher-level send paths so repeaters
dedupe by packet hash via Mesh::hasSeen — only previously-unreached nodes
will pick up the resend.

Returns 404 for unknown msg_id, 400 for not-own / missing snapshot /
disconnected device, 500 for unexpected device errors.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-09 12:39:34 +02:00

522 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MeshCore CLI wrapper — v2: delegates to DeviceManager (no bridge).
Function signatures preserved for backward compatibility with api.py.
"""
import logging
from typing import Tuple, Optional, List, Dict
from app.config import config
logger = logging.getLogger(__name__)
class MeshCLIError(Exception):
    """Error type raised by this module when a meshcli operation cannot proceed."""
def _get_dm():
    """Return the DeviceManager instance.

    Looks for a ``device_manager`` attribute on Flask's ``current_app`` first and
    falls back to the ``device_manager`` global of ``app.main``.

    Raises:
        MeshCLIError: if the ``app.main`` global is ``None``.
    """
    try:
        from flask import current_app
        manager = getattr(current_app, 'device_manager', None)
    except RuntimeError:
        # Raised when current_app is touched outside of a Flask context.
        manager = None
    if manager is not None:
        return manager

    # Imported lazily, only when the Flask lookup yielded nothing.
    from app.main import device_manager
    if device_manager is None:
        raise MeshCLIError("DeviceManager not initialized")
    return device_manager
def _get_db():
    """Return the Database instance, or ``None`` when none can be obtained.

    Prefers the ``db`` attribute of Flask's ``current_app``; otherwise uses the
    ``db`` attribute of the DeviceManager returned by ``_get_dm()``.
    """
    try:
        from flask import current_app
        database = getattr(current_app, 'db', None)
    except RuntimeError:
        database = None
    if database is not None:
        return database

    # Best effort: any failure while resolving the DeviceManager yields None.
    try:
        return _get_dm().db
    except Exception:
        return None
# =============================================================================
# Messages
# =============================================================================
def recv_messages() -> Tuple[bool, str]:
    """Compatibility no-op.

    In v2 messages arrive via events (auto-fetching), so there is nothing to
    poll; a fixed success tuple is returned.
    """
    outcome = (True, "Messages are received automatically via events")
    return outcome
def send_message(text: str, reply_to: Optional[str] = None, channel_index: int = 0) -> Dict:
    """Send ``text`` to the channel at ``channel_index``.

    When ``reply_to`` is truthy the text is prefixed with ``@[<reply_to>] ``.

    Returns:
        Whatever ``DeviceManager.send_channel_message`` returns, or
        ``{'success': False, 'error': <message>}`` if an exception was raised.
    """
    if reply_to:
        text = f"@[{reply_to}] {text}"
    try:
        return _get_dm().send_channel_message(channel_index, text)
    except Exception as e:
        logger.error(f"send_message error: {e}")
        failure = {'success': False, 'error': str(e)}
        return failure
def resend_channel_message(msg_id: int) -> Dict:
    """Re-broadcast an own channel message verbatim (raw resend, same packet hash).

    Returns:
        Whatever ``DeviceManager.resend_channel_message`` returns, or
        ``{'success': False, 'error': <message>}`` if an exception was raised.
    """
    try:
        manager = _get_dm()
        outcome = manager.resend_channel_message(msg_id)
        return outcome
    except Exception as e:
        logger.error(f"resend_channel_message error: {e}")
        return {'success': False, 'error': str(e)}
# =============================================================================
# Contacts
# =============================================================================
def get_contacts() -> Tuple[bool, str]:
    """Return the device contacts as text, one ``<name> <key prefix>`` per line.

    Returns:
        ``(True, text)`` on success (``"No contacts"`` when the list is empty),
        ``(False, error message)`` if an exception was raised.
    """
    try:
        device_contacts = _get_dm().get_contacts_from_device()
        if not device_contacts:
            return True, "No contacts"
        # Pair each name with the first 12 characters of its public key.
        pairs = (
            (entry.get('name', '?'), entry.get('public_key', '')[:12])
            for entry in device_contacts
        )
        return True, "\n".join(f"{name} {pk}" for name, pk in pairs)
    except Exception as e:
        logger.error(f"get_contacts error: {e}")
        return False, str(e)
def parse_contacts(output: str, filter_types: Optional[List[str]] = None) -> List[str]:
    """Return the names of all contacts stored in the DB.

    ``output`` and ``filter_types`` are not used by this implementation; in v2
    the names are read from the database instead of being parsed from text.
    Any exception results in an empty list.
    """
    try:
        names = []
        for row in _get_dm().db.get_contacts():
            # Skip rows with a missing or empty name.
            if row.get('name'):
                names.append(row['name'])
        return names
    except Exception:
        return []
def get_contacts_list() -> Tuple[bool, List[str], str]:
    """Return ``(success, contact names, error message)`` using the DB contacts.

    Rows with a missing or empty name are left out.
    """
    try:
        rows = _get_dm().db.get_contacts()
        found = []
        for row in rows:
            if row.get('name'):
                found.append(row['name'])
        return True, found, ""
    except Exception as exc:
        return False, [], str(exc)
def get_all_contacts_detailed() -> Tuple[bool, List[Dict], int, str]:
    """Return a detailed entry for every contact stored in the DB.

    Returns:
        ``(success, entries, entry count, error message)``. Each entry holds
        ``name``, ``public_key_prefix`` (at most 12 characters), ``type_label``,
        ``path_or_mode`` (``'Flood'`` when ``out_path`` is empty) and an empty
        ``raw_line``.
    """
    try:
        # Numeric contact type -> label; anything else becomes 'UNKNOWN'.
        labels = {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}
        entries = []
        for row in _get_dm().db.get_contacts():
            key = row.get('public_key', '')
            prefix = key if len(key) < 12 else key[:12]
            entries.append({
                'name': row.get('name', ''),
                'public_key_prefix': prefix,
                'type_label': labels.get(row.get('type', 1), 'UNKNOWN'),
                'path_or_mode': row.get('out_path', '') or 'Flood',
                'raw_line': '',
            })
        return True, entries, len(entries), ""
    except Exception as exc:
        return False, [], 0, str(exc)
def _parse_last_advert(value) -> int:
"""Convert last_advert from DB (Unix timestamp string or ISO string) to Unix int."""
if not value:
return 0
# Try as Unix timestamp first
try:
ts = int(value)
if ts > 0:
return ts
except (ValueError, TypeError):
pass
# Try as ISO datetime string
try:
from datetime import datetime
dt = datetime.fromisoformat(str(value))
return int(dt.timestamp())
except (ValueError, TypeError):
pass
return 0
def get_contacts_with_last_seen() -> Tuple[bool, Dict[str, Dict], str]:
    """Get contacts actually on the device firmware (from ``mc.contacts``).

    When the ``contacts_dirty`` flag is set (e.g. after adverts that may carry
    updated names/paths) the contacts are refreshed from the device and synced
    to the DB before being read.

    Returns:
        ``(success, contacts keyed by public key, error message)``. An empty
        dict is returned when there is no ``mc`` object or no contacts.
    """
    try:
        dm = _get_dm()
        if not dm.mc:
            return True, {}, ""

        # Refresh first so the snapshot below reflects name/path changes.
        if dm.mc.contacts_dirty:
            dm.execute(dm.mc.ensure_contacts(follow=True))
            dm._sync_contacts_to_db()

        if not dm.mc.contacts:
            return True, {}, ""

        snapshot = {}
        for pk, contact in dm.mc.contacts.items():
            advert_ts = contact.get('last_advert')
            # Only positive int/float values are kept; everything else is 0.
            if advert_ts and isinstance(advert_ts, (int, float)) and advert_ts > 0:
                last_seen = int(advert_ts)
            else:
                last_seen = 0
            snapshot[pk] = {
                'public_key': pk,
                'type': contact.get('type', 1),
                'flags': contact.get('flags', 0),
                'out_path_len': contact.get('out_path_len', -1),
                'out_path': contact.get('out_path', ''),
                'out_path_hash_mode': contact.get('out_path_hash_mode', 0),
                'adv_name': contact.get('adv_name', contact.get('name', '')),
                'last_advert': last_seen,
                'adv_lat': contact.get('adv_lat', 0.0),
                'adv_lon': contact.get('adv_lon', 0.0),
                'lastmod': '',
            }
        return True, snapshot, ""
    except Exception as exc:
        return False, {}, str(exc)
def get_contacts_json() -> Tuple[bool, Dict[str, Dict], str]:
    """Return all DB contacts as a dict keyed by public key.

    Returns:
        ``(success, contacts, error message)``. ``last_advert`` is normalised
        to a Unix timestamp via ``_parse_last_advert``.
    """
    try:
        rows = _get_dm().db.get_contacts()
        keyed_rows = ((row.get('public_key', ''), row) for row in rows)
        by_key = {
            pk: {
                'public_key': pk,
                'type': row.get('type', 0),
                'adv_name': row.get('name', ''),
                'flags': row.get('flags', 0),
                'out_path_len': row.get('out_path_len', -1),
                'out_path': row.get('out_path', ''),
                'last_advert': _parse_last_advert(row.get('last_advert')),
                'adv_lat': row.get('adv_lat'),
                'adv_lon': row.get('adv_lon'),
                'lastmod': row.get('lastmod', ''),
            }
            for pk, row in keyed_rows
        }
        return True, by_key, ""
    except Exception as exc:
        return False, {}, str(exc)
def delete_contact(selector: str) -> Tuple[bool, str]:
    """Delete a contact by public key or name.

    Lookup order:
      1. ``selector`` as an exact key for ``db.get_contact``;
      2. the first DB contact whose stripped name equals ``selector`` or whose
         public key starts with ``selector``.

    Args:
        selector: Public key, public-key prefix, or contact name. Surrounding
            whitespace is ignored.

    Returns:
        ``(success, message)``; the message is the DeviceManager result's
        ``message`` (or ``error``), a "not found" text, or the text of any
        exception that was raised.
    """
    if not selector or not selector.strip():
        return False, "Contact selector is required"
    selector = selector.strip()
    try:
        dm = _get_dm()
        # Try as public key first
        contact = dm.db.get_contact(selector)
        if contact:
            result = dm.delete_contact(selector)
            return result['success'], result.get('message', result.get('error', ''))
        # Try to find by name or public-key prefix
        for c in dm.db.get_contacts():
            # Rows may hold None for name/public_key; treat that as empty so one
            # incomplete row cannot abort the whole search.
            name = (c.get('name') or '').strip()
            public_key = c.get('public_key') or ''
            if name == selector or public_key.startswith(selector):
                result = dm.delete_contact(c['public_key'])
                return result['success'], result.get('message', result.get('error', ''))
        return False, f"Contact not found: {selector}"
    except Exception as e:
        return False, str(e)
def delete_cached_contact(public_key: str) -> Tuple[bool, str]:
    """Hard-delete a cache-only contact from the database.

    Returns:
        ``(success, message)``; an empty or whitespace-only key is rejected
        before the DeviceManager is consulted.
    """
    key = public_key.strip() if public_key else ""
    if not key:
        return False, "Public key is required"
    try:
        outcome = _get_dm().delete_cached_contact(key)
        return outcome['success'], outcome.get('message', outcome.get('error', ''))
    except Exception as exc:
        return False, str(exc)
def clean_inactive_contacts(hours: int = 48) -> Tuple[bool, str]:
    """Placeholder for removing contacts inactive for ``hours`` hours.

    Not implemented in v2: always reports failure and ignores ``hours``.
    """
    # TODO: implement time-based cleanup via database query
    not_implemented = "Contact cleanup not yet implemented in v2"
    return False, not_implemented
def get_pending_contacts() -> Tuple[bool, List[Dict], str]:
    """Return ``(success, pending contacts, error message)``.

    The list is whatever ``DeviceManager.get_pending_contacts`` returns, i.e.
    the contacts awaiting manual approval.
    """
    try:
        return True, _get_dm().get_pending_contacts(), ""
    except Exception as exc:
        return False, [], str(exc)
def approve_pending_contact(public_key: str) -> Tuple[bool, str]:
    """Approve the pending contact identified by ``public_key``.

    Returns:
        ``(success, message)`` taken from the DeviceManager result, or
        ``(False, error text)`` if an exception was raised.
    """
    try:
        outcome = _get_dm().approve_contact(public_key)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
# =============================================================================
# Device Info
# =============================================================================
def get_device_info() -> Tuple[bool, str]:
    """Return device information rendered as ``key: value`` lines.

    Returns:
        ``(True, text)`` when info is available, otherwise
        ``(False, "No device info available")`` or ``(False, error text)``.
    """
    try:
        info = _get_dm().get_device_info()
        if not info:
            return False, "No device info available"
        rendered = "\n".join(f"{k}: {v}" for k, v in info.items())
        return True, rendered
    except Exception as exc:
        return False, str(exc)
def check_connection() -> bool:
    """Return the DeviceManager's ``is_connected`` value, or ``False`` on any error."""
    try:
        manager = _get_dm()
    except Exception:
        return False
    try:
        return manager.is_connected
    except Exception:
        return False
# =============================================================================
# Channels
# =============================================================================
def get_channels() -> Tuple[bool, List[Dict]]:
    """Get list of configured channels from the local cache.

    Reads the channels table in the DB, which is kept in sync with the device by:
      - _load_channel_secrets() on every connect (also prunes stale rows),
      - set_channel() / remove_channel() updating the DB synchronously,
      - _refresh_channel_secret() refreshing single rows on per-send refresh.

    Iterating device slots here used to cost 40 x USB round-trips on every
    /api/channels hit, and returned only "Public" the moment the firmware
    stalled mid-iteration (other slots came back empty without raising any
    timeout exception). The DB is the authoritative cache -- use it.

    Returns:
        ``(success, channels)`` where each channel is a dict with ``index``,
        ``name`` and ``key``, ordered by ``index``. Rows without a name are
        skipped. On error ``(False, [])`` is returned and the error is logged.
    """
    try:
        named = []
        for row in _get_dm().db.get_channels():
            if not row.get('name'):
                continue
            named.append({
                'index': row['idx'],
                'name': row['name'] or '',
                'key': row['secret'] or '',
            })
        return True, sorted(named, key=lambda entry: entry['index'])
    except Exception as e:
        logger.error(f"get_channels error: {e}")
        return False, []
def add_channel(name: str) -> Tuple[bool, str, Optional[str]]:
    """Create channel ``name`` in the first free slot.

    Slots ``1 .. dm._max_channels - 1`` are probed in order (slot 0 is Public);
    a slot counts as free when its channel info is empty or has no name.

    Returns:
        ``(success, message, None)`` -- the third element is always ``None``.
    """
    # NOTE(review): if get_channel_info() queries the device, a stalled firmware
    # returning empty info could make an occupied slot look free -- confirm.
    try:
        dm = _get_dm()
        for idx in range(1, dm._max_channels):
            info = dm.get_channel_info(idx)
            if info and info.get('name'):
                continue  # slot already in use
            # First free slot: the attempt here decides the outcome.
            result = dm.set_channel(idx, name)
            if not result['success']:
                return False, result.get('error', 'Failed'), None
            return True, f"Channel '{name}' created at slot {idx}", None
        return False, "No free channel slots available", None
    except Exception as exc:
        return False, str(exc), None
def set_channel(index: int, name: str, key: Optional[str] = None) -> Tuple[bool, str]:
    """Set/join the channel at ``index``.

    Args:
        index: Channel slot.
        name: Channel name.
        key: Optional secret as a hex string; decoded to bytes before use.

    Returns:
        ``(success, message)``; an invalid hex ``key`` is reported as a failure.
    """
    try:
        dm = _get_dm()
        if key:
            secret = bytes.fromhex(key)
        else:
            secret = None
        outcome = dm.set_channel(index, name, secret)
        return outcome['success'], outcome.get('message', outcome.get('error', ''))
    except Exception as exc:
        return False, str(exc)
def remove_channel(index: int) -> Tuple[bool, str]:
    """Remove the channel at ``index``; channel 0 (Public) is refused.

    Returns:
        ``(success, message)``.
    """
    if index != 0:
        try:
            outcome = _get_dm().remove_channel(index)
            return outcome['success'], outcome.get('message', outcome.get('error', ''))
        except Exception as exc:
            return False, str(exc)
    return False, "Cannot remove Public channel (channel 0)"
# =============================================================================
# Advertisement
# =============================================================================
def advert() -> Tuple[bool, str]:
    """Send a single (non-flood) advertisement.

    Returns:
        ``(success, message)`` from the DeviceManager result, or
        ``(False, error text)`` if an exception was raised.
    """
    try:
        outcome = _get_dm().send_advert(flood=False)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
def floodadv() -> Tuple[bool, str]:
    """Send a flood advertisement.

    Returns:
        ``(success, message)`` from the DeviceManager result, or
        ``(False, error text)`` if an exception was raised.
    """
    try:
        outcome = _get_dm().send_advert(flood=True)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
# =============================================================================
# Direct Messages
# =============================================================================
def send_dm(recipient: str, text: str) -> Tuple[bool, Dict]:
    """Send a direct message. Returns ``(success, result_dict)``.

    ``recipient`` is resolved to a public key in this order:
      1. name lookup in the in-memory ``mc`` contacts (when ``mc`` exists);
      2. used as-is when it is at least 12 characters of hex digits;
      3. name lookup in the DB;
      4. otherwise passed through unchanged.

    Empty or whitespace-only ``recipient`` / ``text`` are rejected up front.
    """
    if not (recipient and recipient.strip()):
        return False, {'error': "Recipient is required"}
    if not (text and text.strip()):
        return False, {'error': "Message text is required"}

    try:
        dm = _get_dm()
        recipient = recipient.strip()

        # In-memory contacts take precedence over every other interpretation.
        known = dm.mc.get_contact_by_name(recipient) if dm.mc else None
        if known:
            pubkey = known.get('public_key', recipient)
        elif len(recipient) >= 12 and set(recipient.lower()) <= set('0123456789abcdef'):
            # Looks like a pubkey/prefix already
            pubkey = recipient
        else:
            # Name not in mc.contacts -- try the DB, else pass through as given.
            stored = dm.db.get_contact_by_name(recipient)
            pubkey = stored['public_key'] if stored else recipient

        outcome = dm.send_dm(pubkey, text.strip())
        return outcome['success'], outcome
    except Exception as exc:
        return False, {'error': str(exc)}
def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]:
    """Look up the stored ACK for each code in ``ack_codes``.

    Returns:
        ``(success, {code: result of db.get_ack_for_dm(code)}, error message)``.
    """
    try:
        database = _get_dm().db
        statuses = {code: database.get_ack_for_dm(code) for code in ack_codes}
        return True, statuses, ""
    except Exception as exc:
        return False, {}, str(exc)
# =============================================================================
# Device Settings
# =============================================================================
def get_device_settings() -> Tuple[bool, Dict]:
    """Read the persistent device settings from the database.

    Returns:
        ``(success, {'manual_add_contacts': bool-ish})``. The setting defaults
        to ``False`` when no database is available or reading fails.
    """
    try:
        db = _get_db()
        manual = False
        if db:
            manual = db.get_setting_json('manual_add_contacts', False)
        return True, {'manual_add_contacts': manual}
    except Exception as e:
        logger.error(f"Failed to read device settings: {e}")
        return False, {'manual_add_contacts': False}
def set_manual_add_contacts(enabled: bool) -> Tuple[bool, str]:
    """Enable or disable manual contact approval.

    The flag is applied through the DeviceManager first; only when that
    succeeds is it also stored in the database (if one is available).

    Returns:
        ``(success, message)``.
    """
    try:
        result = _get_dm().set_manual_add_contacts(enabled)
        if result['success']:
            # Persist to database
            store = _get_db()
            if store:
                store.set_setting_json('manual_add_contacts', enabled)
        message = result.get('message', result.get('error', ''))
        return result['success'], message
    except Exception as exc:
        return False, str(exc)
def fetch_device_name_from_bridge(max_retries: int = 3, retry_delay: float = 2.0) -> Tuple[Optional[str], str]:
    """v2: get the device name from the DeviceManager instead of the bridge.

    Kept for backward compatibility with code that still calls this;
    ``max_retries`` and ``retry_delay`` are accepted but not used.

    Returns:
        ``(dm.device_name, "device")`` when the DeviceManager reports a
        connection, otherwise ``(config.MC_DEVICE_NAME, "fallback")``.
    """
    try:
        manager = _get_dm()
        if manager.is_connected:
            return manager.device_name, "device"
    except Exception:
        # Any failure simply means the configured name is used instead.
        pass
    fallback_name = config.MC_DEVICE_NAME
    return fallback_name, "fallback"