Files
mc-webui/app/device_manager.py
MarekWo e817181261 fix(websocket): include SNR, hops, route and analyzer URL in channel message events
The SocketIO new_message emit for channel messages was missing snr,
path_len, pkt_payload and analyzer_url fields, causing messages received
via WebSocket to render without metadata until a full page refresh.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 12:12:41 +01:00

1435 lines
57 KiB
Python

"""
DeviceManager — manages MeshCore device connection for mc-webui v2.
Runs the meshcore async event loop in a dedicated background thread.
Flask routes call sync command methods that bridge to the async loop.
Event handlers capture incoming data and write to Database + emit SocketIO.
"""
import asyncio
import hashlib
import json
import logging
import threading
import time
from typing import Optional, Any, Dict, List
ANALYZER_BASE_URL = 'https://analyzer.letsmesh.net/packets?packet_hash='
GRP_TXT_TYPE_BYTE = 0x05
logger = logging.getLogger(__name__)
def _to_str(val) -> str:
"""Convert bytes or other types to string. Used for expected_ack, pkt_payload, etc."""
if val is None:
return ''
if isinstance(val, bytes):
return val.hex()
return str(val)
class DeviceManager:
"""
Manages MeshCore device connection.
Usage:
dm = DeviceManager(config, db, socketio)
dm.start() # spawns background thread, connects to device
...
dm.stop() # disconnect and stop background thread
"""
def __init__(self, config, db, socketio=None):
self.config = config
self.db = db
self.socketio = socketio
self.mc = None # meshcore.MeshCore instance
self._loop = None # asyncio event loop (in background thread)
self._thread = None # background thread
self._connected = False
self._device_name = None
self._self_info = None
self._subscriptions = [] # active event subscriptions
self._channel_secrets = {} # {channel_idx: secret_hex} for pkt_payload
self._max_channels = 8 # updated from device_info at connect
self._pending_echo = None # {'timestamp': float, 'channel_idx': int, 'msg_id': int, 'pkt_payload': str|None}
self._echo_lock = threading.Lock()
self._pending_acks = {} # {ack_code_hex: dm_id} — maps retry acks to DM
self._retry_tasks = {} # {dm_id: asyncio.Task} — active retry coroutines
@property
def is_connected(self) -> bool:
return self._connected and self.mc is not None
@property
def device_name(self) -> str:
return self._device_name or self.config.MC_DEVICE_NAME
@property
def self_info(self) -> Optional[dict]:
return self._self_info
# ================================================================
# Lifecycle
# ================================================================
def start(self):
"""Start the device manager background thread and connect."""
if self._thread and self._thread.is_alive():
logger.warning("DeviceManager already running")
return
self._loop = asyncio.new_event_loop()
self._thread = threading.Thread(
target=self._run_loop, daemon=True, name="device-manager"
)
self._thread.start()
logger.info("DeviceManager background thread started")
def _run_loop(self):
"""Run the async event loop in the background thread."""
asyncio.set_event_loop(self._loop)
self._loop.run_until_complete(self._connect_with_retry())
self._loop.run_forever()
async def _connect_with_retry(self, max_retries: int = 10, base_delay: float = 5.0):
"""Try to connect to device, retrying on failure."""
for attempt in range(1, max_retries + 1):
try:
await self._connect()
if self._connected:
return # success
except Exception as e:
logger.error(f"Connection attempt {attempt}/{max_retries} failed: {e}")
if attempt < max_retries:
delay = min(base_delay * attempt, 30.0)
logger.info(f"Retrying in {delay:.0f}s...")
await asyncio.sleep(delay)
logger.error(f"Failed to connect after {max_retries} attempts")
def _detect_serial_port(self) -> str:
"""Auto-detect serial port when configured as 'auto'."""
port = self.config.MC_SERIAL_PORT
if port.lower() != 'auto':
return port
from pathlib import Path
by_id = Path('/dev/serial/by-id')
if by_id.exists():
devices = list(by_id.iterdir())
if len(devices) == 1:
resolved = str(devices[0].resolve())
logger.info(f"Auto-detected serial port: {resolved}")
return resolved
elif len(devices) > 1:
logger.warning(f"Multiple serial devices found: {[d.name for d in devices]}")
else:
logger.warning("No serial devices found in /dev/serial/by-id")
# Fallback: try common paths
for candidate in ['/dev/ttyUSB0', '/dev/ttyACM0', '/dev/ttyUSB1', '/dev/ttyACM1']:
if Path(candidate).exists():
logger.info(f"Auto-detected serial port (fallback): {candidate}")
return candidate
raise RuntimeError("No serial port detected. Set MC_SERIAL_PORT explicitly.")
async def _connect(self):
"""Connect to device via serial or TCP and subscribe to events."""
from meshcore import MeshCore
try:
if self.config.use_tcp:
logger.info(f"Connecting via TCP: {self.config.MC_TCP_HOST}:{self.config.MC_TCP_PORT}")
self.mc = await MeshCore.create_tcp(
host=self.config.MC_TCP_HOST,
port=self.config.MC_TCP_PORT,
auto_reconnect=self.config.MC_AUTO_RECONNECT,
)
else:
port = self._detect_serial_port()
logger.info(f"Connecting via serial: {port}")
self.mc = await MeshCore.create_serial(
port=port,
auto_reconnect=self.config.MC_AUTO_RECONNECT,
)
# Read device info
self._self_info = getattr(self.mc, 'self_info', None)
if not self._self_info:
logger.error("Device connected but self_info is empty — device may not be responding")
self.mc = None
return
self._device_name = self._self_info.get('name', self.config.MC_DEVICE_NAME)
self._connected = True
# Store device info in database
self.db.set_device_info(
public_key=self._self_info.get('public_key', ''),
name=self._device_name,
self_info=json.dumps(self._self_info, default=str)
)
# Fetch device_info for max_channels
try:
dev_info_event = await self.mc.commands.send_device_query()
if dev_info_event and hasattr(dev_info_event, 'payload'):
dev_info = dev_info_event.payload or {}
self._max_channels = dev_info.get('max_channels', 8)
logger.info(f"Device max_channels: {self._max_channels}")
except Exception as e:
logger.warning(f"Could not fetch device_info: {e}")
# Workaround: meshcore lib 2.2.21 has a bug where list.extend()
# return value (None) corrupts reader.channels for idx >= 20.
# Pre-allocate the channels list to max_channels to avoid this.
reader = getattr(self.mc, '_reader', None)
if reader and hasattr(reader, 'channels'):
current = reader.channels or []
if len(current) < self._max_channels:
reader.channels = current + [{} for _ in range(self._max_channels - len(current))]
logger.debug(f"Pre-allocated reader.channels to {len(reader.channels)} slots")
logger.info(f"Connected to device: {self._device_name} "
f"(key: {self._self_info.get('public_key', '?')[:8]}...)")
# Subscribe to events
await self._subscribe_events()
# Enable auto-refresh of contacts on adverts/path updates
# Keep auto_update_contacts OFF to avoid serial blocking on every
# ADVERTISEMENT event (324 contacts = several seconds of serial I/O).
# We sync contacts at startup and handle NEW_CONTACT events individually.
self.mc.auto_update_contacts = False
# Fetch initial contacts from device
await self.mc.ensure_contacts()
self._sync_contacts_to_db()
# Cache channel secrets for pkt_payload computation
await self._load_channel_secrets()
# Start auto message fetching (events fire on new messages)
await self.mc.start_auto_message_fetching()
except Exception as e:
logger.error(f"Device connection failed: {e}")
self._connected = False
async def _load_channel_secrets(self):
"""Load channel secrets from device for pkt_payload computation."""
consecutive_empty = 0
try:
for idx in range(self._max_channels):
try:
event = await self.mc.commands.get_channel(idx)
except Exception:
consecutive_empty += 1
if consecutive_empty >= 3:
break # likely past last configured channel
continue
if event:
data = getattr(event, 'payload', None) or {}
secret = data.get('channel_secret', data.get('secret', b''))
if isinstance(secret, bytes):
secret = secret.hex()
if secret and len(secret) == 32:
self._channel_secrets[idx] = secret
consecutive_empty = 0
else:
consecutive_empty += 1
else:
consecutive_empty += 1
if consecutive_empty >= 3:
break # stop after 3 consecutive empty channels
logger.info(f"Cached {len(self._channel_secrets)} channel secrets")
except Exception as e:
logger.error(f"Failed to load channel secrets: {e}")
async def _subscribe_events(self):
"""Subscribe to all relevant device events."""
from meshcore.events import EventType
handlers = [
(EventType.CHANNEL_MSG_RECV, self._on_channel_message),
(EventType.CONTACT_MSG_RECV, self._on_dm_received),
(EventType.MSG_SENT, self._on_msg_sent),
(EventType.ACK, self._on_ack),
(EventType.ADVERTISEMENT, self._on_advertisement),
(EventType.PATH_UPDATE, self._on_path_update),
(EventType.NEW_CONTACT, self._on_new_contact),
(EventType.RX_LOG_DATA, self._on_rx_log_data),
(EventType.DISCONNECTED, self._on_disconnected),
]
for event_type, handler in handlers:
sub = self.mc.subscribe(event_type, handler)
self._subscriptions.append(sub)
logger.debug(f"Subscribed to {event_type.value}")
def _sync_contacts_to_db(self):
"""Sync device contacts to database (bidirectional).
- Upserts device contacts with source='device'
- Downgrades DB contacts marked 'device' that are no longer on device to 'advert'
"""
if not self.mc or not self.mc.contacts:
return
device_keys = set()
for pubkey, contact in self.mc.contacts.items():
# last_advert from meshcore is Unix timestamp (int) or None
last_adv = contact.get('last_advert')
last_advert_val = str(int(last_adv)) if last_adv and isinstance(last_adv, (int, float)) and last_adv > 0 else None
self.db.upsert_contact(
public_key=pubkey,
name=contact.get('adv_name', ''),
type=contact.get('type', 0),
flags=contact.get('flags', 0),
out_path=contact.get('out_path', ''),
out_path_len=contact.get('out_path_len', 0),
last_advert=last_advert_val,
adv_lat=contact.get('adv_lat'),
adv_lon=contact.get('adv_lon'),
source='device',
)
device_keys.add(pubkey.lower())
# Downgrade stale 'device' contacts to 'advert' (cache-only)
stale = self.db.downgrade_stale_device_contacts(device_keys)
if stale:
logger.info(f"Downgraded {stale} stale device contacts to cache")
logger.info(f"Synced {len(device_keys)} contacts from device to database")
def execute(self, coro, timeout: float = 30) -> Any:
"""
Execute an async coroutine from sync Flask context.
Blocks until the coroutine completes and returns the result.
"""
if not self._loop or not self._loop.is_running():
raise RuntimeError("DeviceManager event loop not running")
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
return future.result(timeout=timeout)
def stop(self):
"""Disconnect from device and stop the background thread."""
logger.info("Stopping DeviceManager...")
if self.mc and self._loop and self._loop.is_running():
try:
future = asyncio.run_coroutine_threadsafe(
self.mc.disconnect(), self._loop
)
future.result(timeout=5)
except Exception as e:
logger.warning(f"Error during disconnect: {e}")
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
if self._thread:
self._thread.join(timeout=5)
self._connected = False
self.mc = None
self._subscriptions.clear()
logger.info("DeviceManager stopped")
# ================================================================
# Event Handlers (async — run in device manager thread)
# ================================================================
async def _on_channel_message(self, event):
"""Handle incoming channel message."""
try:
data = getattr(event, 'payload', {})
ts = data.get('timestamp', int(time.time()))
raw_text = data.get('text', '')
channel_idx = data.get('channel_idx', 0)
# Parse sender from "SenderName: message" format
if ':' in raw_text:
sender, content = raw_text.split(':', 1)
sender = sender.strip()
content = content.strip()
else:
sender = 'Unknown'
content = raw_text
# Check if sender is blocked (store but don't emit)
blocked_names = self.db.get_blocked_contact_names()
is_blocked = sender in blocked_names
msg_id = self.db.insert_channel_message(
channel_idx=channel_idx,
sender=sender,
content=content,
timestamp=ts,
sender_timestamp=data.get('sender_timestamp'),
snr=data.get('SNR', data.get('snr')),
path_len=data.get('path_len'),
pkt_payload=data.get('pkt_payload'),
raw_json=json.dumps(data, default=str),
)
logger.info(f"Channel msg #{msg_id} from {sender} on ch{channel_idx}")
if is_blocked:
logger.debug(f"Blocked channel msg from {sender}, stored but not emitted")
return
if self.socketio:
snr = data.get('SNR', data.get('snr'))
path_len = data.get('path_len')
pkt_payload = data.get('pkt_payload')
# Compute analyzer URL from pkt_payload
analyzer_url = None
if pkt_payload:
try:
raw = bytes([GRP_TXT_TYPE_BYTE]) + bytes.fromhex(pkt_payload)
packet_hash = hashlib.sha256(raw).hexdigest()[:16].upper()
analyzer_url = f"{ANALYZER_BASE_URL}{packet_hash}"
except (ValueError, TypeError):
pass
self.socketio.emit('new_message', {
'type': 'channel',
'channel_idx': channel_idx,
'sender': sender,
'content': content,
'timestamp': ts,
'id': msg_id,
'snr': snr,
'path_len': path_len,
'pkt_payload': pkt_payload,
'analyzer_url': analyzer_url,
}, namespace='/chat')
logger.debug(f"SocketIO emitted new_message for ch{channel_idx} msg #{msg_id}")
except Exception as e:
logger.error(f"Error handling channel message: {e}")
async def _on_dm_received(self, event):
"""Handle incoming direct message."""
try:
data = getattr(event, 'payload', {})
ts = data.get('timestamp', int(time.time()))
content = data.get('text', '')
sender_key = data.get('public_key', data.get('pubkey_prefix', ''))
# Look up sender from contacts — resolve prefix to full public key
sender_name = ''
if sender_key and self.mc:
contact = self.mc.get_contact_by_key_prefix(sender_key)
if contact:
sender_name = contact.get('name', '')
full_key = contact.get('public_key', '')
if full_key:
sender_key = full_key
elif len(sender_key) < 64:
# Prefix not resolved from in-memory contacts — try DB
db_contact = self.db.get_contact_by_prefix(sender_key)
if db_contact and len(db_contact['public_key']) == 64:
sender_key = db_contact['public_key']
sender_name = db_contact.get('name', '')
# Receiver-side dedup: skip duplicate retries
sender_ts = data.get('sender_timestamp')
if sender_key and content:
if sender_ts:
existing = self.db.find_dm_duplicate(sender_key, content,
sender_timestamp=sender_ts)
else:
existing = self.db.find_dm_duplicate(sender_key, content,
window_seconds=300)
if existing:
logger.info(f"DM dedup: skipping retry from {sender_key[:8]}...")
return
# Check if sender is blocked
is_blocked = sender_key and self.db.is_contact_blocked(sender_key)
if sender_key:
# Only upsert with name if we have a real name (not just a prefix)
self.db.upsert_contact(
public_key=sender_key,
name=sender_name, # empty string won't overwrite existing name
source='message',
)
dm_id = self.db.insert_direct_message(
contact_pubkey=sender_key,
direction='in',
content=content,
timestamp=ts,
sender_timestamp=data.get('sender_timestamp'),
snr=data.get('SNR', data.get('snr')),
path_len=data.get('path_len'),
pkt_payload=data.get('pkt_payload'),
raw_json=json.dumps(data, default=str),
)
logger.info(f"DM #{dm_id} from {sender_name or sender_key[:12]}")
if is_blocked:
logger.debug(f"Blocked DM from {sender_key[:12]}, stored but not emitted")
return
if self.socketio:
self.socketio.emit('new_message', {
'type': 'dm',
'contact_pubkey': sender_key,
'sender': sender_name or sender_key[:12],
'content': content,
'timestamp': ts,
'id': dm_id,
}, namespace='/chat')
except Exception as e:
logger.error(f"Error handling DM: {e}")
async def _on_msg_sent(self, event):
"""Handle confirmation that our message was sent."""
try:
data = getattr(event, 'payload', {})
expected_ack = _to_str(data.get('expected_ack'))
msg_type = data.get('txt_type', 0)
# txt_type 0 = DM, 1 = channel
if msg_type == 0 and expected_ack:
# DM sent confirmation — store expected_ack for delivery tracking
logger.debug(f"DM sent, expected_ack={expected_ack}")
except Exception as e:
logger.error(f"Error handling msg_sent: {e}")
async def _on_ack(self, event):
"""Handle ACK (delivery confirmation for DM)."""
try:
data = getattr(event, 'payload', {})
# FIX: ACK event payload uses 'code', not 'expected_ack'
ack_code = _to_str(data.get('code', data.get('expected_ack')))
if not ack_code:
return
# Check if this ACK belongs to a pending DM retry
dm_id = self._pending_acks.get(ack_code)
# Only store if not already stored (retry task may have handled it)
existing = self.db.get_ack_for_dm(ack_code)
if existing:
return
self.db.insert_ack(
expected_ack=ack_code,
snr=data.get('snr'),
rssi=data.get('rssi'),
route_type=data.get('route_type', ''),
dm_id=dm_id,
)
logger.info(f"ACK received: {ack_code}" +
(f" (dm_id={dm_id})" if dm_id else ""))
if self.socketio:
self.socketio.emit('ack', {
'expected_ack': ack_code,
'dm_id': dm_id,
'snr': data.get('snr'),
'rssi': data.get('rssi'),
'route_type': data.get('route_type', ''),
}, namespace='/chat')
except Exception as e:
logger.error(f"Error handling ACK: {e}")
async def _on_advertisement(self, event):
"""Handle received advertisement from another node.
ADVERTISEMENT payload only contains {'public_key': '...'}.
Full contact details (name, type, lat/lon) must be looked up
from mc.contacts which is synced at startup.
If the contact is unknown (new auto-add by firmware), refresh contacts.
"""
try:
data = getattr(event, 'payload', {})
pubkey = data.get('public_key', '')
if not pubkey:
return
# Look up full contact details from meshcore's contact list
contact = (self.mc.contacts or {}).get(pubkey, {})
name = contact.get('adv_name', contact.get('name', ''))
# Also check pending contacts (manual approval mode)
if not name:
pending = (self.mc.pending_contacts or {}).get(pubkey, {})
if pending:
name = pending.get('adv_name', pending.get('name', ''))
if not contact:
contact = pending
# If contact is still unknown, firmware may have just auto-added it.
if not name and pubkey not in (self.mc.contacts or {}):
logger.info(f"Unknown advert from {pubkey[:8]}..., refreshing contacts")
await self.mc.ensure_contacts(follow=True)
contact = (self.mc.contacts or {}).get(pubkey, {})
name = contact.get('adv_name', contact.get('name', ''))
adv_type = contact.get('type', data.get('adv_type', 0))
adv_lat = contact.get('adv_lat', data.get('adv_lat'))
adv_lon = contact.get('adv_lon', data.get('adv_lon'))
self.db.insert_advertisement(
public_key=pubkey,
name=name,
type=adv_type,
lat=adv_lat,
lon=adv_lon,
timestamp=int(time.time()),
snr=data.get('snr'),
)
# Upsert to contacts with last_advert timestamp
self.db.upsert_contact(
public_key=pubkey,
name=name,
type=adv_type,
adv_lat=adv_lat,
adv_lon=adv_lon,
last_advert=str(int(time.time())),
source='advert',
)
logger.info(f"Advert from '{name}' ({pubkey[:8]}...) type={adv_type}")
except Exception as e:
logger.error(f"Error handling advertisement: {e}")
async def _on_path_update(self, event):
"""Handle path update for a contact.
Also serves as backup delivery confirmation: when firmware sends
piggybacked ACK via flood, it fires both ACK and PATH_UPDATE events.
If the ACK event was missed, PATH_UPDATE can confirm delivery.
"""
try:
data = getattr(event, 'payload', {})
pubkey = data.get('public_key', '')
if not pubkey:
return
# Store path record (existing behavior)
self.db.insert_path(
contact_pubkey=pubkey,
path=data.get('path', ''),
snr=data.get('snr'),
path_len=data.get('path_len'),
)
logger.debug(f"Path update for {pubkey[:8]}...")
# Backup: check for pending DM to this contact
for ack_code, dm_id in list(self._pending_acks.items()):
dm = self.db.get_dm_by_id(dm_id)
if dm and dm.get('contact_pubkey') == pubkey and dm.get('direction') == 'out':
existing_ack = self.db.get_ack_for_dm(ack_code)
if not existing_ack:
self.db.insert_ack(
expected_ack=ack_code,
route_type='PATH_FLOOD',
dm_id=dm_id,
)
logger.info(f"PATH delivery confirmed for dm_id={dm_id}")
if self.socketio:
self.socketio.emit('ack', {
'expected_ack': ack_code,
'dm_id': dm_id,
'route_type': 'PATH_FLOOD',
}, namespace='/chat')
break # Only confirm the most recent pending DM to this contact
except Exception as e:
logger.error(f"Error handling path update: {e}")
async def _on_rx_log_data(self, event):
"""Handle RX_LOG_DATA — RF log containing echoed/repeated packets.
Firmware sends LOG_DATA (0x88) packets for every repeated radio frame.
Payload format: header(1) [transport_code(4)] path_len(1) path(N) pkt_payload(rest)
We only process GRP_TXT (payload_type=0x05) for channel message echoes.
"""
try:
import io
data = getattr(event, 'payload', {})
payload_hex = data.get('payload', '')
logger.debug(f"RX_LOG_DATA received: {len(payload_hex)//2} bytes, snr={data.get('snr')}")
if not payload_hex:
return
pkt = bytes.fromhex(payload_hex)
pbuf = io.BytesIO(pkt)
header = pbuf.read(1)[0]
route_type = header & 0x03
payload_type = (header & 0x3C) >> 2
# Skip transport code for route_type 0 (flood) and 3
if route_type == 0x00 or route_type == 0x03:
pbuf.read(4) # discard transport code
path_len = pbuf.read(1)[0]
path = pbuf.read(path_len).hex()
pkt_payload = pbuf.read().hex()
# Only process GRP_TXT channel message echoes
if payload_type != 0x05:
return
if not pkt_payload:
return
snr = data.get('snr')
self._process_echo(pkt_payload, path, snr)
except Exception as e:
logger.error(f"Error handling RX_LOG_DATA: {e}")
def _get_channel_hash(self, channel_idx: int) -> str:
"""Get the expected channel hash byte (hex) for a channel index."""
import hashlib
secret_hex = self._channel_secrets.get(channel_idx)
if not secret_hex:
return None
return hashlib.sha256(bytes.fromhex(secret_hex)).digest()[0:1].hex()
def _process_echo(self, pkt_payload: str, path: str, snr: float = None):
"""Classify and store an echo: sent echo or incoming echo.
For sent messages: correlate with pending echo to get pkt_payload.
For incoming: store as echo keyed by pkt_payload for route display.
"""
with self._echo_lock:
current_time = time.time()
direction = 'incoming'
# Check if this matches a pending sent message
if self._pending_echo:
pe = self._pending_echo
age = current_time - pe['timestamp']
# Expire stale pending echo
if age > 60:
self._pending_echo = None
elif pe['pkt_payload'] is None:
# Validate channel hash before correlating — the first byte
# of pkt_payload is sha256(channel_secret)[0], must match
# the channel we sent on to avoid cross-channel mismatches
expected_hash = self._get_channel_hash(pe['channel_idx'])
echo_hash = pkt_payload[:2] if pkt_payload else None
if expected_hash and echo_hash and expected_hash == echo_hash:
# First echo after send — correlate pkt_payload with sent message
pe['pkt_payload'] = pkt_payload
direction = 'sent'
self.db.update_message_pkt_payload(pe['msg_id'], pkt_payload)
logger.info(f"Echo: correlated pkt_payload with sent msg #{pe['msg_id']}, path={path}")
elif expected_hash and echo_hash and expected_hash != echo_hash:
logger.debug(f"Echo: channel hash mismatch (expected {expected_hash}, got {echo_hash}) — not our sent msg")
elif pe['pkt_payload'] == pkt_payload:
# Additional echo for same sent message
direction = 'sent'
# Store echo in DB
self.db.insert_echo(
pkt_payload=pkt_payload,
path=path,
snr=snr,
direction=direction,
)
logger.debug(f"Echo ({direction}): path={path} snr={snr} pkt={pkt_payload[:16]}...")
# Emit SocketIO event for real-time UI update
if self.socketio:
self.socketio.emit('echo', {
'pkt_payload': pkt_payload,
'path': path,
'snr': snr,
'direction': direction,
}, namespace='/chat')
def _is_manual_approval_enabled(self) -> bool:
"""Check if manual contact approval is enabled (from persisted settings)."""
try:
from pathlib import Path
settings_path = Path(self.config.MC_CONFIG_DIR) / ".webui_settings.json"
if settings_path.exists():
with open(settings_path, 'r', encoding='utf-8') as f:
settings = json.load(f)
return bool(settings.get('manual_add_contacts', False))
except Exception:
pass
return False
async def _on_new_contact(self, event):
"""Handle new contact discovered.
When manual approval is enabled, contacts go to pending list only.
When manual approval is off, contacts are auto-added to DB.
"""
try:
data = getattr(event, 'payload', {})
pubkey = data.get('public_key', '')
name = data.get('adv_name', data.get('name', ''))
if not pubkey:
return
# Ignored/blocked: still update cache but don't add to pending or device
if self.db.is_contact_ignored(pubkey) or self.db.is_contact_blocked(pubkey):
last_adv = data.get('last_advert')
last_advert_val = (
str(int(last_adv))
if last_adv and isinstance(last_adv, (int, float)) and last_adv > 0
else str(int(time.time()))
)
self.db.upsert_contact(
public_key=pubkey,
name=name,
type=data.get('type', data.get('adv_type', 0)),
adv_lat=data.get('adv_lat'),
adv_lon=data.get('adv_lon'),
last_advert=last_advert_val,
source='advert',
)
logger.info(f"Ignored/blocked contact advert: {name} ({pubkey[:8]}...)")
return
if self._is_manual_approval_enabled():
# Manual mode: meshcore puts it in mc.pending_contacts for approval
logger.info(f"Pending contact (manual mode): {name} ({pubkey[:8]}...)")
# Also add to DB cache for @mentions and Cache filter
last_adv = data.get('last_advert')
last_advert_val = (
str(int(last_adv))
if last_adv and isinstance(last_adv, (int, float)) and last_adv > 0
else str(int(time.time()))
)
self.db.upsert_contact(
public_key=pubkey,
name=name,
type=data.get('type', data.get('adv_type', 0)),
adv_lat=data.get('adv_lat'),
adv_lon=data.get('adv_lon'),
last_advert=last_advert_val,
source='advert', # cache-only until approved
)
if self.socketio:
self.socketio.emit('pending_contact', {
'public_key': pubkey,
'name': name,
'type': data.get('type', data.get('adv_type', 0)),
}, namespace='/chat')
return
# Auto mode: add to DB immediately
last_adv = data.get('last_advert')
last_advert_val = (
str(int(last_adv))
if last_adv and isinstance(last_adv, (int, float)) and last_adv > 0
else str(int(time.time()))
)
self.db.upsert_contact(
public_key=pubkey,
name=name,
type=data.get('type', data.get('adv_type', 0)),
adv_lat=data.get('adv_lat'),
adv_lon=data.get('adv_lon'),
last_advert=last_advert_val,
source='device',
)
logger.info(f"New contact (auto-add): {name} ({pubkey[:8]}...)")
except Exception as e:
logger.error(f"Error handling new contact: {e}")
async def _on_disconnected(self, event):
"""Handle device disconnection."""
logger.warning("Device disconnected")
self._connected = False
if self.socketio:
self.socketio.emit('device_status', {
'connected': False,
}, namespace='/chat')
# ================================================================
# Command Methods (sync — called from Flask routes)
# ================================================================
def send_channel_message(self, channel_idx: int, text: str) -> Dict:
"""Send a message to a channel. Returns result dict."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
event = self.execute(self.mc.commands.send_chan_msg(channel_idx, text))
# Store the sent message in database
ts = int(time.time())
msg_id = self.db.insert_channel_message(
channel_idx=channel_idx,
sender=self.device_name,
content=text,
timestamp=ts,
is_own=True,
pkt_payload=getattr(event, 'data', {}).get('pkt_payload') if event else None,
)
# Register for echo correlation — first RX_LOG_DATA echo will
# provide the actual pkt_payload for this sent message
with self._echo_lock:
self._pending_echo = {
'timestamp': time.time(),
'channel_idx': channel_idx,
'msg_id': msg_id,
'pkt_payload': None,
}
# Emit SocketIO event so sender's UI updates immediately
if self.socketio:
self.socketio.emit('new_message', {
'type': 'channel',
'channel_idx': channel_idx,
'sender': self.device_name,
'content': text,
'timestamp': ts,
'is_own': True,
'id': msg_id,
}, namespace='/chat')
return {'success': True, 'message': 'Message sent', 'id': msg_id}
except Exception as e:
logger.error(f"Failed to send channel message: {e}")
return {'success': False, 'error': str(e)}
def send_dm(self, recipient_pubkey: str, text: str) -> Dict:
"""Send a direct message with background retry. Returns result dict."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
# Find contact in device's contact table
contact = self.mc.contacts.get(recipient_pubkey)
if not contact:
contact = self.mc.get_contact_by_key_prefix(recipient_pubkey)
if not contact:
# Contact must exist on device to send DM
return {'success': False,
'error': f'Contact not on device. '
f'Re-add {recipient_pubkey[:12]}... via Contacts page.'}
# Generate timestamp once — same for all retries (enables receiver dedup)
timestamp = int(time.time())
event = self.execute(
self.mc.commands.send_msg(contact, text,
timestamp=timestamp, attempt=0)
)
from meshcore.events import EventType
event_data = getattr(event, 'payload', {})
if event.type == EventType.ERROR:
err_detail = event_data.get('error', event_data.get('message', ''))
logger.warning(f"Device error sending DM to {recipient_pubkey[:12]}: "
f"payload={event_data}, contact_type={type(contact).__name__}")
return {'success': False, 'error': f'Device error sending DM: {err_detail}'}
ack = _to_str(event_data.get('expected_ack'))
suggested_timeout = event_data.get('suggested_timeout', 15000)
# Store sent DM in database (single record, not per-retry)
dm_id = self.db.insert_direct_message(
contact_pubkey=recipient_pubkey.lower(),
direction='out',
content=text,
timestamp=timestamp,
expected_ack=ack or None,
pkt_payload=_to_str(event_data.get('pkt_payload')) or None,
)
# Register ack → dm_id mapping for _on_ack handler
if ack:
self._pending_acks[ack] = dm_id
# Launch background retry task
task = asyncio.run_coroutine_threadsafe(
self._dm_retry_task(
dm_id, contact, text, timestamp,
ack, suggested_timeout
),
self._loop
)
self._retry_tasks[dm_id] = task
return {
'success': True,
'message': 'DM sent',
'id': dm_id,
'expected_ack': ack,
}
except Exception as e:
logger.error(f"Failed to send DM: {e}")
return {'success': False, 'error': str(e)}
async def _dm_retry_task(self, dm_id: int, contact, text: str,
timestamp: int, initial_ack: str,
suggested_timeout: int, max_attempts: int = 10):
"""Background retry with same timestamp for dedup on receiver."""
from meshcore.events import EventType
wait_s = max(suggested_timeout / 1000 * 1.2, 5.0)
# Wait for ACK on initial send
if initial_ack:
ack_event = await self.mc.dispatcher.wait_for_event(
EventType.ACK,
attribute_filters={"code": initial_ack},
timeout=wait_s
)
if ack_event:
self._confirm_delivery(dm_id, initial_ack, ack_event)
return
# Retry with same timestamp, incrementing attempt
for attempt in range(1, max_attempts):
# After 2 failed direct attempts, reset path to flood
if attempt >= 8:
try:
await self.mc.commands.reset_path(contact)
logger.info(f"DM retry {attempt}: reset path to flood")
except Exception:
pass
try:
result = await self.mc.commands.send_msg(
contact, text, timestamp=timestamp, attempt=attempt
)
except Exception as e:
logger.warning(f"DM retry {attempt}/{max_attempts}: send error: {e}")
continue
if result.type == EventType.ERROR:
logger.warning(f"DM retry {attempt}/{max_attempts}: device error")
continue
retry_ack = _to_str(result.payload.get('expected_ack'))
if retry_ack:
self._pending_acks[retry_ack] = dm_id
new_timeout = result.payload.get('suggested_timeout', suggested_timeout)
wait_s = max(new_timeout / 1000 * 1.2, 5.0)
if retry_ack:
ack_event = await self.mc.dispatcher.wait_for_event(
EventType.ACK,
attribute_filters={"code": retry_ack},
timeout=wait_s
)
if ack_event:
self._confirm_delivery(dm_id, retry_ack, ack_event)
return
logger.warning(f"DM retry exhausted ({max_attempts} attempts) for dm_id={dm_id}")
# Cleanup stale pending acks for this DM
stale = [k for k, v in self._pending_acks.items() if v == dm_id]
for k in stale:
self._pending_acks.pop(k, None)
self._retry_tasks.pop(dm_id, None)
def _confirm_delivery(self, dm_id: int, ack_code: str, ack_event):
"""Store ACK and notify frontend."""
data = getattr(ack_event, 'payload', {})
# Only store if not already stored by _on_ack handler
existing = self.db.get_ack_for_dm(ack_code)
if not existing:
self.db.insert_ack(
expected_ack=ack_code,
snr=data.get('snr'),
rssi=data.get('rssi'),
route_type=data.get('route_type', ''),
dm_id=dm_id,
)
logger.info(f"DM delivery confirmed: dm_id={dm_id}, ack={ack_code}")
if self.socketio:
self.socketio.emit('ack', {
'expected_ack': ack_code,
'dm_id': dm_id,
'snr': data.get('snr'),
}, namespace='/chat')
# Cleanup pending acks for this DM
stale = [k for k, v in self._pending_acks.items() if v == dm_id]
for k in stale:
self._pending_acks.pop(k, None)
self._retry_tasks.pop(dm_id, None)
def get_contacts_from_device(self) -> List[Dict]:
"""Refresh contacts from device and return the list."""
if not self.is_connected:
return []
try:
self.execute(self.mc.ensure_contacts(follow=True))
self._sync_contacts_to_db()
return self.db.get_contacts()
except Exception as e:
logger.error(f"Failed to get contacts: {e}")
return self.db.get_contacts() # return cached
def delete_contact(self, pubkey: str) -> Dict:
"""Delete a contact from device and soft-delete in database."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
self.execute(self.mc.commands.remove_contact(pubkey))
self.db.delete_contact(pubkey) # soft-delete: sets source='deleted'
# Also remove from in-memory contacts cache
if self.mc.contacts and pubkey in self.mc.contacts:
del self.mc.contacts[pubkey]
return {'success': True, 'message': 'Contact deleted'}
except Exception as e:
logger.error(f"Failed to delete contact: {e}")
return {'success': False, 'error': str(e)}
def reset_path(self, pubkey: str) -> Dict:
"""Reset path to a contact."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
self.execute(self.mc.commands.reset_path(pubkey))
return {'success': True, 'message': 'Path reset'}
except Exception as e:
logger.error(f"Failed to reset path: {e}")
return {'success': False, 'error': str(e)}
def get_device_info(self) -> Dict:
"""Get device info. Returns info dict or empty dict."""
if self._self_info:
return dict(self._self_info)
if not self.is_connected:
return {}
try:
event = self.execute(self.mc.commands.send_appstart())
if event and hasattr(event, 'data'):
self._self_info = getattr(event, 'payload', {})
return dict(self._self_info)
except Exception as e:
logger.error(f"Failed to get device info: {e}")
return {}
def get_channel_info(self, idx: int) -> Optional[Dict]:
"""Get info for a specific channel."""
if not self.is_connected:
return None
try:
event = self.execute(self.mc.commands.get_channel(idx))
if event:
data = getattr(event, 'payload', None) or getattr(event, 'data', None)
if data and isinstance(data, dict):
# Normalize keys: channel_name -> name, channel_secret -> secret
secret = data.get('channel_secret', data.get('secret', ''))
if isinstance(secret, bytes):
secret = secret.hex()
name = data.get('channel_name', data.get('name', ''))
if isinstance(name, str):
name = name.strip('\x00').strip()
return {
'name': name,
'secret': secret,
'channel_idx': data.get('channel_idx', idx),
}
except Exception as e:
logger.error(f"Failed to get channel {idx}: {e}")
return None
def set_channel(self, idx: int, name: str, secret: bytes = None) -> Dict:
"""Set/create a channel on the device."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
self.execute(self.mc.commands.set_channel(idx, name, secret))
self.db.upsert_channel(idx, name, secret.hex() if secret else None)
return {'success': True, 'message': f'Channel {idx} set'}
except Exception as e:
logger.error(f"Failed to set channel: {e}")
return {'success': False, 'error': str(e)}
def remove_channel(self, idx: int) -> Dict:
"""Remove a channel from the device."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
# Set channel with empty name removes it
self.execute(self.mc.commands.set_channel(idx, '', None))
self.db.delete_channel(idx)
return {'success': True, 'message': f'Channel {idx} removed'}
except Exception as e:
logger.error(f"Failed to remove channel: {e}")
return {'success': False, 'error': str(e)}
def send_advert(self, flood: bool = False) -> Dict:
"""Send advertisement."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
self.execute(self.mc.commands.send_advert(flood=flood))
return {'success': True, 'message': 'Advert sent'}
except Exception as e:
logger.error(f"Failed to send advert: {e}")
return {'success': False, 'error': str(e)}
def check_connection(self) -> bool:
"""Check if device is connected and responsive."""
if not self.is_connected:
return False
try:
self.execute(self.mc.commands.send_appstart(), timeout=5)
return True
except Exception:
return False
def set_manual_add_contacts(self, enabled: bool) -> Dict:
"""Enable/disable manual contact approval mode."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
self.execute(self.mc.commands.set_manual_add_contacts(enabled))
return {'success': True, 'message': f'Manual add contacts: {enabled}'}
except KeyError as e:
# Firmware may not support all fields needed by meshcore lib
logger.warning(f"set_manual_add_contacts unsupported by firmware: {e}")
return {'success': False, 'error': f'Firmware does not support this setting: {e}'}
except Exception as e:
logger.error(f"Failed to set manual_add_contacts: {e}")
return {'success': False, 'error': str(e)}
def get_pending_contacts(self) -> List[Dict]:
"""Get contacts pending manual approval."""
if not self.is_connected:
return []
try:
pending = self.mc.pending_contacts or {}
return [
{
'public_key': pk,
'name': c.get('adv_name', c.get('name', '')),
'type': c.get('type', c.get('adv_type', 0)),
'adv_lat': c.get('adv_lat'),
'adv_lon': c.get('adv_lon'),
'last_advert': c.get('last_advert'),
}
for pk, c in pending.items()
]
except Exception as e:
logger.error(f"Failed to get pending contacts: {e}")
return []
def approve_contact(self, pubkey: str) -> Dict:
"""Approve a pending contact."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
contact = (self.mc.pending_contacts or {}).get(pubkey)
if not contact:
return {'success': False, 'error': 'Contact not in pending list'}
self.execute(self.mc.commands.add_contact(contact))
# Refresh mc.contacts so send_dm can find the new contact
self.execute(self.mc.ensure_contacts(follow=True))
# Fallback: if ensure_contacts didn't pick up the new contact,
# add it manually to mc.contacts (firmware may need time)
if pubkey not in (self.mc.contacts or {}):
if self.mc.contacts is None:
self.mc.contacts = {}
self.mc.contacts[pubkey] = contact
logger.info(f"Manually added {pubkey[:12]}... to mc.contacts")
last_adv = contact.get('last_advert')
last_advert_val = (
str(int(last_adv))
if last_adv and isinstance(last_adv, (int, float)) and last_adv > 0
else str(int(time.time()))
)
self.db.upsert_contact(
public_key=pubkey,
name=contact.get('adv_name', contact.get('name', '')),
type=contact.get('type', contact.get('adv_type', 0)),
adv_lat=contact.get('adv_lat'),
adv_lon=contact.get('adv_lon'),
last_advert=last_advert_val,
source='device',
)
# Re-link orphaned DMs (from previous ON DELETE SET NULL)
contact_name = contact.get('adv_name', contact.get('name', ''))
self.db.relink_orphaned_dms(pubkey, name=contact_name)
# Remove from pending list after successful approval
self.mc.pending_contacts.pop(pubkey, None)
return {'success': True, 'message': 'Contact approved'}
except Exception as e:
logger.error(f"Failed to approve contact: {e}")
return {'success': False, 'error': str(e)}
def reject_contact(self, pubkey: str) -> Dict:
"""Reject a pending contact (remove from pending list without adding)."""
if not self.is_connected:
return {'success': False, 'error': 'Device not connected'}
try:
removed = self.mc.pending_contacts.pop(pubkey, None)
if removed:
return {'success': True, 'message': 'Contact rejected'}
return {'success': False, 'error': 'Contact not in pending list'}
except Exception as e:
logger.error(f"Failed to reject contact: {e}")
return {'success': False, 'error': str(e)}
def clear_pending_contacts(self) -> Dict:
"""Clear all pending contacts."""
try:
count = len(self.mc.pending_contacts) if self.mc and self.mc.pending_contacts else 0
if self.mc and self.mc.pending_contacts is not None:
self.mc.pending_contacts.clear()
return {'success': True, 'message': f'Cleared {count} pending contacts'}
except Exception as e:
logger.error(f"Failed to clear pending contacts: {e}")
return {'success': False, 'error': str(e)}
def get_battery(self) -> Optional[Dict]:
"""Get battery status."""
if not self.is_connected:
return None
try:
event = self.execute(self.mc.commands.get_bat(), timeout=5)
if event and hasattr(event, 'data'):
return getattr(event, 'payload', {})
except Exception as e:
logger.error(f"Failed to get battery: {e}")
return None
def get_device_stats(self) -> Dict:
"""Get combined device statistics (core + radio + packets)."""
if not self.is_connected:
return {}
stats = {}
try:
event = self.execute(self.mc.commands.get_stats_core(), timeout=5)
if event and hasattr(event, 'payload'):
stats['core'] = event.payload
except Exception as e:
logger.debug(f"get_stats_core failed: {e}")
try:
event = self.execute(self.mc.commands.get_stats_radio(), timeout=5)
if event and hasattr(event, 'payload'):
stats['radio'] = event.payload
except Exception as e:
logger.debug(f"get_stats_radio failed: {e}")
try:
event = self.execute(self.mc.commands.get_stats_packets(), timeout=5)
if event and hasattr(event, 'payload'):
stats['packets'] = event.payload
except Exception as e:
logger.debug(f"get_stats_packets failed: {e}")
return stats
def request_telemetry(self, contact_name: str) -> Optional[Dict]:
"""Request telemetry data from a remote sensor node."""
if not self.is_connected:
return None
contact = self.mc.get_contact_by_name(contact_name)
if not contact:
return {'error': f"Contact '{contact_name}' not found"}
try:
event = self.execute(
self.mc.commands.req_telemetry_sync(contact),
timeout=30
)
if event and hasattr(event, 'payload'):
return event.payload
return {'error': 'No telemetry response (timeout)'}
except Exception as e:
logger.error(f"Telemetry request failed: {e}")
return {'error': str(e)}
def request_neighbors(self, contact_name: str) -> Optional[Dict]:
"""Request neighbor list from a remote node."""
if not self.is_connected:
return None
contact = self.mc.get_contact_by_name(contact_name)
if not contact:
return {'error': f"Contact '{contact_name}' not found"}
try:
event = self.execute(
self.mc.commands.req_neighbours_sync(contact),
timeout=30
)
if event and hasattr(event, 'payload'):
return event.payload
return {'error': 'No neighbors response (timeout)'}
except Exception as e:
logger.error(f"Neighbors request failed: {e}")
return {'error': str(e)}
def send_trace(self, tag: int = 0) -> Optional[Dict]:
"""Send a trace packet to discover mesh topology."""
if not self.is_connected:
return None
try:
event = self.execute(
self.mc.commands.send_trace(tag=tag),
timeout=5
)
return {'success': True, 'message': f'Trace sent (tag={tag})'}
except Exception as e:
logger.error(f"Trace failed: {e}")
return {'error': str(e)}