import asyncio import logging import time from typing import TYPE_CHECKING from fastapi import HTTPException from meshcore import EventType from app.models import ( CONTACT_TYPE_REPEATER, CONTACT_TYPE_ROOM, CommandResponse, Contact, RepeaterLoginResponse, ) from app.routers.contacts import _ensure_on_radio from app.services.radio_runtime import radio_runtime as radio_manager if TYPE_CHECKING: from meshcore.events import Event logger = logging.getLogger(__name__) SERVER_LOGIN_RESPONSE_TIMEOUT_SECONDS = 5.0 def _monotonic() -> float: """Wrapper around time.monotonic() for testability.""" return time.monotonic() def get_server_contact_label(contact: Contact) -> str: """Return a user-facing label for server-capable contacts.""" if contact.type == CONTACT_TYPE_REPEATER: return "repeater" if contact.type == CONTACT_TYPE_ROOM: return "room server" return "server" def require_server_capable_contact( contact: Contact, *, allowed_types: tuple[int, ...] = (CONTACT_TYPE_REPEATER, CONTACT_TYPE_ROOM), ) -> None: """Raise 400 if the contact does not support server control/login features.""" if contact.type not in allowed_types: expected = ", ".join(str(value) for value in allowed_types) raise HTTPException( status_code=400, detail=f"Contact is not a supported server contact (type={contact.type}, expected one of {expected})", ) def _login_rejected_message(label: str) -> str: return ( f"The {label} replied but did not confirm this login. " f"Existing access may still allow some {label} operations, but privileged actions may fail." ) def _login_send_failed_message(label: str) -> str: return ( f"The login request could not be sent to the {label}. " f"The control panel is still available, but authenticated actions may fail until a login succeeds." ) def _login_timeout_message(label: str) -> str: return ( f"No login confirmation was heard from the {label}. " "That can mean the password was wrong or the reply was missed in transit. " "The control panel is still available; try logging in again if authenticated actions fail." ) def extract_response_text(event) -> str: """Extract text from a CLI response event, stripping the firmware '> ' prefix.""" text = event.payload.get("text", str(event.payload)) if text.startswith("> "): text = text[2:] return text async def fetch_contact_cli_response( mc, target_pubkey_prefix: str, timeout: float = 20.0, ) -> "Event | None": """Fetch a CLI response from a specific contact via a validated get_msg() loop.""" deadline = _monotonic() + timeout while _monotonic() < deadline: try: result = await mc.commands.get_msg(timeout=2.0) except asyncio.TimeoutError: continue except Exception as exc: logger.debug("get_msg() exception: %s", exc) await asyncio.sleep(1.0) continue if result.type == EventType.NO_MORE_MSGS: await asyncio.sleep(1.0) continue if result.type == EventType.ERROR: logger.debug("get_msg() error: %s", result.payload) await asyncio.sleep(1.0) continue if result.type == EventType.CONTACT_MSG_RECV: msg_prefix = result.payload.get("pubkey_prefix", "") txt_type = result.payload.get("txt_type", 0) if msg_prefix == target_pubkey_prefix and txt_type == 1: return result logger.debug( "Skipping non-target message (from=%s, txt_type=%d) while waiting for %s", msg_prefix, txt_type, target_pubkey_prefix, ) continue if result.type == EventType.CHANNEL_MSG_RECV: logger.debug( "Skipping channel message (channel_idx=%s) during CLI fetch", result.payload.get("channel_idx"), ) continue logger.debug("Unexpected event type %s during CLI fetch, skipping", result.type) logger.warning("No CLI response from contact %s within %.1fs", target_pubkey_prefix, timeout) return None async def prepare_authenticated_contact_connection( mc, contact: Contact, password: str, *, label: str | None = None, response_timeout: float = SERVER_LOGIN_RESPONSE_TIMEOUT_SECONDS, ) -> RepeaterLoginResponse: """Prepare connection to a server-capable contact by adding it to the radio and logging in.""" pubkey_prefix = contact.public_key[:12].lower() contact_label = label or get_server_contact_label(contact) loop = asyncio.get_running_loop() login_future = loop.create_future() def _resolve_login(event_type: EventType, message: str | None = None) -> None: if login_future.done(): return login_future.set_result( RepeaterLoginResponse( status="ok" if event_type == EventType.LOGIN_SUCCESS else "error", authenticated=event_type == EventType.LOGIN_SUCCESS, message=message, ) ) success_subscription = mc.subscribe( EventType.LOGIN_SUCCESS, lambda _event: _resolve_login(EventType.LOGIN_SUCCESS), attribute_filters={"pubkey_prefix": pubkey_prefix}, ) failed_subscription = mc.subscribe( EventType.LOGIN_FAILED, lambda _event: _resolve_login( EventType.LOGIN_FAILED, _login_rejected_message(contact_label), ), attribute_filters={"pubkey_prefix": pubkey_prefix}, ) try: logger.info("Adding %s %s to radio", contact_label, contact.public_key[:12]) await _ensure_on_radio(mc, contact) logger.info("Sending login to %s %s", contact_label, contact.public_key[:12]) login_result = await mc.commands.send_login(contact.public_key, password) if login_result.type == EventType.ERROR: return RepeaterLoginResponse( status="error", authenticated=False, message=f"{_login_send_failed_message(contact_label)} ({login_result.payload})", ) try: return await asyncio.wait_for( login_future, timeout=response_timeout, ) except asyncio.TimeoutError: logger.warning( "No login response from %s %s within %.1fs", contact_label, contact.public_key[:12], response_timeout, ) return RepeaterLoginResponse( status="timeout", authenticated=False, message=_login_timeout_message(contact_label), ) except HTTPException as exc: logger.warning( "%s login setup failed for %s: %s", contact_label.capitalize(), contact.public_key[:12], exc.detail, ) return RepeaterLoginResponse( status="error", authenticated=False, message=f"{_login_send_failed_message(contact_label)} ({exc.detail})", ) finally: success_subscription.unsubscribe() failed_subscription.unsubscribe() async def batch_cli_fetch( contact: Contact, operation_name: str, commands: list[tuple[str, str]], ) -> dict[str, str | None]: """Send a batch of CLI commands to a server-capable contact and collect responses.""" results: dict[str, str | None] = {field: None for _, field in commands} async with radio_manager.radio_operation( operation_name, pause_polling=True, suspend_auto_fetch=True, ) as mc: await _ensure_on_radio(mc, contact) await asyncio.sleep(1.0) for index, (cmd, field) in enumerate(commands): if index > 0: await asyncio.sleep(1.0) send_result = await mc.commands.send_cmd(contact.public_key, cmd) if send_result.type == EventType.ERROR: logger.debug("Command '%s' send error: %s", cmd, send_result.payload) continue response_event = await fetch_contact_cli_response( mc, contact.public_key[:12], timeout=10.0 ) if response_event is not None: results[field] = extract_response_text(response_event) else: logger.warning("No response for command '%s' (%s)", cmd, field) return results async def send_contact_cli_command( contact: Contact, command: str, *, operation_name: str, ) -> CommandResponse: """Send a CLI command to a server-capable contact and return the text response.""" label = get_server_contact_label(contact) async with radio_manager.radio_operation( operation_name, pause_polling=True, suspend_auto_fetch=True, ) as mc: logger.info("Adding %s %s to radio", label, contact.public_key[:12]) await _ensure_on_radio(mc, contact) await asyncio.sleep(1.0) logger.info("Sending command to %s %s: %s", label, contact.public_key[:12], command) send_result = await mc.commands.send_cmd(contact.public_key, command) if send_result.type == EventType.ERROR: raise HTTPException( status_code=500, detail=f"Failed to send command: {send_result.payload}" ) response_event = await fetch_contact_cli_response(mc, contact.public_key[:12]) if response_event is None: logger.warning( "No response from %s %s for command: %s", label, contact.public_key[:12], command, ) return CommandResponse( command=command, response="(no response - command may have been processed)", ) response_text = extract_response_text(response_event) sender_timestamp = response_event.payload.get( "sender_timestamp", response_event.payload.get("timestamp"), ) logger.info( "Received response from %s %s: %s", label, contact.public_key[:12], response_text, ) return CommandResponse( command=command, response=response_text, sender_timestamp=sender_timestamp, )