Files
Remote-Terminal-for-MeshCore/app/routers/settings.py

347 lines
14 KiB
Python

import asyncio
import logging
from typing import Literal
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
from app.models import CONTACT_TYPE_REPEATER, AppSettings
from app.region_scope import normalize_region_scope
from app.repository import AppSettingsRepository, ChannelRepository, ContactRepository
from app.telemetry_interval import (
DEFAULT_TELEMETRY_INTERVAL_HOURS,
TELEMETRY_INTERVAL_OPTIONS_HOURS,
clamp_telemetry_interval,
legal_interval_options,
next_run_timestamp_utc,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/settings", tags=["settings"])
MAX_TRACKED_TELEMETRY_REPEATERS = 8
class AppSettingsUpdate(BaseModel):
max_radio_contacts: int | None = Field(
default=None,
ge=1,
le=1000,
description=(
"Configured radio contact capacity used for maintenance thresholds and "
"background refill behavior"
),
)
auto_decrypt_dm_on_advert: bool | None = Field(
default=None,
description="Whether to attempt historical DM decryption on new contact advertisement",
)
advert_interval: int | None = Field(
default=None,
ge=0,
description="Periodic advertisement interval in seconds (0 = disabled, minimum 3600)",
)
flood_scope: str | None = Field(
default=None,
description="Outbound flood scope / region name (empty = disabled)",
)
blocked_keys: list[str] | None = Field(
default=None,
description="Public keys whose messages are hidden from the UI",
)
blocked_names: list[str] | None = Field(
default=None,
description="Display names whose messages are hidden from the UI",
)
discovery_blocked_types: list[int] | None = Field(
default=None,
description=(
"Contact type codes (1=Client, 2=Repeater, 3=Room, 4=Sensor) whose "
"advertisements should not create new contacts"
),
)
auto_resend_channel: bool | None = Field(
default=None,
description="Auto-resend channel messages once if no echo heard within 2 seconds",
)
telemetry_interval_hours: int | None = Field(
default=None,
description=(
"Preferred tracked-repeater telemetry interval in hours. "
f"Must be one of {list(TELEMETRY_INTERVAL_OPTIONS_HOURS)}. "
"Effective interval is clamped up to the shortest legal value "
"based on the current tracked-repeater count."
),
)
class BlockKeyRequest(BaseModel):
key: str = Field(description="Public key to toggle block status")
class BlockNameRequest(BaseModel):
name: str = Field(description="Display name to toggle block status")
class FavoriteRequest(BaseModel):
type: Literal["channel", "contact"] = Field(description="'channel' or 'contact'")
id: str = Field(description="Channel key or contact public key")
class FavoriteToggleResponse(BaseModel):
type: Literal["channel", "contact"]
id: str
favorite: bool
class TrackedTelemetryRequest(BaseModel):
public_key: str = Field(description="Public key of the repeater to toggle tracking")
class TelemetrySchedule(BaseModel):
"""Surface of telemetry scheduling derivations for the UI.
``preferred_hours`` is the stored user choice. ``effective_hours`` is the
value the scheduler actually uses (preferred, clamped up to the shortest
legal interval given the current tracked-repeater count). ``options``
lists the subset of the menu that is legal at the current count; the UI
should hide anything not in this list. ``next_run_at`` is the Unix
timestamp (seconds, UTC) of the next scheduled cycle, or ``None`` when
no repeaters are tracked (nothing to schedule).
"""
preferred_hours: int = Field(description="User's saved telemetry interval preference")
effective_hours: int = Field(description="Scheduler's clamped interval")
options: list[int] = Field(description="Legal interval choices at the current count")
tracked_count: int = Field(description="Number of repeaters currently tracked")
max_tracked: int = Field(description="Maximum number of repeaters that can be tracked")
next_run_at: int | None = Field(
default=None,
description="Unix timestamp (UTC seconds) of the next scheduled cycle",
)
class TrackedTelemetryResponse(BaseModel):
tracked_telemetry_repeaters: list[str] = Field(
description="Current list of tracked repeater public keys"
)
names: dict[str, str] = Field(
description="Map of public key to display name for tracked repeaters"
)
schedule: TelemetrySchedule = Field(description="Current scheduling state")
def _build_schedule(tracked_count: int, preferred_hours: int | None) -> TelemetrySchedule:
pref = (
preferred_hours
if preferred_hours in TELEMETRY_INTERVAL_OPTIONS_HOURS
else DEFAULT_TELEMETRY_INTERVAL_HOURS
)
effective = clamp_telemetry_interval(pref, tracked_count)
return TelemetrySchedule(
preferred_hours=pref,
effective_hours=effective,
options=legal_interval_options(tracked_count),
tracked_count=tracked_count,
max_tracked=MAX_TRACKED_TELEMETRY_REPEATERS,
next_run_at=next_run_timestamp_utc(effective) if tracked_count > 0 else None,
)
@router.get("", response_model=AppSettings)
async def get_settings() -> AppSettings:
"""Get current application settings."""
return await AppSettingsRepository.get()
@router.patch("", response_model=AppSettings)
async def update_settings(update: AppSettingsUpdate) -> AppSettings:
"""Update application settings.
Settings are persisted to the database and survive restarts.
"""
kwargs = {}
if update.max_radio_contacts is not None:
logger.info("Updating max_radio_contacts to %d", update.max_radio_contacts)
kwargs["max_radio_contacts"] = update.max_radio_contacts
if update.auto_decrypt_dm_on_advert is not None:
logger.info("Updating auto_decrypt_dm_on_advert to %s", update.auto_decrypt_dm_on_advert)
kwargs["auto_decrypt_dm_on_advert"] = update.auto_decrypt_dm_on_advert
if update.advert_interval is not None:
# Enforce minimum 1-hour interval; 0 means disabled
interval = update.advert_interval
if 0 < interval < 3600:
interval = 3600
logger.info("Updating advert_interval to %d", interval)
kwargs["advert_interval"] = interval
# Block lists
if update.blocked_keys is not None:
kwargs["blocked_keys"] = [k.lower() for k in update.blocked_keys]
if update.blocked_names is not None:
kwargs["blocked_names"] = update.blocked_names
# Discovery blocked types
if update.discovery_blocked_types is not None:
# Only allow valid contact type codes (1-4)
valid = [t for t in update.discovery_blocked_types if t in (1, 2, 3, 4)]
kwargs["discovery_blocked_types"] = sorted(set(valid))
# Auto-resend channel
if update.auto_resend_channel is not None:
kwargs["auto_resend_channel"] = update.auto_resend_channel
# Telemetry interval preference. Invalid values fall back to default
# rather than 400-ing so a stale client can't brick settings saves.
if update.telemetry_interval_hours is not None:
raw_interval = update.telemetry_interval_hours
if raw_interval not in TELEMETRY_INTERVAL_OPTIONS_HOURS:
logger.warning(
"telemetry_interval_hours=%r is not in the menu; defaulting to %d",
raw_interval,
DEFAULT_TELEMETRY_INTERVAL_HOURS,
)
raw_interval = DEFAULT_TELEMETRY_INTERVAL_HOURS
logger.info("Updating telemetry_interval_hours to %d", raw_interval)
kwargs["telemetry_interval_hours"] = raw_interval
# Flood scope
flood_scope_changed = False
if update.flood_scope is not None:
kwargs["flood_scope"] = normalize_region_scope(update.flood_scope)
flood_scope_changed = True
if kwargs:
result = await AppSettingsRepository.update(**kwargs)
# Apply flood scope to radio immediately if changed
if flood_scope_changed:
from app.services.radio_runtime import radio_runtime as radio_manager
if radio_manager.is_connected:
try:
scope = result.flood_scope
async with radio_manager.radio_operation("set_flood_scope") as mc:
await mc.commands.set_flood_scope(scope if scope else "")
logger.info("Applied flood_scope=%r to radio", scope or "(disabled)")
except Exception as e:
logger.warning("Failed to apply flood_scope to radio: %s", e)
return result
return await AppSettingsRepository.get()
@router.post("/favorites/toggle", response_model=FavoriteToggleResponse)
async def toggle_favorite(request: FavoriteRequest) -> FavoriteToggleResponse:
"""Toggle a conversation's favorite status."""
if request.type == "contact":
contact = await ContactRepository.get_by_key(request.id)
if not contact:
raise HTTPException(status_code=404, detail="Contact not found")
new_value = not contact.favorite
await ContactRepository.set_favorite(request.id, new_value)
logger.info("%s contact favorite: %s", "Added" if new_value else "Removed", request.id[:12])
# When newly favorited, load to radio immediately for DM ACK support
if new_value:
from app.radio_sync import ensure_contact_on_radio
asyncio.create_task(ensure_contact_on_radio(request.id, force=True))
else:
channel = await ChannelRepository.get_by_key(request.id)
if not channel:
raise HTTPException(status_code=404, detail="Channel not found")
new_value = not channel.favorite
await ChannelRepository.set_favorite(request.id, new_value)
logger.info("%s channel favorite: %s", "Added" if new_value else "Removed", request.id[:12])
return FavoriteToggleResponse(type=request.type, id=request.id, favorite=new_value)
@router.post("/blocked-keys/toggle", response_model=AppSettings)
async def toggle_blocked_key(request: BlockKeyRequest) -> AppSettings:
"""Toggle a public key's blocked status."""
logger.info("Toggling blocked key: %s", request.key[:12])
return await AppSettingsRepository.toggle_blocked_key(request.key)
@router.post("/blocked-names/toggle", response_model=AppSettings)
async def toggle_blocked_name(request: BlockNameRequest) -> AppSettings:
"""Toggle a display name's blocked status."""
logger.info("Toggling blocked name: %s", request.name)
return await AppSettingsRepository.toggle_blocked_name(request.name)
@router.post("/tracked-telemetry/toggle", response_model=TrackedTelemetryResponse)
async def toggle_tracked_telemetry(request: TrackedTelemetryRequest) -> TrackedTelemetryResponse:
"""Toggle periodic telemetry collection for a repeater.
Max 8 repeaters may be tracked. Returns 409 if the limit is reached and
the requested repeater is not already tracked.
"""
key = request.public_key.lower()
settings = await AppSettingsRepository.get()
current = settings.tracked_telemetry_repeaters
async def _resolve_names(keys: list[str]) -> dict[str, str]:
names: dict[str, str] = {}
for k in keys:
contact = await ContactRepository.get_by_key(k)
names[k] = contact.name if contact and contact.name else k[:12]
return names
if key in current:
# Remove
new_list = [k for k in current if k != key]
logger.info("Removing repeater %s from tracked telemetry", key[:12])
await AppSettingsRepository.update(tracked_telemetry_repeaters=new_list)
return TrackedTelemetryResponse(
tracked_telemetry_repeaters=new_list,
names=await _resolve_names(new_list),
schedule=_build_schedule(len(new_list), settings.telemetry_interval_hours),
)
# Validate it's a repeater
contact = await ContactRepository.get_by_key(key)
if not contact:
raise HTTPException(status_code=404, detail="Contact not found")
if contact.type != CONTACT_TYPE_REPEATER:
raise HTTPException(status_code=400, detail="Contact is not a repeater")
if len(current) >= MAX_TRACKED_TELEMETRY_REPEATERS:
names = await _resolve_names(current)
raise HTTPException(
status_code=409,
detail={
"message": f"Limit of {MAX_TRACKED_TELEMETRY_REPEATERS} tracked repeaters reached",
"tracked_telemetry_repeaters": current,
"names": names,
},
)
new_list = current + [key]
logger.info("Adding repeater %s to tracked telemetry", key[:12])
await AppSettingsRepository.update(tracked_telemetry_repeaters=new_list)
return TrackedTelemetryResponse(
tracked_telemetry_repeaters=new_list,
names=await _resolve_names(new_list),
schedule=_build_schedule(len(new_list), settings.telemetry_interval_hours),
)
@router.get("/tracked-telemetry/schedule", response_model=TelemetrySchedule)
async def get_telemetry_schedule() -> TelemetrySchedule:
"""Return the current telemetry scheduling derivation.
The UI uses this to render the interval dropdown (legal options),
surface saved-vs-effective when they differ, and show the next-run-at
timestamp so users know when the next cycle will fire.
"""
app_settings = await AppSettingsRepository.get()
return _build_schedule(
len(app_settings.tracked_telemetry_repeaters),
app_settings.telemetry_interval_hours,
)