mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
354 lines
16 KiB
Python
354 lines
16 KiB
Python
import asyncio
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
POST_CONNECT_SETUP_TIMEOUT_SECONDS = 300
|
|
POST_CONNECT_SETUP_MAX_ATTEMPTS = 2
|
|
|
|
|
|
def _clean_device_string(value: object) -> str | None:
|
|
if not isinstance(value, str):
|
|
return None
|
|
cleaned = value.strip()
|
|
return cleaned or None
|
|
|
|
|
|
def _decode_fixed_string(raw: bytes, start: int, length: int) -> str | None:
|
|
if len(raw) < start:
|
|
return None
|
|
return _clean_device_string(
|
|
raw[start : start + length].decode("utf-8", "ignore").replace("\0", "")
|
|
)
|
|
|
|
|
|
async def run_post_connect_setup(radio_manager) -> None:
|
|
"""Run shared radio initialization after a transport connection succeeds."""
|
|
from app.event_handlers import register_event_handlers
|
|
from app.keystore import export_and_store_private_key
|
|
from app.radio_sync import (
|
|
drain_pending_messages,
|
|
send_advertisement,
|
|
start_message_polling,
|
|
start_periodic_advert,
|
|
start_periodic_sync,
|
|
sync_and_offload_all,
|
|
sync_radio_time,
|
|
)
|
|
|
|
if not radio_manager.meshcore:
|
|
return
|
|
|
|
if radio_manager._setup_lock is None:
|
|
radio_manager._setup_lock = asyncio.Lock()
|
|
|
|
async def _setup_body() -> None:
|
|
if not radio_manager.meshcore:
|
|
return
|
|
radio_manager._setup_in_progress = True
|
|
radio_manager._setup_complete = False
|
|
try:
|
|
# Hold the operation lock for all radio I/O during setup.
|
|
# This prevents user-initiated operations (send message, etc.)
|
|
# from interleaving commands on the serial link.
|
|
await radio_manager._acquire_operation_lock("post_connect_setup", blocking=True)
|
|
try:
|
|
mc = radio_manager.meshcore
|
|
if not mc:
|
|
return
|
|
|
|
# Register event handlers against the locked, current transport.
|
|
register_event_handlers(mc)
|
|
|
|
await export_and_store_private_key(mc)
|
|
|
|
# Sync radio clock with system time
|
|
await sync_radio_time(mc)
|
|
|
|
# Apply flood scope from settings (best-effort; older firmware
|
|
# may not support set_flood_scope)
|
|
from app.region_scope import normalize_region_scope
|
|
from app.repository import AppSettingsRepository
|
|
|
|
app_settings = await AppSettingsRepository.get()
|
|
scope = normalize_region_scope(app_settings.flood_scope)
|
|
try:
|
|
await mc.commands.set_flood_scope(scope if scope else "")
|
|
logger.info("Applied flood_scope=%r", scope or "(disabled)")
|
|
except Exception as exc:
|
|
logger.warning("set_flood_scope failed (firmware may not support it): %s", exc)
|
|
|
|
# Query path hash mode support (best-effort; older firmware won't report it).
|
|
# If the library's parsed payload is missing path_hash_mode (e.g. stale
|
|
# .pyc on WSL2 Windows mounts), fall back to raw-frame extraction.
|
|
reader = mc._reader
|
|
_original_handle_rx = reader.handle_rx
|
|
_captured_frame: list[bytes] = []
|
|
|
|
async def _capture_handle_rx(data: bytearray) -> None:
|
|
from meshcore.packets import PacketType
|
|
|
|
if len(data) > 0 and data[0] == PacketType.DEVICE_INFO.value:
|
|
_captured_frame.append(bytes(data))
|
|
return await _original_handle_rx(data)
|
|
|
|
reader.handle_rx = _capture_handle_rx
|
|
radio_manager.device_info_loaded = False
|
|
radio_manager.max_contacts = None
|
|
radio_manager.device_model = None
|
|
radio_manager.firmware_build = None
|
|
radio_manager.firmware_version = None
|
|
radio_manager.max_channels = 40
|
|
radio_manager.path_hash_mode = 0
|
|
radio_manager.path_hash_mode_supported = False
|
|
try:
|
|
device_query = await mc.commands.send_device_query()
|
|
payload = (
|
|
device_query.payload
|
|
if device_query is not None and isinstance(device_query.payload, dict)
|
|
else {}
|
|
)
|
|
|
|
payload_max_contacts = payload.get("max_contacts")
|
|
if isinstance(payload_max_contacts, int):
|
|
radio_manager.max_contacts = max(1, payload_max_contacts)
|
|
|
|
payload_max_channels = payload.get("max_channels")
|
|
if isinstance(payload_max_channels, int):
|
|
radio_manager.max_channels = max(1, payload_max_channels)
|
|
|
|
radio_manager.device_model = _clean_device_string(payload.get("model"))
|
|
radio_manager.firmware_build = _clean_device_string(payload.get("fw_build"))
|
|
radio_manager.firmware_version = _clean_device_string(payload.get("ver"))
|
|
|
|
fw_ver = payload.get("fw ver")
|
|
payload_reports_device_info = isinstance(fw_ver, int) and fw_ver >= 3
|
|
if payload_reports_device_info:
|
|
radio_manager.device_info_loaded = True
|
|
|
|
if "path_hash_mode" in payload and isinstance(payload["path_hash_mode"], int):
|
|
radio_manager.path_hash_mode = payload["path_hash_mode"]
|
|
radio_manager.path_hash_mode_supported = True
|
|
|
|
if _captured_frame:
|
|
# Raw-frame fallback / completion:
|
|
# byte 1 = fw_ver, byte 2 = max_contacts/2, byte 3 = max_channels,
|
|
# bytes 8:20 = fw_build, 20:60 = model, 60:80 = ver, byte 81 = path_hash_mode
|
|
raw = _captured_frame[-1]
|
|
fw_ver = raw[1] if len(raw) > 1 else 0
|
|
if fw_ver >= 3:
|
|
radio_manager.device_info_loaded = True
|
|
if radio_manager.max_contacts is None and len(raw) >= 3:
|
|
radio_manager.max_contacts = max(1, raw[2] * 2)
|
|
if len(raw) >= 4 and not isinstance(payload_max_channels, int):
|
|
radio_manager.max_channels = max(1, raw[3])
|
|
if radio_manager.firmware_build is None:
|
|
radio_manager.firmware_build = _decode_fixed_string(raw, 8, 12)
|
|
if radio_manager.device_model is None:
|
|
radio_manager.device_model = _decode_fixed_string(raw, 20, 40)
|
|
if radio_manager.firmware_version is None:
|
|
radio_manager.firmware_version = _decode_fixed_string(raw, 60, 20)
|
|
if (
|
|
not radio_manager.path_hash_mode_supported
|
|
and fw_ver >= 10
|
|
and len(raw) >= 82
|
|
):
|
|
radio_manager.path_hash_mode = raw[81]
|
|
radio_manager.path_hash_mode_supported = True
|
|
logger.warning(
|
|
"path_hash_mode=%d extracted from raw frame "
|
|
"(stale .pyc? try: rm %s)",
|
|
radio_manager.path_hash_mode,
|
|
getattr(
|
|
__import__("meshcore.reader", fromlist=["reader"]),
|
|
"__cached__",
|
|
"meshcore __pycache__/reader.*.pyc",
|
|
),
|
|
)
|
|
if radio_manager.path_hash_mode_supported:
|
|
logger.info("Path hash mode: %d (supported)", radio_manager.path_hash_mode)
|
|
else:
|
|
logger.debug("Firmware does not report path_hash_mode")
|
|
if radio_manager.device_info_loaded:
|
|
logger.info(
|
|
"Radio device info: model=%s build=%s version=%s max_contacts=%s max_channels=%d",
|
|
radio_manager.device_model or "unknown",
|
|
radio_manager.firmware_build or "unknown",
|
|
radio_manager.firmware_version or "unknown",
|
|
radio_manager.max_contacts
|
|
if radio_manager.max_contacts is not None
|
|
else "unknown",
|
|
radio_manager.max_channels,
|
|
)
|
|
logger.info("Max channel slots: %d", radio_manager.max_channels)
|
|
except Exception as exc:
|
|
logger.debug("Failed to query device info capabilities: %s", exc)
|
|
finally:
|
|
reader.handle_rx = _original_handle_rx
|
|
|
|
# Sync contacts/channels from radio to DB and clear radio
|
|
logger.info("Syncing and offloading radio data...")
|
|
result = await sync_and_offload_all(mc)
|
|
logger.info("Sync complete: %s", result)
|
|
|
|
# Send advertisement to announce our presence (if enabled and not throttled)
|
|
if await send_advertisement(mc):
|
|
logger.info("Advertisement sent")
|
|
else:
|
|
logger.debug("Advertisement skipped (disabled or throttled)")
|
|
|
|
# Drain any messages that were queued before we connected.
|
|
# This must happen BEFORE starting auto-fetch, otherwise both
|
|
# compete on get_msg() with interleaved radio I/O.
|
|
drained = await drain_pending_messages(mc)
|
|
if drained > 0:
|
|
logger.info("Drained %d pending message(s)", drained)
|
|
radio_manager.clear_pending_message_channel_slots()
|
|
|
|
await mc.start_auto_message_fetching()
|
|
logger.info("Auto message fetching started")
|
|
finally:
|
|
radio_manager._release_operation_lock("post_connect_setup")
|
|
|
|
# Start background tasks AFTER releasing the operation lock.
|
|
# These tasks acquire their own locks when they need radio access.
|
|
start_periodic_sync()
|
|
start_periodic_advert()
|
|
start_message_polling()
|
|
|
|
radio_manager._setup_complete = True
|
|
finally:
|
|
radio_manager._setup_in_progress = False
|
|
|
|
async with radio_manager._setup_lock:
|
|
await asyncio.wait_for(_setup_body(), timeout=POST_CONNECT_SETUP_TIMEOUT_SECONDS)
|
|
|
|
logger.info("Post-connect setup complete")
|
|
|
|
|
|
async def prepare_connected_radio(radio_manager, *, broadcast_on_success: bool = True) -> bool:
|
|
"""Finish setup for an already-connected radio and optionally broadcast health."""
|
|
from app.websocket import broadcast_error, broadcast_health
|
|
|
|
if not radio_manager.connection_desired:
|
|
if radio_manager.is_connected:
|
|
await radio_manager.disconnect()
|
|
return False
|
|
|
|
for attempt in range(1, POST_CONNECT_SETUP_MAX_ATTEMPTS + 1):
|
|
try:
|
|
await radio_manager.post_connect_setup()
|
|
break
|
|
except asyncio.TimeoutError as exc:
|
|
if attempt < POST_CONNECT_SETUP_MAX_ATTEMPTS:
|
|
logger.warning(
|
|
"Post-connect setup timed out after %ds on attempt %d/%d; retrying once",
|
|
POST_CONNECT_SETUP_TIMEOUT_SECONDS,
|
|
attempt,
|
|
POST_CONNECT_SETUP_MAX_ATTEMPTS,
|
|
)
|
|
continue
|
|
|
|
logger.error(
|
|
"Post-connect setup timed out after %ds on %d attempts. Initial radio offload "
|
|
"took too long; something is probably wrong.",
|
|
POST_CONNECT_SETUP_TIMEOUT_SECONDS,
|
|
POST_CONNECT_SETUP_MAX_ATTEMPTS,
|
|
)
|
|
broadcast_error(
|
|
"Radio startup appears stuck",
|
|
"Initial radio offload took too long. Reboot the radio and restart the server.",
|
|
)
|
|
raise RuntimeError("Post-connect setup timed out") from exc
|
|
|
|
if not radio_manager.connection_desired:
|
|
if radio_manager.is_connected:
|
|
await radio_manager.disconnect()
|
|
return False
|
|
|
|
radio_manager._last_connected = True
|
|
if broadcast_on_success:
|
|
broadcast_health(True, radio_manager.connection_info)
|
|
return True
|
|
|
|
|
|
async def reconnect_and_prepare_radio(
|
|
radio_manager,
|
|
*,
|
|
broadcast_on_success: bool = True,
|
|
) -> bool:
|
|
"""Reconnect the transport, then run post-connect setup before reporting healthy."""
|
|
connected = await radio_manager.reconnect(broadcast_on_success=False)
|
|
if not connected:
|
|
return False
|
|
|
|
return await prepare_connected_radio(radio_manager, broadcast_on_success=broadcast_on_success)
|
|
|
|
|
|
async def connection_monitor_loop(radio_manager) -> None:
|
|
"""Monitor radio health and keep transport/setup state converged."""
|
|
from app.websocket import broadcast_health
|
|
|
|
check_interval_seconds = 5
|
|
unresponsive_threshold = 3
|
|
consecutive_setup_failures = 0
|
|
|
|
while True:
|
|
try:
|
|
await asyncio.sleep(check_interval_seconds)
|
|
|
|
current_connected = radio_manager.is_connected
|
|
connection_desired = radio_manager.connection_desired
|
|
|
|
if radio_manager._last_connected and not current_connected:
|
|
logger.warning("Radio connection lost, broadcasting status change")
|
|
broadcast_health(False, radio_manager.connection_info)
|
|
radio_manager._last_connected = False
|
|
consecutive_setup_failures = 0
|
|
|
|
if not connection_desired:
|
|
if current_connected:
|
|
logger.info("Radio connection paused by operator; disconnecting transport")
|
|
await radio_manager.disconnect()
|
|
consecutive_setup_failures = 0
|
|
continue
|
|
|
|
if not current_connected:
|
|
if not radio_manager.is_reconnecting and await reconnect_and_prepare_radio(
|
|
radio_manager,
|
|
broadcast_on_success=True,
|
|
):
|
|
consecutive_setup_failures = 0
|
|
|
|
elif not radio_manager._last_connected and current_connected:
|
|
logger.info("Radio connection restored")
|
|
await prepare_connected_radio(radio_manager, broadcast_on_success=True)
|
|
consecutive_setup_failures = 0
|
|
|
|
elif (
|
|
current_connected
|
|
and not radio_manager.is_setup_complete
|
|
and not radio_manager.is_setup_in_progress
|
|
):
|
|
logger.info("Retrying post-connect setup...")
|
|
await prepare_connected_radio(radio_manager, broadcast_on_success=True)
|
|
consecutive_setup_failures = 0
|
|
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
consecutive_setup_failures += 1
|
|
if consecutive_setup_failures == unresponsive_threshold:
|
|
logger.error(
|
|
"Post-connect setup has failed %d times in a row. "
|
|
"The radio port appears open but the radio is not "
|
|
"responding to commands. Common causes: another "
|
|
"process has the serial port open (check for other "
|
|
"RemoteTerm instances, serial monitors, etc.), the "
|
|
"firmware is in repeater mode (not client), or the "
|
|
"radio needs a power cycle. Will keep retrying.",
|
|
consecutive_setup_failures,
|
|
)
|
|
elif consecutive_setup_failures < unresponsive_threshold:
|
|
logger.exception("Error in connection monitor, continuing: %s", e)
|