mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
252 lines
8.8 KiB
Python
252 lines
8.8 KiB
Python
import logging
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from meshcore import EventType
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.dependencies import require_connected
|
|
from app.radio import radio_manager
|
|
from app.radio_sync import send_advertisement as do_send_advertisement
|
|
from app.radio_sync import sync_radio_time
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/radio", tags=["radio"])
|
|
|
|
|
|
class RadioSettings(BaseModel):
|
|
freq: float = Field(description="Frequency in MHz")
|
|
bw: float = Field(description="Bandwidth in kHz")
|
|
sf: int = Field(description="Spreading factor (7-12)")
|
|
cr: int = Field(description="Coding rate (1-4)")
|
|
|
|
|
|
class RadioConfigResponse(BaseModel):
|
|
public_key: str = Field(description="Public key (64-char hex)")
|
|
name: str
|
|
lat: float
|
|
lon: float
|
|
tx_power: int = Field(description="Transmit power in dBm")
|
|
max_tx_power: int = Field(description="Maximum transmit power in dBm")
|
|
radio: RadioSettings
|
|
|
|
|
|
class RadioConfigUpdate(BaseModel):
|
|
name: str | None = None
|
|
lat: float | None = None
|
|
lon: float | None = None
|
|
tx_power: int | None = Field(default=None, description="Transmit power in dBm")
|
|
radio: RadioSettings | None = None
|
|
|
|
|
|
class PrivateKeyUpdate(BaseModel):
|
|
private_key: str = Field(description="Private key as hex string")
|
|
|
|
|
|
@router.get("/config", response_model=RadioConfigResponse)
|
|
async def get_radio_config() -> RadioConfigResponse:
|
|
"""Get the current radio configuration."""
|
|
mc = require_connected()
|
|
|
|
info = mc.self_info
|
|
if not info:
|
|
raise HTTPException(status_code=503, detail="Radio info not available")
|
|
|
|
return RadioConfigResponse(
|
|
public_key=info.get("public_key", ""),
|
|
name=info.get("name", ""),
|
|
lat=info.get("adv_lat", 0.0),
|
|
lon=info.get("adv_lon", 0.0),
|
|
tx_power=info.get("tx_power", 0),
|
|
max_tx_power=info.get("max_tx_power", 0),
|
|
radio=RadioSettings(
|
|
freq=info.get("radio_freq", 0.0),
|
|
bw=info.get("radio_bw", 0.0),
|
|
sf=info.get("radio_sf", 0),
|
|
cr=info.get("radio_cr", 0),
|
|
),
|
|
)
|
|
|
|
|
|
@router.patch("/config", response_model=RadioConfigResponse)
|
|
async def update_radio_config(update: RadioConfigUpdate) -> RadioConfigResponse:
|
|
"""Update radio configuration. Only provided fields will be updated."""
|
|
require_connected()
|
|
|
|
async with radio_manager.radio_operation("update_radio_config") as mc:
|
|
if update.name is not None:
|
|
logger.info("Setting radio name to %s", update.name)
|
|
await mc.commands.set_name(update.name)
|
|
|
|
if update.lat is not None or update.lon is not None:
|
|
current_info = mc.self_info
|
|
lat = update.lat if update.lat is not None else current_info.get("adv_lat", 0.0)
|
|
lon = update.lon if update.lon is not None else current_info.get("adv_lon", 0.0)
|
|
logger.info("Setting radio coordinates to %f, %f", lat, lon)
|
|
await mc.commands.set_coords(lat=lat, lon=lon)
|
|
|
|
if update.tx_power is not None:
|
|
logger.info("Setting TX power to %d dBm", update.tx_power)
|
|
await mc.commands.set_tx_power(val=update.tx_power)
|
|
|
|
if update.radio is not None:
|
|
logger.info(
|
|
"Setting radio params: freq=%f MHz, bw=%f kHz, sf=%d, cr=%d",
|
|
update.radio.freq,
|
|
update.radio.bw,
|
|
update.radio.sf,
|
|
update.radio.cr,
|
|
)
|
|
await mc.commands.set_radio(
|
|
freq=update.radio.freq,
|
|
bw=update.radio.bw,
|
|
sf=update.radio.sf,
|
|
cr=update.radio.cr,
|
|
)
|
|
|
|
# Sync time with system clock
|
|
await sync_radio_time(mc)
|
|
|
|
# Re-fetch self_info so the response reflects the changes we just made.
|
|
# Commands like set_name() write to flash but don't update the cached
|
|
# self_info — send_appstart() triggers a fresh SELF_INFO from the radio.
|
|
await mc.commands.send_appstart()
|
|
|
|
return await get_radio_config()
|
|
|
|
|
|
@router.put("/private-key")
|
|
async def set_private_key(update: PrivateKeyUpdate) -> dict:
|
|
"""Set the radio's private key. This is write-only."""
|
|
require_connected()
|
|
|
|
try:
|
|
key_bytes = bytes.fromhex(update.private_key)
|
|
except ValueError:
|
|
raise HTTPException(status_code=400, detail="Invalid hex string for private key") from None
|
|
|
|
logger.info("Importing private key")
|
|
async with radio_manager.radio_operation("import_private_key") as mc:
|
|
result = await mc.commands.import_private_key(key_bytes)
|
|
|
|
if result.type == EventType.ERROR:
|
|
raise HTTPException(
|
|
status_code=500, detail=f"Failed to import private key: {result.payload}"
|
|
)
|
|
|
|
# Re-export from radio so the server-side keystore uses the new key
|
|
# for DM decryption immediately, rather than waiting for reconnect.
|
|
from app.keystore import export_and_store_private_key
|
|
|
|
keystore_refreshed = await export_and_store_private_key(mc)
|
|
if not keystore_refreshed:
|
|
logger.warning("Keystore refresh failed after import, retrying once")
|
|
keystore_refreshed = await export_and_store_private_key(mc)
|
|
|
|
if not keystore_refreshed:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=(
|
|
"Private key imported on radio, but server-side keystore "
|
|
"refresh failed. Reconnect to apply the new key for DM decryption."
|
|
),
|
|
)
|
|
|
|
return {"status": "ok"}
|
|
|
|
|
|
@router.post("/advertise")
|
|
async def send_advertisement() -> dict:
|
|
"""Send a flood advertisement to announce presence on the mesh.
|
|
|
|
Manual advertisement requests always send immediately, updating the
|
|
last_advert_time which affects when the next periodic/startup advert
|
|
can occur.
|
|
|
|
Returns:
|
|
status: "ok" if sent successfully
|
|
"""
|
|
require_connected()
|
|
|
|
logger.info("Sending flood advertisement")
|
|
async with radio_manager.radio_operation("manual_advertisement") as mc:
|
|
success = await do_send_advertisement(mc, force=True)
|
|
|
|
if not success:
|
|
raise HTTPException(status_code=500, detail="Failed to send advertisement")
|
|
|
|
return {"status": "ok"}
|
|
|
|
|
|
async def _attempt_reconnect() -> dict:
|
|
"""Shared reconnection logic for reboot and reconnect endpoints."""
|
|
if radio_manager.is_reconnecting:
|
|
return {
|
|
"status": "pending",
|
|
"message": "Reconnection already in progress",
|
|
"connected": False,
|
|
}
|
|
|
|
success = await radio_manager.reconnect()
|
|
if not success:
|
|
raise HTTPException(
|
|
status_code=503, detail="Failed to reconnect. Check radio connection and power."
|
|
)
|
|
|
|
try:
|
|
await radio_manager.post_connect_setup()
|
|
except Exception as e:
|
|
logger.exception("Post-connect setup failed after reconnect")
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail=f"Radio connected but setup failed: {e}",
|
|
) from e
|
|
|
|
return {"status": "ok", "message": "Reconnected successfully", "connected": True}
|
|
|
|
|
|
@router.post("/reboot")
|
|
async def reboot_radio() -> dict:
|
|
"""Reboot the radio, or reconnect if not currently connected.
|
|
|
|
If connected: sends reboot command, connection will temporarily drop and auto-reconnect.
|
|
If not connected: attempts to reconnect (same as /reconnect endpoint).
|
|
"""
|
|
if radio_manager.is_connected:
|
|
logger.info("Rebooting radio")
|
|
async with radio_manager.radio_operation("reboot_radio") as mc:
|
|
await mc.commands.reboot()
|
|
return {
|
|
"status": "ok",
|
|
"message": "Reboot command sent. Radio will reconnect automatically.",
|
|
}
|
|
|
|
logger.info("Radio not connected, attempting reconnect")
|
|
return await _attempt_reconnect()
|
|
|
|
|
|
@router.post("/reconnect")
|
|
async def reconnect_radio() -> dict:
|
|
"""Attempt to reconnect to the radio.
|
|
|
|
This will try to re-establish connection to the radio, with auto-detection
|
|
if no specific port is configured. Useful when the radio has been disconnected
|
|
or power-cycled.
|
|
"""
|
|
if radio_manager.is_connected:
|
|
if radio_manager.is_setup_complete:
|
|
return {"status": "ok", "message": "Already connected", "connected": True}
|
|
|
|
logger.info("Radio connected but setup incomplete, retrying setup")
|
|
try:
|
|
await radio_manager.post_connect_setup()
|
|
return {"status": "ok", "message": "Setup completed", "connected": True}
|
|
except Exception as e:
|
|
logger.exception("Post-connect setup failed")
|
|
raise HTTPException(
|
|
status_code=503,
|
|
detail=f"Radio connected but setup failed: {e}",
|
|
) from e
|
|
|
|
logger.info("Manual reconnect requested")
|
|
return await _attempt_reconnect()
|