mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-05-01 19:12:57 +02:00
259 lines
9.7 KiB
Python
259 lines
9.7 KiB
Python
import asyncio
|
|
import logging
|
|
from typing import Literal
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.models import CONTACT_TYPE_REPEATER, AppSettings
|
|
from app.region_scope import normalize_region_scope
|
|
from app.repository import AppSettingsRepository, ChannelRepository, ContactRepository
|
|
|
|
logger = logging.getLogger(__name__)
|
|
router = APIRouter(prefix="/settings", tags=["settings"])
|
|
|
|
MAX_TRACKED_TELEMETRY_REPEATERS = 8
|
|
|
|
|
|
class AppSettingsUpdate(BaseModel):
|
|
max_radio_contacts: int | None = Field(
|
|
default=None,
|
|
ge=1,
|
|
le=1000,
|
|
description=(
|
|
"Configured radio contact capacity used for maintenance thresholds and "
|
|
"background refill behavior"
|
|
),
|
|
)
|
|
auto_decrypt_dm_on_advert: bool | None = Field(
|
|
default=None,
|
|
description="Whether to attempt historical DM decryption on new contact advertisement",
|
|
)
|
|
advert_interval: int | None = Field(
|
|
default=None,
|
|
ge=0,
|
|
description="Periodic advertisement interval in seconds (0 = disabled, minimum 3600)",
|
|
)
|
|
flood_scope: str | None = Field(
|
|
default=None,
|
|
description="Outbound flood scope / region name (empty = disabled)",
|
|
)
|
|
blocked_keys: list[str] | None = Field(
|
|
default=None,
|
|
description="Public keys whose messages are hidden from the UI",
|
|
)
|
|
blocked_names: list[str] | None = Field(
|
|
default=None,
|
|
description="Display names whose messages are hidden from the UI",
|
|
)
|
|
discovery_blocked_types: list[int] | None = Field(
|
|
default=None,
|
|
description=(
|
|
"Contact type codes (1=Client, 2=Repeater, 3=Room, 4=Sensor) whose "
|
|
"advertisements should not create new contacts"
|
|
),
|
|
)
|
|
auto_resend_channel: bool | None = Field(
|
|
default=None,
|
|
description="Auto-resend channel messages once if no echo heard within 2 seconds",
|
|
)
|
|
|
|
|
|
class BlockKeyRequest(BaseModel):
|
|
key: str = Field(description="Public key to toggle block status")
|
|
|
|
|
|
class BlockNameRequest(BaseModel):
|
|
name: str = Field(description="Display name to toggle block status")
|
|
|
|
|
|
class FavoriteRequest(BaseModel):
|
|
type: Literal["channel", "contact"] = Field(description="'channel' or 'contact'")
|
|
id: str = Field(description="Channel key or contact public key")
|
|
|
|
|
|
class FavoriteToggleResponse(BaseModel):
|
|
type: Literal["channel", "contact"]
|
|
id: str
|
|
favorite: bool
|
|
|
|
|
|
class TrackedTelemetryRequest(BaseModel):
|
|
public_key: str = Field(description="Public key of the repeater to toggle tracking")
|
|
|
|
|
|
class TrackedTelemetryResponse(BaseModel):
|
|
tracked_telemetry_repeaters: list[str] = Field(
|
|
description="Current list of tracked repeater public keys"
|
|
)
|
|
names: dict[str, str] = Field(
|
|
description="Map of public key to display name for tracked repeaters"
|
|
)
|
|
|
|
|
|
@router.get("", response_model=AppSettings)
|
|
async def get_settings() -> AppSettings:
|
|
"""Get current application settings."""
|
|
return await AppSettingsRepository.get()
|
|
|
|
|
|
@router.patch("", response_model=AppSettings)
|
|
async def update_settings(update: AppSettingsUpdate) -> AppSettings:
|
|
"""Update application settings.
|
|
|
|
Settings are persisted to the database and survive restarts.
|
|
"""
|
|
kwargs = {}
|
|
if update.max_radio_contacts is not None:
|
|
logger.info("Updating max_radio_contacts to %d", update.max_radio_contacts)
|
|
kwargs["max_radio_contacts"] = update.max_radio_contacts
|
|
|
|
if update.auto_decrypt_dm_on_advert is not None:
|
|
logger.info("Updating auto_decrypt_dm_on_advert to %s", update.auto_decrypt_dm_on_advert)
|
|
kwargs["auto_decrypt_dm_on_advert"] = update.auto_decrypt_dm_on_advert
|
|
|
|
if update.advert_interval is not None:
|
|
# Enforce minimum 1-hour interval; 0 means disabled
|
|
interval = update.advert_interval
|
|
if 0 < interval < 3600:
|
|
interval = 3600
|
|
logger.info("Updating advert_interval to %d", interval)
|
|
kwargs["advert_interval"] = interval
|
|
|
|
# Block lists
|
|
if update.blocked_keys is not None:
|
|
kwargs["blocked_keys"] = [k.lower() for k in update.blocked_keys]
|
|
if update.blocked_names is not None:
|
|
kwargs["blocked_names"] = update.blocked_names
|
|
|
|
# Discovery blocked types
|
|
if update.discovery_blocked_types is not None:
|
|
# Only allow valid contact type codes (1-4)
|
|
valid = [t for t in update.discovery_blocked_types if t in (1, 2, 3, 4)]
|
|
kwargs["discovery_blocked_types"] = sorted(set(valid))
|
|
|
|
# Auto-resend channel
|
|
if update.auto_resend_channel is not None:
|
|
kwargs["auto_resend_channel"] = update.auto_resend_channel
|
|
|
|
# Flood scope
|
|
flood_scope_changed = False
|
|
if update.flood_scope is not None:
|
|
kwargs["flood_scope"] = normalize_region_scope(update.flood_scope)
|
|
flood_scope_changed = True
|
|
|
|
if kwargs:
|
|
result = await AppSettingsRepository.update(**kwargs)
|
|
|
|
# Apply flood scope to radio immediately if changed
|
|
if flood_scope_changed:
|
|
from app.services.radio_runtime import radio_runtime as radio_manager
|
|
|
|
if radio_manager.is_connected:
|
|
try:
|
|
scope = result.flood_scope
|
|
async with radio_manager.radio_operation("set_flood_scope") as mc:
|
|
await mc.commands.set_flood_scope(scope if scope else "")
|
|
logger.info("Applied flood_scope=%r to radio", scope or "(disabled)")
|
|
except Exception as e:
|
|
logger.warning("Failed to apply flood_scope to radio: %s", e)
|
|
|
|
return result
|
|
|
|
return await AppSettingsRepository.get()
|
|
|
|
|
|
@router.post("/favorites/toggle", response_model=FavoriteToggleResponse)
|
|
async def toggle_favorite(request: FavoriteRequest) -> FavoriteToggleResponse:
|
|
"""Toggle a conversation's favorite status."""
|
|
if request.type == "contact":
|
|
contact = await ContactRepository.get_by_key(request.id)
|
|
if not contact:
|
|
raise HTTPException(status_code=404, detail="Contact not found")
|
|
new_value = not contact.favorite
|
|
await ContactRepository.set_favorite(request.id, new_value)
|
|
logger.info("%s contact favorite: %s", "Added" if new_value else "Removed", request.id[:12])
|
|
# When newly favorited, load to radio immediately for DM ACK support
|
|
if new_value:
|
|
from app.radio_sync import ensure_contact_on_radio
|
|
|
|
asyncio.create_task(ensure_contact_on_radio(request.id, force=True))
|
|
else:
|
|
channel = await ChannelRepository.get_by_key(request.id)
|
|
if not channel:
|
|
raise HTTPException(status_code=404, detail="Channel not found")
|
|
new_value = not channel.favorite
|
|
await ChannelRepository.set_favorite(request.id, new_value)
|
|
logger.info("%s channel favorite: %s", "Added" if new_value else "Removed", request.id[:12])
|
|
|
|
return FavoriteToggleResponse(type=request.type, id=request.id, favorite=new_value)
|
|
|
|
|
|
@router.post("/blocked-keys/toggle", response_model=AppSettings)
|
|
async def toggle_blocked_key(request: BlockKeyRequest) -> AppSettings:
|
|
"""Toggle a public key's blocked status."""
|
|
logger.info("Toggling blocked key: %s", request.key[:12])
|
|
return await AppSettingsRepository.toggle_blocked_key(request.key)
|
|
|
|
|
|
@router.post("/blocked-names/toggle", response_model=AppSettings)
|
|
async def toggle_blocked_name(request: BlockNameRequest) -> AppSettings:
|
|
"""Toggle a display name's blocked status."""
|
|
logger.info("Toggling blocked name: %s", request.name)
|
|
return await AppSettingsRepository.toggle_blocked_name(request.name)
|
|
|
|
|
|
@router.post("/tracked-telemetry/toggle", response_model=TrackedTelemetryResponse)
|
|
async def toggle_tracked_telemetry(request: TrackedTelemetryRequest) -> TrackedTelemetryResponse:
|
|
"""Toggle periodic telemetry collection for a repeater.
|
|
|
|
Max 8 repeaters may be tracked. Returns 409 if the limit is reached and
|
|
the requested repeater is not already tracked.
|
|
"""
|
|
key = request.public_key.lower()
|
|
settings = await AppSettingsRepository.get()
|
|
current = settings.tracked_telemetry_repeaters
|
|
|
|
async def _resolve_names(keys: list[str]) -> dict[str, str]:
|
|
names: dict[str, str] = {}
|
|
for k in keys:
|
|
contact = await ContactRepository.get_by_key(k)
|
|
names[k] = contact.name if contact and contact.name else k[:12]
|
|
return names
|
|
|
|
if key in current:
|
|
# Remove
|
|
new_list = [k for k in current if k != key]
|
|
logger.info("Removing repeater %s from tracked telemetry", key[:12])
|
|
await AppSettingsRepository.update(tracked_telemetry_repeaters=new_list)
|
|
return TrackedTelemetryResponse(
|
|
tracked_telemetry_repeaters=new_list,
|
|
names=await _resolve_names(new_list),
|
|
)
|
|
|
|
# Validate it's a repeater
|
|
contact = await ContactRepository.get_by_key(key)
|
|
if not contact:
|
|
raise HTTPException(status_code=404, detail="Contact not found")
|
|
if contact.type != CONTACT_TYPE_REPEATER:
|
|
raise HTTPException(status_code=400, detail="Contact is not a repeater")
|
|
|
|
if len(current) >= MAX_TRACKED_TELEMETRY_REPEATERS:
|
|
names = await _resolve_names(current)
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail={
|
|
"message": f"Limit of {MAX_TRACKED_TELEMETRY_REPEATERS} tracked repeaters reached",
|
|
"tracked_telemetry_repeaters": current,
|
|
"names": names,
|
|
},
|
|
)
|
|
|
|
new_list = current + [key]
|
|
logger.info("Adding repeater %s to tracked telemetry", key[:12])
|
|
await AppSettingsRepository.update(tracked_telemetry_repeaters=new_list)
|
|
return TrackedTelemetryResponse(
|
|
tracked_telemetry_repeaters=new_list,
|
|
names=await _resolve_names(new_list),
|
|
)
|