Files
Remote-Terminal-for-MeshCore/app/routers/repeaters.py
Jack Kingsman 5b166c4b66 Add room server
2026-03-19 19:22:40 -07:00

387 lines
13 KiB
Python

import asyncio
import logging
from typing import TYPE_CHECKING
from fastapi import APIRouter, HTTPException
from meshcore import EventType
from app.dependencies import require_connected
from app.models import (
CONTACT_TYPE_REPEATER,
AclEntry,
CommandRequest,
CommandResponse,
Contact,
LppSensor,
NeighborInfo,
RepeaterAclResponse,
RepeaterAdvertIntervalsResponse,
RepeaterLoginRequest,
RepeaterLoginResponse,
RepeaterLppTelemetryResponse,
RepeaterNeighborsResponse,
RepeaterNodeInfoResponse,
RepeaterOwnerInfoResponse,
RepeaterRadioSettingsResponse,
RepeaterStatusResponse,
)
from app.repository import ContactRepository
from app.routers.contacts import _ensure_on_radio, _resolve_contact_or_404
from app.routers.server_control import (
_monotonic,
batch_cli_fetch,
extract_response_text,
prepare_authenticated_contact_connection,
require_server_capable_contact,
send_contact_cli_command,
)
from app.services.radio_runtime import radio_runtime as radio_manager
if TYPE_CHECKING:
from meshcore.events import Event
logger = logging.getLogger(__name__)
# ACL permission level names
ACL_PERMISSION_NAMES = {
0: "Guest",
1: "Read-only",
2: "Read-write",
3: "Admin",
}
router = APIRouter(prefix="/contacts", tags=["repeaters"])
REPEATER_LOGIN_RESPONSE_TIMEOUT_SECONDS = 5.0
def _extract_response_text(event) -> str:
return extract_response_text(event)
async def _fetch_repeater_response(
mc,
target_pubkey_prefix: str,
timeout: float = 20.0,
) -> "Event | None":
deadline = _monotonic() + timeout
while _monotonic() < deadline:
try:
result = await mc.commands.get_msg(timeout=2.0)
except asyncio.TimeoutError:
continue
except Exception as exc:
logger.debug("get_msg() exception: %s", exc)
await asyncio.sleep(1.0)
continue
if result.type == EventType.NO_MORE_MSGS:
await asyncio.sleep(1.0)
continue
if result.type == EventType.ERROR:
logger.debug("get_msg() error: %s", result.payload)
await asyncio.sleep(1.0)
continue
if result.type == EventType.CONTACT_MSG_RECV:
msg_prefix = result.payload.get("pubkey_prefix", "")
txt_type = result.payload.get("txt_type", 0)
if msg_prefix == target_pubkey_prefix and txt_type == 1:
return result
logger.debug(
"Skipping non-target message (from=%s, txt_type=%d) while waiting for %s",
msg_prefix,
txt_type,
target_pubkey_prefix,
)
continue
if result.type == EventType.CHANNEL_MSG_RECV:
logger.debug(
"Skipping channel message (channel_idx=%s) during repeater fetch",
result.payload.get("channel_idx"),
)
continue
logger.debug("Unexpected event type %s during repeater fetch, skipping", result.type)
logger.warning("No CLI response from repeater %s within %.1fs", target_pubkey_prefix, timeout)
return None
async def prepare_repeater_connection(mc, contact: Contact, password: str) -> RepeaterLoginResponse:
return await prepare_authenticated_contact_connection(
mc,
contact,
password,
label="repeater",
response_timeout=REPEATER_LOGIN_RESPONSE_TIMEOUT_SECONDS,
)
def _require_repeater(contact: Contact) -> None:
"""Raise 400 if contact is not a repeater."""
if contact.type != CONTACT_TYPE_REPEATER:
raise HTTPException(
status_code=400,
detail=f"Contact is not a repeater (type={contact.type}, expected {CONTACT_TYPE_REPEATER})",
)
# ---------------------------------------------------------------------------
# Granular repeater endpoints — one attempt, no server-side retries.
# Frontend manages retry logic for better UX control.
# ---------------------------------------------------------------------------
@router.post("/{public_key}/repeater/login", response_model=RepeaterLoginResponse)
async def repeater_login(public_key: str, request: RepeaterLoginRequest) -> RepeaterLoginResponse:
"""Attempt repeater login and report whether auth was confirmed."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
_require_repeater(contact)
async with radio_manager.radio_operation(
"repeater_login",
pause_polling=True,
suspend_auto_fetch=True,
) as mc:
return await prepare_repeater_connection(mc, contact, request.password)
@router.post("/{public_key}/repeater/status", response_model=RepeaterStatusResponse)
async def repeater_status(public_key: str) -> RepeaterStatusResponse:
"""Fetch status telemetry from a repeater (single attempt, 10s timeout)."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
_require_repeater(contact)
async with radio_manager.radio_operation(
"repeater_status", pause_polling=True, suspend_auto_fetch=True
) as mc:
# Ensure contact is on radio for routing
await _ensure_on_radio(mc, contact)
status = await mc.commands.req_status_sync(contact.public_key, timeout=10, min_timeout=5)
if status is None:
raise HTTPException(status_code=504, detail="No status response from repeater")
return RepeaterStatusResponse(
battery_volts=status.get("bat", 0) / 1000.0,
tx_queue_len=status.get("tx_queue_len", 0),
noise_floor_dbm=status.get("noise_floor", 0),
last_rssi_dbm=status.get("last_rssi", 0),
last_snr_db=status.get("last_snr", 0.0),
packets_received=status.get("nb_recv", 0),
packets_sent=status.get("nb_sent", 0),
airtime_seconds=status.get("airtime", 0),
rx_airtime_seconds=status.get("rx_airtime", 0),
uptime_seconds=status.get("uptime", 0),
sent_flood=status.get("sent_flood", 0),
sent_direct=status.get("sent_direct", 0),
recv_flood=status.get("recv_flood", 0),
recv_direct=status.get("recv_direct", 0),
flood_dups=status.get("flood_dups", 0),
direct_dups=status.get("direct_dups", 0),
full_events=status.get("full_evts", 0),
)
@router.post("/{public_key}/repeater/lpp-telemetry", response_model=RepeaterLppTelemetryResponse)
async def repeater_lpp_telemetry(public_key: str) -> RepeaterLppTelemetryResponse:
"""Fetch CayenneLPP sensor telemetry from a repeater (single attempt, 10s timeout)."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
_require_repeater(contact)
async with radio_manager.radio_operation(
"repeater_lpp_telemetry", pause_polling=True, suspend_auto_fetch=True
) as mc:
await _ensure_on_radio(mc, contact)
telemetry = await mc.commands.req_telemetry_sync(
contact.public_key, timeout=10, min_timeout=5
)
if telemetry is None:
raise HTTPException(status_code=504, detail="No telemetry response from repeater")
sensors: list[LppSensor] = []
for entry in telemetry:
channel = entry.get("channel", 0)
type_name = str(entry.get("type", "unknown"))
value = entry.get("value", 0)
sensors.append(LppSensor(channel=channel, type_name=type_name, value=value))
return RepeaterLppTelemetryResponse(sensors=sensors)
@router.post("/{public_key}/repeater/neighbors", response_model=RepeaterNeighborsResponse)
async def repeater_neighbors(public_key: str) -> RepeaterNeighborsResponse:
"""Fetch neighbors from a repeater (single attempt, 10s timeout)."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
_require_repeater(contact)
async with radio_manager.radio_operation(
"repeater_neighbors", pause_polling=True, suspend_auto_fetch=True
) as mc:
# Ensure contact is on radio for routing
await _ensure_on_radio(mc, contact)
neighbors_data = await mc.commands.fetch_all_neighbours(
contact.public_key, timeout=10, min_timeout=5
)
neighbors: list[NeighborInfo] = []
if neighbors_data and "neighbours" in neighbors_data:
for n in neighbors_data["neighbours"]:
pubkey_prefix = n.get("pubkey", "")
resolved_contact = await ContactRepository.get_by_key_prefix(pubkey_prefix)
neighbors.append(
NeighborInfo(
pubkey_prefix=pubkey_prefix,
name=resolved_contact.name if resolved_contact else None,
snr=n.get("snr", 0.0),
last_heard_seconds=n.get("secs_ago", 0),
)
)
return RepeaterNeighborsResponse(neighbors=neighbors)
@router.post("/{public_key}/repeater/acl", response_model=RepeaterAclResponse)
async def repeater_acl(public_key: str) -> RepeaterAclResponse:
"""Fetch ACL from a repeater (single attempt, 10s timeout)."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
_require_repeater(contact)
async with radio_manager.radio_operation(
"repeater_acl", pause_polling=True, suspend_auto_fetch=True
) as mc:
# Ensure contact is on radio for routing
await _ensure_on_radio(mc, contact)
acl_data = await mc.commands.req_acl_sync(contact.public_key, timeout=10, min_timeout=5)
acl_entries: list[AclEntry] = []
if acl_data and isinstance(acl_data, list):
for entry in acl_data:
pubkey_prefix = entry.get("key", "")
perm = entry.get("perm", 0)
resolved_contact = await ContactRepository.get_by_key_prefix(pubkey_prefix)
acl_entries.append(
AclEntry(
pubkey_prefix=pubkey_prefix,
name=resolved_contact.name if resolved_contact else None,
permission=perm,
permission_name=ACL_PERMISSION_NAMES.get(perm, f"Unknown({perm})"),
)
)
return RepeaterAclResponse(acl=acl_entries)
async def _batch_cli_fetch(
contact: Contact,
operation_name: str,
commands: list[tuple[str, str]],
) -> dict[str, str | None]:
return await batch_cli_fetch(contact, operation_name, commands)
@router.post("/{public_key}/repeater/node-info", response_model=RepeaterNodeInfoResponse)
async def repeater_node_info(public_key: str) -> RepeaterNodeInfoResponse:
"""Fetch repeater identity/location info via a small CLI batch."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
_require_repeater(contact)
results = await _batch_cli_fetch(
contact,
"repeater_node_info",
[
("get name", "name"),
("get lat", "lat"),
("get lon", "lon"),
("clock", "clock_utc"),
],
)
return RepeaterNodeInfoResponse(**results)
@router.post("/{public_key}/repeater/radio-settings", response_model=RepeaterRadioSettingsResponse)
async def repeater_radio_settings(public_key: str) -> RepeaterRadioSettingsResponse:
"""Fetch radio settings from a repeater via radio/config CLI commands."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
_require_repeater(contact)
results = await _batch_cli_fetch(
contact,
"repeater_radio_settings",
[
("ver", "firmware_version"),
("get radio", "radio"),
("get tx", "tx_power"),
("get af", "airtime_factor"),
("get repeat", "repeat_enabled"),
("get flood.max", "flood_max"),
],
)
return RepeaterRadioSettingsResponse(**results)
@router.post(
"/{public_key}/repeater/advert-intervals", response_model=RepeaterAdvertIntervalsResponse
)
async def repeater_advert_intervals(public_key: str) -> RepeaterAdvertIntervalsResponse:
"""Fetch advertisement intervals from a repeater via CLI commands."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
_require_repeater(contact)
results = await _batch_cli_fetch(
contact,
"repeater_advert_intervals",
[
("get advert.interval", "advert_interval"),
("get flood.advert.interval", "flood_advert_interval"),
],
)
return RepeaterAdvertIntervalsResponse(**results)
@router.post("/{public_key}/repeater/owner-info", response_model=RepeaterOwnerInfoResponse)
async def repeater_owner_info(public_key: str) -> RepeaterOwnerInfoResponse:
"""Fetch owner info and guest password from a repeater via CLI commands."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
_require_repeater(contact)
results = await _batch_cli_fetch(
contact,
"repeater_owner_info",
[
("get owner.info", "owner_info"),
("get guest.password", "guest_password"),
],
)
return RepeaterOwnerInfoResponse(**results)
@router.post("/{public_key}/command", response_model=CommandResponse)
async def send_repeater_command(public_key: str, request: CommandRequest) -> CommandResponse:
"""Send a CLI command to a repeater or room server."""
require_connected()
contact = await _resolve_contact_or_404(public_key)
require_server_capable_contact(contact)
return await send_contact_cli_command(
contact,
request.command,
operation_name="send_repeater_command",
)