mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
318 lines
10 KiB
Python
318 lines
10 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
from typing import TYPE_CHECKING
|
|
|
|
from fastapi import HTTPException
|
|
from meshcore import EventType
|
|
|
|
from app.models import (
|
|
CONTACT_TYPE_REPEATER,
|
|
CONTACT_TYPE_ROOM,
|
|
CommandResponse,
|
|
Contact,
|
|
RepeaterLoginResponse,
|
|
)
|
|
from app.routers.contacts import _ensure_on_radio
|
|
from app.services.radio_runtime import radio_runtime as radio_manager
|
|
|
|
if TYPE_CHECKING:
|
|
from meshcore.events import Event
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
SERVER_LOGIN_RESPONSE_TIMEOUT_SECONDS = 5.0
|
|
|
|
|
|
def _monotonic() -> float:
|
|
"""Wrapper around time.monotonic() for testability."""
|
|
return time.monotonic()
|
|
|
|
|
|
def get_server_contact_label(contact: Contact) -> str:
|
|
"""Return a user-facing label for server-capable contacts."""
|
|
if contact.type == CONTACT_TYPE_REPEATER:
|
|
return "repeater"
|
|
if contact.type == CONTACT_TYPE_ROOM:
|
|
return "room server"
|
|
return "server"
|
|
|
|
|
|
def require_server_capable_contact(
|
|
contact: Contact,
|
|
*,
|
|
allowed_types: tuple[int, ...] = (CONTACT_TYPE_REPEATER, CONTACT_TYPE_ROOM),
|
|
) -> None:
|
|
"""Raise 400 if the contact does not support server control/login features."""
|
|
if contact.type not in allowed_types:
|
|
expected = ", ".join(str(value) for value in allowed_types)
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Contact is not a supported server contact (type={contact.type}, expected one of {expected})",
|
|
)
|
|
|
|
|
|
def _login_rejected_message(label: str) -> str:
|
|
return (
|
|
f"The {label} replied but did not confirm this login. "
|
|
f"Existing access may still allow some {label} operations, but privileged actions may fail."
|
|
)
|
|
|
|
|
|
def _login_send_failed_message(label: str) -> str:
|
|
return (
|
|
f"The login request could not be sent to the {label}. "
|
|
f"The control panel is still available, but authenticated actions may fail until a login succeeds."
|
|
)
|
|
|
|
|
|
def _login_timeout_message(label: str) -> str:
|
|
return (
|
|
f"No login confirmation was heard from the {label}. "
|
|
"That can mean the password was wrong or the reply was missed in transit. "
|
|
"The control panel is still available; try logging in again if authenticated actions fail."
|
|
)
|
|
|
|
|
|
def extract_response_text(event) -> str:
|
|
"""Extract text from a CLI response event, stripping the firmware '> ' prefix."""
|
|
text = event.payload.get("text", str(event.payload))
|
|
if text.startswith("> "):
|
|
text = text[2:]
|
|
return text
|
|
|
|
|
|
async def fetch_contact_cli_response(
|
|
mc,
|
|
target_pubkey_prefix: str,
|
|
timeout: float = 20.0,
|
|
) -> "Event | None":
|
|
"""Fetch a CLI response from a specific contact via a validated get_msg() loop."""
|
|
deadline = _monotonic() + timeout
|
|
|
|
while _monotonic() < deadline:
|
|
try:
|
|
result = await mc.commands.get_msg(timeout=2.0)
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
except Exception as exc:
|
|
logger.debug("get_msg() exception: %s", exc)
|
|
await asyncio.sleep(1.0)
|
|
continue
|
|
|
|
if result.type == EventType.NO_MORE_MSGS:
|
|
await asyncio.sleep(1.0)
|
|
continue
|
|
|
|
if result.type == EventType.ERROR:
|
|
logger.debug("get_msg() error: %s", result.payload)
|
|
await asyncio.sleep(1.0)
|
|
continue
|
|
|
|
if result.type == EventType.CONTACT_MSG_RECV:
|
|
msg_prefix = result.payload.get("pubkey_prefix", "")
|
|
txt_type = result.payload.get("txt_type", 0)
|
|
if msg_prefix == target_pubkey_prefix and txt_type == 1:
|
|
return result
|
|
logger.debug(
|
|
"Skipping non-target message (from=%s, txt_type=%d) while waiting for %s",
|
|
msg_prefix,
|
|
txt_type,
|
|
target_pubkey_prefix,
|
|
)
|
|
continue
|
|
|
|
if result.type == EventType.CHANNEL_MSG_RECV:
|
|
logger.debug(
|
|
"Skipping channel message (channel_idx=%s) during CLI fetch",
|
|
result.payload.get("channel_idx"),
|
|
)
|
|
continue
|
|
|
|
logger.debug("Unexpected event type %s during CLI fetch, skipping", result.type)
|
|
|
|
logger.warning("No CLI response from contact %s within %.1fs", target_pubkey_prefix, timeout)
|
|
return None
|
|
|
|
|
|
async def prepare_authenticated_contact_connection(
|
|
mc,
|
|
contact: Contact,
|
|
password: str,
|
|
*,
|
|
label: str | None = None,
|
|
response_timeout: float = SERVER_LOGIN_RESPONSE_TIMEOUT_SECONDS,
|
|
) -> RepeaterLoginResponse:
|
|
"""Prepare connection to a server-capable contact by adding it to the radio and logging in."""
|
|
pubkey_prefix = contact.public_key[:12].lower()
|
|
contact_label = label or get_server_contact_label(contact)
|
|
loop = asyncio.get_running_loop()
|
|
login_future = loop.create_future()
|
|
|
|
def _resolve_login(event_type: EventType, message: str | None = None) -> None:
|
|
if login_future.done():
|
|
return
|
|
login_future.set_result(
|
|
RepeaterLoginResponse(
|
|
status="ok" if event_type == EventType.LOGIN_SUCCESS else "error",
|
|
authenticated=event_type == EventType.LOGIN_SUCCESS,
|
|
message=message,
|
|
)
|
|
)
|
|
|
|
success_subscription = mc.subscribe(
|
|
EventType.LOGIN_SUCCESS,
|
|
lambda _event: _resolve_login(EventType.LOGIN_SUCCESS),
|
|
attribute_filters={"pubkey_prefix": pubkey_prefix},
|
|
)
|
|
failed_subscription = mc.subscribe(
|
|
EventType.LOGIN_FAILED,
|
|
lambda _event: _resolve_login(
|
|
EventType.LOGIN_FAILED,
|
|
_login_rejected_message(contact_label),
|
|
),
|
|
attribute_filters={"pubkey_prefix": pubkey_prefix},
|
|
)
|
|
|
|
try:
|
|
logger.info("Adding %s %s to radio", contact_label, contact.public_key[:12])
|
|
await _ensure_on_radio(mc, contact)
|
|
|
|
logger.info("Sending login to %s %s", contact_label, contact.public_key[:12])
|
|
login_result = await mc.commands.send_login(contact.public_key, password)
|
|
|
|
if login_result.type == EventType.ERROR:
|
|
return RepeaterLoginResponse(
|
|
status="error",
|
|
authenticated=False,
|
|
message=f"{_login_send_failed_message(contact_label)} ({login_result.payload})",
|
|
)
|
|
|
|
try:
|
|
return await asyncio.wait_for(
|
|
login_future,
|
|
timeout=response_timeout,
|
|
)
|
|
except asyncio.TimeoutError:
|
|
logger.warning(
|
|
"No login response from %s %s within %.1fs",
|
|
contact_label,
|
|
contact.public_key[:12],
|
|
response_timeout,
|
|
)
|
|
return RepeaterLoginResponse(
|
|
status="timeout",
|
|
authenticated=False,
|
|
message=_login_timeout_message(contact_label),
|
|
)
|
|
except HTTPException as exc:
|
|
logger.warning(
|
|
"%s login setup failed for %s: %s",
|
|
contact_label.capitalize(),
|
|
contact.public_key[:12],
|
|
exc.detail,
|
|
)
|
|
return RepeaterLoginResponse(
|
|
status="error",
|
|
authenticated=False,
|
|
message=f"{_login_send_failed_message(contact_label)} ({exc.detail})",
|
|
)
|
|
finally:
|
|
success_subscription.unsubscribe()
|
|
failed_subscription.unsubscribe()
|
|
|
|
|
|
async def batch_cli_fetch(
|
|
contact: Contact,
|
|
operation_name: str,
|
|
commands: list[tuple[str, str]],
|
|
) -> dict[str, str | None]:
|
|
"""Send a batch of CLI commands to a server-capable contact and collect responses."""
|
|
results: dict[str, str | None] = {field: None for _, field in commands}
|
|
|
|
async with radio_manager.radio_operation(
|
|
operation_name,
|
|
pause_polling=True,
|
|
suspend_auto_fetch=True,
|
|
) as mc:
|
|
await _ensure_on_radio(mc, contact)
|
|
await asyncio.sleep(1.0)
|
|
|
|
for index, (cmd, field) in enumerate(commands):
|
|
if index > 0:
|
|
await asyncio.sleep(1.0)
|
|
|
|
send_result = await mc.commands.send_cmd(contact.public_key, cmd)
|
|
if send_result.type == EventType.ERROR:
|
|
logger.debug("Command '%s' send error: %s", cmd, send_result.payload)
|
|
continue
|
|
|
|
response_event = await fetch_contact_cli_response(
|
|
mc, contact.public_key[:12], timeout=10.0
|
|
)
|
|
if response_event is not None:
|
|
results[field] = extract_response_text(response_event)
|
|
else:
|
|
logger.warning("No response for command '%s' (%s)", cmd, field)
|
|
|
|
return results
|
|
|
|
|
|
async def send_contact_cli_command(
|
|
contact: Contact,
|
|
command: str,
|
|
*,
|
|
operation_name: str,
|
|
) -> CommandResponse:
|
|
"""Send a CLI command to a server-capable contact and return the text response."""
|
|
label = get_server_contact_label(contact)
|
|
|
|
async with radio_manager.radio_operation(
|
|
operation_name,
|
|
pause_polling=True,
|
|
suspend_auto_fetch=True,
|
|
) as mc:
|
|
logger.info("Adding %s %s to radio", label, contact.public_key[:12])
|
|
await _ensure_on_radio(mc, contact)
|
|
await asyncio.sleep(1.0)
|
|
|
|
logger.info("Sending command to %s %s: %s", label, contact.public_key[:12], command)
|
|
send_result = await mc.commands.send_cmd(contact.public_key, command)
|
|
|
|
if send_result.type == EventType.ERROR:
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Failed to send command: {send_result.payload}"
|
|
)
|
|
|
|
response_event = await fetch_contact_cli_response(mc, contact.public_key[:12])
|
|
|
|
if response_event is None:
|
|
logger.warning(
|
|
"No response from %s %s for command: %s",
|
|
label,
|
|
contact.public_key[:12],
|
|
command,
|
|
)
|
|
return CommandResponse(
|
|
command=command,
|
|
response="(no response - command may have been processed)",
|
|
)
|
|
|
|
response_text = extract_response_text(response_event)
|
|
sender_timestamp = response_event.payload.get(
|
|
"sender_timestamp",
|
|
response_event.payload.get("timestamp"),
|
|
)
|
|
logger.info(
|
|
"Received response from %s %s: %s",
|
|
label,
|
|
contact.public_key[:12],
|
|
response_text,
|
|
)
|
|
|
|
return CommandResponse(
|
|
command=command,
|
|
response=response_text,
|
|
sender_timestamp=sender_timestamp,
|
|
)
|