mirror of
https://github.com/MarekWo/mc-webui.git
synced 2026-06-11 17:24:54 +02:00
67c59cc341
PR #3 of 5. Adds POST /api/messages/<msg_id>/resend, which re-broadcasts an own channel message verbatim using the raw_packet bytes captured at send time. Pushes the wire bytes directly through companion command 0x41 (CMD_SEND_RAW_PACKET), bypassing the higher-level send paths so repeaters dedupe by packet hash via Mesh::hasSeen — only previously-unreached nodes will pick up the resend. Returns 404 for unknown msg_id, 400 for not-own / missing snapshot / disconnected device, 500 for unexpected device errors. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
522 lines
17 KiB
Python
522 lines
17 KiB
Python
"""
|
||
MeshCore CLI wrapper — v2: delegates to DeviceManager (no bridge).
|
||
|
||
Function signatures preserved for backward compatibility with api.py.
|
||
"""
|
||
|
||
import logging
|
||
from typing import Tuple, Optional, List, Dict
|
||
from app.config import config
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MeshCLIError(Exception):
    """Error type for failures in the meshcli wrapper layer.

    Raised in this module when no DeviceManager instance can be located.
    """
|
||
|
||
|
||
def _get_dm():
    """Locate the active DeviceManager.

    Lookup order:
      1. ``device_manager`` attribute on the current Flask application.
      2. Module-level ``device_manager`` exported by ``app.main``.

    Raises:
        MeshCLIError: if neither location holds a DeviceManager.
    """
    manager = None
    try:
        from flask import current_app
        manager = getattr(current_app, 'device_manager', None)
    except RuntimeError:
        # Accessing current_app without an application context raises
        # RuntimeError; fall through to the module-level instance.
        manager = None

    if manager is not None:
        return manager

    # Imported lazily, at call time, rather than at module import.
    from app.main import device_manager as global_manager
    if global_manager is not None:
        return global_manager
    raise MeshCLIError("DeviceManager not initialized")
|
||
|
||
|
||
def _get_db():
    """Locate the Database instance.

    Prefers the ``db`` attribute of the current Flask application and falls
    back to the ``db`` attribute of the DeviceManager.

    Returns:
        The database object, or ``None`` when it cannot be obtained.
    """
    try:
        from flask import current_app
        database = getattr(current_app, 'db', None)
    except RuntimeError:
        # No Flask application context available.
        database = None

    if database is not None:
        return database

    try:
        return _get_dm().db
    except Exception:
        # Best effort: callers treat None as "no database available".
        return None
|
||
|
||
|
||
# =============================================================================
|
||
# Messages
|
||
# =============================================================================
|
||
|
||
def recv_messages() -> Tuple[bool, str]:
    """Compatibility no-op for the old explicit "receive" call.

    In v2 messages arrive through events, so there is nothing to fetch.

    Returns:
        Always ``(True, <informational text>)``.
    """
    notice = "Messages are received automatically via events"
    return True, notice
|
||
|
||
|
||
def send_message(text: str, reply_to: Optional[str] = None, channel_index: int = 0) -> Dict:
    """Send a channel message through the DeviceManager.

    Args:
        text: Message body.
        reply_to: When given, the message is prefixed with ``@[reply_to] ``.
        channel_index: Channel slot to send on (default 0).

    Returns:
        The dict returned by ``DeviceManager.send_channel_message``; on any
        exception, ``{'success': False, 'error': <message>}``.
    """
    payload = f"@[{reply_to}] {text}" if reply_to else text

    try:
        return _get_dm().send_channel_message(channel_index, payload)
    except Exception as exc:
        logger.error(f"send_message error: {exc}")
        return {'success': False, 'error': str(exc)}
|
||
|
||
|
||
def resend_channel_message(msg_id: int) -> Dict:
    """Ask the DeviceManager to re-broadcast an own channel message verbatim.

    Args:
        msg_id: Identifier of the stored message to resend.

    Returns:
        The dict returned by ``DeviceManager.resend_channel_message``; on any
        exception, ``{'success': False, 'error': <message>}``.
    """
    try:
        manager = _get_dm()
        outcome = manager.resend_channel_message(msg_id)
        return outcome
    except Exception as exc:
        logger.error(f"resend_channel_message error: {exc}")
        return {'success': False, 'error': str(exc)}
|
||
|
||
|
||
# =============================================================================
|
||
# Contacts
|
||
# =============================================================================
|
||
|
||
def get_contacts() -> Tuple[bool, str]:
    """Render the device's contact list as plain text.

    Each contact becomes one line: its name (``?`` when absent) followed by
    the first 12 characters of its public key.

    Returns:
        ``(True, text)`` on success (``"No contacts"`` when the list is
        empty), or ``(False, error_message)`` on failure.
    """
    try:
        device_contacts = _get_dm().get_contacts_from_device()
        if not device_contacts:
            return True, "No contacts"

        rendered = "\n".join(
            f"{entry.get('name', '?')} {entry.get('public_key', '')[:12]}"
            for entry in device_contacts
        )
        return True, rendered
    except Exception as exc:
        logger.error(f"get_contacts error: {exc}")
        return False, str(exc)
|
||
|
||
|
||
def parse_contacts(output: str, filter_types: Optional[List[str]] = None) -> List[str]:
    """Return the names of all contacts stored in the database.

    The signature is kept for backward compatibility: in v2 neither
    ``output`` nor ``filter_types`` is consulted.

    Returns:
        Non-empty contact names, or an empty list on any error.
    """
    names: List[str] = []
    try:
        for row in _get_dm().db.get_contacts():
            if row.get('name'):
                names.append(row['name'])
    except Exception:
        # Best effort: a failed lookup is reported as "no contacts".
        return []
    return names
|
||
|
||
|
||
def get_contacts_list() -> Tuple[bool, List[str], str]:
    """Fetch contact names from the database.

    Returns:
        ``(True, names, "")`` on success, where ``names`` holds every
        non-empty contact name; ``(False, [], error_message)`` on failure.
    """
    try:
        collected = []
        for row in _get_dm().db.get_contacts():
            if row.get('name'):
                collected.append(row['name'])
        return True, collected, ""
    except Exception as exc:
        return False, [], str(exc)
|
||
|
||
|
||
def get_all_contacts_detailed() -> Tuple[bool, List[Dict], int, str]:
    """Build a detailed listing of every contact in the database.

    Each entry carries ``name``, ``public_key_prefix`` (at most 12 chars),
    ``type_label``, ``path_or_mode`` (``'Flood'`` when no out_path is set)
    and an always-empty ``raw_line``.

    Returns:
        ``(True, entries, count, "")`` on success, or
        ``(False, [], 0, error_message)`` on failure.
    """
    # Numeric contact type -> short label; anything else maps to 'UNKNOWN'.
    type_labels = {0: 'COM', 1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}

    try:
        entries = []
        for row in _get_dm().db.get_contacts():
            key = row.get('public_key', '')
            prefix = key if len(key) < 12 else key[:12]
            label = type_labels.get(row.get('type', 1), 'UNKNOWN')
            entries.append({
                'name': row.get('name', ''),
                'public_key_prefix': prefix,
                'type_label': label,
                'path_or_mode': row.get('out_path', '') or 'Flood',
                'raw_line': '',
            })
        return True, entries, len(entries), ""
    except Exception as exc:
        return False, [], 0, str(exc)
|
||
|
||
|
||
def _parse_last_advert(value) -> int:
|
||
"""Convert last_advert from DB (Unix timestamp string or ISO string) to Unix int."""
|
||
if not value:
|
||
return 0
|
||
# Try as Unix timestamp first
|
||
try:
|
||
ts = int(value)
|
||
if ts > 0:
|
||
return ts
|
||
except (ValueError, TypeError):
|
||
pass
|
||
# Try as ISO datetime string
|
||
try:
|
||
from datetime import datetime
|
||
dt = datetime.fromisoformat(str(value))
|
||
return int(dt.timestamp())
|
||
except (ValueError, TypeError):
|
||
pass
|
||
return 0
|
||
|
||
|
||
def get_contacts_with_last_seen() -> Tuple[bool, Dict[str, Dict], str]:
    """Return the contacts currently held by the device (``mc.contacts``).

    When the ``contacts_dirty`` flag is set, the in-memory contact list is
    refreshed from the device and synced to the database before it is read.

    Returns:
        ``(True, {public_key: contact_dict}, "")`` on success (an empty dict
        when there is no ``mc`` object or no contacts), or
        ``(False, {}, error_message)`` on failure.
    """
    try:
        dm = _get_dm()
        if not dm.mc:
            return True, {}, ""

        # Pick up name/path changes before reading the contact table.
        if dm.mc.contacts_dirty:
            dm.execute(dm.mc.ensure_contacts(follow=True))
            dm._sync_contacts_to_db()

        if not dm.mc.contacts:
            return True, {}, ""

        by_key = {}
        for key, entry in dm.mc.contacts.items():
            raw_advert = entry.get('last_advert')
            # Only positive numeric values count as a valid timestamp.
            if raw_advert and isinstance(raw_advert, (int, float)) and raw_advert > 0:
                advert_ts = int(raw_advert)
            else:
                advert_ts = 0

            by_key[key] = {
                'public_key': key,
                'type': entry.get('type', 1),
                'flags': entry.get('flags', 0),
                'out_path_len': entry.get('out_path_len', -1),
                'out_path': entry.get('out_path', ''),
                'out_path_hash_mode': entry.get('out_path_hash_mode', 0),
                'adv_name': entry.get('adv_name', entry.get('name', '')),
                'last_advert': advert_ts,
                'adv_lat': entry.get('adv_lat', 0.0),
                'adv_lon': entry.get('adv_lon', 0.0),
                'lastmod': '',
            }
        return True, by_key, ""
    except Exception as exc:
        return False, {}, str(exc)
|
||
|
||
|
||
def get_contacts_json() -> Tuple[bool, Dict[str, Dict], str]:
    """Return every database contact as a dict keyed by public key.

    ``last_advert`` is normalised to an integer Unix timestamp via
    ``_parse_last_advert``; the other fields are copied from the DB row
    with defaults for missing keys.

    Returns:
        ``(True, {public_key: contact_dict}, "")`` on success, or
        ``(False, {}, error_message)`` on failure.
    """
    try:
        by_key = {}
        for row in _get_dm().db.get_contacts():
            key = row.get('public_key', '')
            by_key[key] = dict(
                public_key=key,
                type=row.get('type', 0),
                adv_name=row.get('name', ''),
                flags=row.get('flags', 0),
                out_path_len=row.get('out_path_len', -1),
                out_path=row.get('out_path', ''),
                last_advert=_parse_last_advert(row.get('last_advert')),
                adv_lat=row.get('adv_lat'),
                adv_lon=row.get('adv_lon'),
                lastmod=row.get('lastmod', ''),
            )
        return True, by_key, ""
    except Exception as exc:
        return False, {}, str(exc)
|
||
|
||
|
||
def delete_contact(selector: str) -> Tuple[bool, str]:
    """Delete a contact identified by public key, name, or key prefix.

    Resolution order (first hit wins):
      1. ``selector`` is an exact public key known to the database.
      2. ``selector`` equals a contact's name (whitespace-trimmed).
      3. ``selector`` is a prefix of exactly one contact's public key.

    Because deletion is destructive, an exact name match always takes
    priority over a key-prefix match, and a prefix that matches several
    contacts is rejected instead of deleting an arbitrary one. Rows whose
    name or public key is NULL are skipped rather than aborting the lookup.

    Args:
        selector: Public key, contact name, or public-key prefix.

    Returns:
        ``(success, message)``; the message comes from the DeviceManager
        result (``message`` or ``error``) or describes why nothing was
        deleted.
    """
    if not selector or not selector.strip():
        return False, "Contact selector is required"

    selector = selector.strip()

    def _outcome(result: Dict) -> Tuple[bool, str]:
        # Flatten a DeviceManager result dict into (success, text).
        return result['success'], result.get('message', result.get('error', ''))

    try:
        dm = _get_dm()

        # 1. Exact public key.
        if dm.db.get_contact(selector):
            return _outcome(dm.delete_contact(selector))

        contacts = dm.db.get_contacts()

        # 2. Exact name. `or ''` tolerates NULL names in the DB.
        for c in contacts:
            if (c.get('name') or '').strip() == selector:
                return _outcome(dm.delete_contact(c['public_key']))

        # 3. Unambiguous public-key prefix.
        prefix_hits = [
            c for c in contacts
            if (c.get('public_key') or '').startswith(selector)
        ]
        if len(prefix_hits) == 1:
            return _outcome(dm.delete_contact(prefix_hits[0]['public_key']))
        if prefix_hits:
            return False, (
                f"Ambiguous contact selector: {selector} "
                f"matches {len(prefix_hits)} contacts"
            )

        return False, f"Contact not found: {selector}"
    except Exception as e:
        return False, str(e)
|
||
|
||
|
||
def delete_cached_contact(public_key: str) -> Tuple[bool, str]:
    """Hard-delete a cache-only contact via the DeviceManager.

    Args:
        public_key: Key of the contact; surrounding whitespace is ignored.

    Returns:
        ``(success, message)`` taken from the DeviceManager result, or
        ``(False, reason)`` when the key is empty or an exception occurs.
    """
    key = public_key.strip() if public_key else ""
    if not key:
        return False, "Public key is required"

    try:
        outcome = _get_dm().delete_cached_contact(key)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
|
||
|
||
|
||
def clean_inactive_contacts(hours: int = 48) -> Tuple[bool, str]:
    """Placeholder for removing contacts inactive for ``hours`` hours.

    Not implemented in v2: ``hours`` is ignored and the call always fails.

    Returns:
        Always ``(False, "Contact cleanup not yet implemented in v2")``.
    """
    # TODO: implement time-based cleanup via database query
    reason = "Contact cleanup not yet implemented in v2"
    return False, reason
|
||
|
||
|
||
def get_pending_contacts() -> Tuple[bool, List[Dict], str]:
    """Fetch the contacts that are waiting for manual approval.

    Returns:
        ``(True, pending, "")`` with whatever
        ``DeviceManager.get_pending_contacts`` returns, or
        ``(False, [], error_message)`` on failure.
    """
    try:
        return True, _get_dm().get_pending_contacts(), ""
    except Exception as exc:
        return False, [], str(exc)
|
||
|
||
|
||
def approve_pending_contact(public_key: str) -> Tuple[bool, str]:
    """Approve a pending contact through the DeviceManager.

    Args:
        public_key: Key of the pending contact to approve.

    Returns:
        ``(success, message)`` from the DeviceManager result, or
        ``(False, error_message)`` when an exception occurs.
    """
    try:
        outcome = _get_dm().approve_contact(public_key)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
|
||
|
||
|
||
# =============================================================================
|
||
# Device Info
|
||
# =============================================================================
|
||
|
||
def get_device_info() -> Tuple[bool, str]:
    """Render device information as ``key: value`` lines.

    Returns:
        ``(True, text)`` when the DeviceManager reports any info,
        ``(False, "No device info available")`` when it reports none, or
        ``(False, error_message)`` on failure.
    """
    try:
        details = _get_dm().get_device_info()
        if not details:
            return False, "No device info available"
        rendered = "\n".join(f"{key}: {val}" for key, val in details.items())
        return True, rendered
    except Exception as exc:
        return False, str(exc)
|
||
|
||
|
||
def check_connection() -> bool:
    """Report the DeviceManager's ``is_connected`` state.

    Returns:
        The value of ``is_connected``, or ``False`` when the DeviceManager
        cannot be obtained or the attribute access fails.
    """
    try:
        manager = _get_dm()
        connected = manager.is_connected
    except Exception:
        return False
    return connected
|
||
|
||
|
||
# =============================================================================
|
||
# Channels
|
||
# =============================================================================
|
||
|
||
def get_channels() -> Tuple[bool, List[Dict]]:
    """List the configured channels using the local DB cache.

    The channels table is kept in step with the device by:
      - _load_channel_secrets() on every connect (which also prunes stale
        rows),
      - set_channel() / remove_channel() updating the DB synchronously,
      - _refresh_channel_secret() refreshing single rows on per-send
        refresh.

    Why not ask the device: walking the device slots used to cost 40 USB
    round-trips per /api/channels request, and if the firmware stalled
    mid-iteration the remaining slots came back empty without any timeout
    exception, so only "Public" was returned. The DB is the authoritative
    cache.

    Returns:
        ``(True, channels)`` where each channel is a dict with ``index``,
        ``name`` and ``key``, sorted by index and limited to rows that have
        a name; ``(False, [])`` on failure.
    """
    try:
        named_rows = (row for row in _get_dm().db.get_channels() if row.get('name'))
        channels = sorted(
            (
                {
                    'index': row['idx'],
                    'name': row['name'] or '',
                    'key': row['secret'] or '',
                }
                for row in named_rows
            ),
            key=lambda entry: entry['index'],
        )
        return True, channels
    except Exception as exc:
        logger.error(f"get_channels error: {exc}")
        return False, []
|
||
|
||
|
||
def add_channel(name: str) -> Tuple[bool, str, Optional[str]]:
    """Create a channel in the first free slot.

    Slots are probed from 1 up to ``dm._max_channels`` (exclusive); slot 0
    is the Public channel and is never used. A slot counts as free when
    ``get_channel_info`` returns nothing or an entry without a name.

    NOTE(review): if ``get_channel_info`` queries the device, a stalled
    firmware could make an occupied slot look free (see the get_channels
    docstring) — confirm, and consider consulting the DB cache instead.

    Args:
        name: Name of the channel to create.

    Returns:
        ``(success, message, None)``; the third element is always ``None``.
    """
    try:
        dm = _get_dm()
        for slot in range(1, dm._max_channels):
            existing = dm.get_channel_info(slot)
            if existing and existing.get('name'):
                continue  # slot already holds a named channel

            outcome = dm.set_channel(slot, name)
            if not outcome['success']:
                return False, outcome.get('error', 'Failed'), None
            return True, f"Channel '{name}' created at slot {slot}", None

        return False, "No free channel slots available", None
    except Exception as exc:
        return False, str(exc), None
|
||
|
||
|
||
def set_channel(index: int, name: str, key: Optional[str] = None) -> Tuple[bool, str]:
    """Configure (set or join) the channel at a given slot.

    Args:
        index: Channel slot.
        name: Channel name.
        key: Optional channel secret as a hex string; decoded to bytes
            before being handed to the DeviceManager.

    Returns:
        ``(success, message)`` from the DeviceManager result, or
        ``(False, error_message)`` on any exception (including an invalid
        hex string).
    """
    try:
        manager = _get_dm()
        secret = None
        if key:
            secret = bytes.fromhex(key)
        outcome = manager.set_channel(index, name, secret)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
|
||
|
||
|
||
def remove_channel(index: int) -> Tuple[bool, str]:
    """Remove the channel at a given slot.

    Slot 0 (the Public channel) is protected and is refused up front.

    Args:
        index: Channel slot to remove.

    Returns:
        ``(success, message)`` from the DeviceManager result, or
        ``(False, reason)`` for slot 0 or on any exception.
    """
    if index == 0:
        return False, "Cannot remove Public channel (channel 0)"

    try:
        outcome = _get_dm().remove_channel(index)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
|
||
|
||
|
||
# =============================================================================
|
||
# Advertisement
|
||
# =============================================================================
|
||
|
||
def advert() -> Tuple[bool, str]:
    """Send one advertisement with ``flood=False``.

    Returns:
        ``(success, message)`` from the DeviceManager result, or
        ``(False, error_message)`` when an exception occurs.
    """
    try:
        outcome = _get_dm().send_advert(flood=False)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
|
||
|
||
|
||
def floodadv() -> Tuple[bool, str]:
    """Send one advertisement with ``flood=True``.

    Returns:
        ``(success, message)`` from the DeviceManager result, or
        ``(False, error_message)`` when an exception occurs.
    """
    try:
        outcome = _get_dm().send_advert(flood=True)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
|
||
|
||
|
||
# =============================================================================
|
||
# Direct Messages
|
||
# =============================================================================
|
||
|
||
def send_dm(recipient: str, text: str) -> Tuple[bool, Dict]:
    """Send a direct message to a contact.

    The recipient is resolved to a public key in this order:
      1. a name match among the in-memory contacts (``dm.mc``), when an
         ``mc`` object is present;
      2. the recipient itself, when it is at least 12 characters of hex
         (treated as a public key or key prefix);
      3. a name match in the database;
      4. otherwise the recipient string is passed through unchanged.

    Args:
        recipient: Contact name, public key or key prefix.
        text: Message body; surrounding whitespace is stripped before send.

    Returns:
        ``(success, result_dict)``. On validation failure or exception the
        dict holds a single ``error`` entry.
    """
    if not (recipient and recipient.strip()):
        return False, {'error': "Recipient is required"}
    if not (text and text.strip()):
        return False, {'error': "Message text is required"}

    hex_digits = set('0123456789abcdef')

    try:
        dm = _get_dm()
        recipient = recipient.strip()

        # In-memory contacts are consulted first.
        live_contact = dm.mc.get_contact_by_name(recipient) if dm.mc else None

        if live_contact:
            target_key = live_contact.get('public_key', recipient)
        elif len(recipient) >= 12 and set(recipient.lower()) <= hex_digits:
            # Already looks like a public key / prefix.
            target_key = recipient
        else:
            # Unknown to mc.contacts: fall back to the database by name.
            stored = dm.db.get_contact_by_name(recipient)
            target_key = stored['public_key'] if stored else recipient

        outcome = dm.send_dm(target_key, text.strip())
        return outcome['success'], outcome
    except Exception as exc:
        return False, {'error': str(exc)}
|
||
|
||
|
||
def check_dm_delivery(ack_codes: list) -> Tuple[bool, Dict, str]:
    """Look up the stored ACK record for each given ACK code.

    Args:
        ack_codes: ACK codes to query.

    Returns:
        ``(True, {code: ack}, "")`` where ``ack`` is whatever
        ``db.get_ack_for_dm`` returns for that code, or
        ``(False, {}, error_message)`` on failure.
    """
    try:
        database = _get_dm().db
        statuses = {code: database.get_ack_for_dm(code) for code in ack_codes}
        return True, statuses, ""
    except Exception as exc:
        return False, {}, str(exc)
|
||
|
||
|
||
# =============================================================================
|
||
# Device Settings
|
||
# =============================================================================
|
||
|
||
def get_device_settings() -> Tuple[bool, Dict]:
    """Read the persisted device settings from the database.

    Only ``manual_add_contacts`` is reported; it defaults to ``False``
    when no database is available or the setting is absent.

    Returns:
        ``(True, {'manual_add_contacts': value})`` on success, or
        ``(False, {'manual_add_contacts': False})`` on failure.
    """
    try:
        database = _get_db()
        if database:
            manual = database.get_setting_json('manual_add_contacts', False)
        else:
            manual = False
        return True, {'manual_add_contacts': manual}
    except Exception as exc:
        logger.error(f"Failed to read device settings: {exc}")
        return False, {'manual_add_contacts': False}
|
||
|
||
|
||
def set_manual_add_contacts(enabled: bool) -> Tuple[bool, str]:
    """Turn manual contact approval on or off.

    The setting is applied through the DeviceManager first; only when that
    succeeds is it persisted to the database (if one is available).

    Args:
        enabled: Desired state of manual approval.

    Returns:
        ``(success, message)`` from the DeviceManager result, or
        ``(False, error_message)`` when an exception occurs.
    """
    try:
        outcome = _get_dm().set_manual_add_contacts(enabled)
        if outcome['success']:
            # Persist so the choice is available from the settings store.
            store = _get_db()
            if store:
                store.set_setting_json('manual_add_contacts', enabled)
        text = outcome.get('message', outcome.get('error', ''))
        return outcome['success'], text
    except Exception as exc:
        return False, str(exc)
|
||
|
||
|
||
def fetch_device_name_from_bridge(max_retries: int = 3, retry_delay: float = 2.0) -> Tuple[Optional[str], str]:
    """Return the device name and where it came from.

    v2 keeps this bridge-era entry point for backward compatibility but
    asks the DeviceManager instead; ``max_retries`` and ``retry_delay``
    are accepted and ignored.

    Returns:
        ``(dm.device_name, "device")`` when the DeviceManager reports a
        connection, otherwise ``(config.MC_DEVICE_NAME, "fallback")``.
    """
    try:
        manager = _get_dm()
        connected = manager.is_connected
        live_name = manager.device_name if connected else None
    except Exception:
        # Any lookup problem means the configured name is used instead.
        connected = False

    if connected:
        return live_name, "device"
    return config.MC_DEVICE_NAME, "fallback"
|