Files
Remote-Terminal-for-MeshCore/tests/test_settings_router.py

246 lines
9.3 KiB
Python

"""Tests for settings router endpoints and validation behavior."""
from unittest.mock import AsyncMock, patch
import pytest
from fastapi import HTTPException
from app.models import CONTACT_TYPE_REPEATER, AppSettings, ContactUpsert
from app.repository import AppSettingsRepository, ContactRepository
from app.routers.settings import (
AppSettingsUpdate,
FavoriteRequest,
TrackedTelemetryRequest,
toggle_favorite,
toggle_tracked_telemetry,
update_settings,
)
class TestUpdateSettings:
@pytest.mark.asyncio
async def test_forwards_only_provided_fields(self, test_db):
result = await update_settings(
AppSettingsUpdate(
max_radio_contacts=321,
advert_interval=3600,
)
)
assert result.max_radio_contacts == 321
assert result.advert_interval == 3600
@pytest.mark.asyncio
async def test_advert_interval_below_minimum_is_clamped_to_one_hour(self, test_db):
result = await update_settings(AppSettingsUpdate(advert_interval=600))
assert result.advert_interval == 3600
@pytest.mark.asyncio
async def test_advert_interval_zero_stays_disabled(self, test_db):
result = await update_settings(AppSettingsUpdate(advert_interval=0))
assert result.advert_interval == 0
@pytest.mark.asyncio
async def test_advert_interval_above_minimum_is_preserved(self, test_db):
result = await update_settings(AppSettingsUpdate(advert_interval=86400))
assert result.advert_interval == 86400
@pytest.mark.asyncio
async def test_empty_patch_returns_current_settings(self, test_db):
result = await update_settings(AppSettingsUpdate())
# Should return default settings without error
assert isinstance(result, AppSettings)
assert result.max_radio_contacts == 200 # default
@pytest.mark.asyncio
async def test_flood_scope_round_trip(self, test_db):
"""Flood scope should be saved and retrieved correctly."""
result = await update_settings(AppSettingsUpdate(flood_scope="MyRegion"))
assert result.flood_scope == "#MyRegion"
fresh = await AppSettingsRepository.get()
assert fresh.flood_scope == "#MyRegion"
@pytest.mark.asyncio
async def test_flood_scope_default_empty(self, test_db):
"""Fresh DB should have flood_scope as empty string."""
settings = await AppSettingsRepository.get()
assert settings.flood_scope == ""
@pytest.mark.asyncio
async def test_flood_scope_whitespace_stripped(self, test_db):
"""Flood scope should be stripped of whitespace."""
result = await update_settings(AppSettingsUpdate(flood_scope=" MyRegion "))
assert result.flood_scope == "#MyRegion"
@pytest.mark.asyncio
async def test_flood_scope_existing_hash_is_not_doubled(self, test_db):
"""Existing leading hash should be preserved for backward compatibility."""
result = await update_settings(AppSettingsUpdate(flood_scope="#MyRegion"))
assert result.flood_scope == "#MyRegion"
@pytest.mark.asyncio
async def test_flood_scope_applies_to_radio(self, test_db):
"""When radio is connected, setting flood_scope calls set_flood_scope on radio."""
mock_mc = AsyncMock()
mock_mc.commands.set_flood_scope = AsyncMock()
mock_rm = AsyncMock()
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
from contextlib import asynccontextmanager
@asynccontextmanager
async def mock_radio_op(name):
yield mock_mc
mock_rm.radio_operation = mock_radio_op
with patch("app.radio.radio_manager", mock_rm):
await update_settings(AppSettingsUpdate(flood_scope="TestRegion"))
mock_mc.commands.set_flood_scope.assert_awaited_once_with("#TestRegion")
@pytest.mark.asyncio
async def test_flood_scope_empty_resets_radio(self, test_db):
"""Setting flood_scope to empty calls set_flood_scope("") on radio."""
# First set a non-empty scope
await update_settings(AppSettingsUpdate(flood_scope="#TestRegion"))
mock_mc = AsyncMock()
mock_mc.commands.set_flood_scope = AsyncMock()
mock_rm = AsyncMock()
mock_rm.is_connected = True
mock_rm.meshcore = mock_mc
from contextlib import asynccontextmanager
@asynccontextmanager
async def mock_radio_op(name):
yield mock_mc
mock_rm.radio_operation = mock_radio_op
with patch("app.radio.radio_manager", mock_rm):
await update_settings(AppSettingsUpdate(flood_scope=""))
mock_mc.commands.set_flood_scope.assert_awaited_once_with("")
class TestToggleFavorite:
@pytest.mark.asyncio
async def test_adds_when_not_favorited(self, test_db):
request = FavoriteRequest(type="contact", id="aa" * 32)
with (
patch("app.radio_sync.ensure_contact_on_radio", new_callable=AsyncMock) as mock_sync,
patch("app.routers.settings.asyncio.create_task") as mock_create_task,
):
mock_create_task.side_effect = lambda coro: coro.close()
result = await toggle_favorite(request)
assert len(result.favorites) == 1
assert result.favorites[0].type == "contact"
assert result.favorites[0].id == "aa" * 32
mock_sync.assert_called_once_with("aa" * 32, force=True)
mock_create_task.assert_called_once()
@pytest.mark.asyncio
async def test_removes_when_already_favorited(self, test_db):
# Pre-add a favorite
await AppSettingsRepository.add_favorite("contact", "aa" * 32)
request = FavoriteRequest(type="contact", id="aa" * 32)
with (
patch("app.radio_sync.ensure_contact_on_radio", new_callable=AsyncMock) as mock_sync,
patch("app.routers.settings.asyncio.create_task") as mock_create_task,
):
mock_create_task.side_effect = lambda coro: coro.close()
result = await toggle_favorite(request)
assert result.favorites == []
mock_sync.assert_not_called()
mock_create_task.assert_not_called()
class TestToggleTrackedTelemetry:
"""Tests for POST /settings/tracked-telemetry/toggle."""
async def _create_repeater(self, key: str, name: str = "TestRepeater") -> None:
await ContactRepository.upsert(
ContactUpsert(public_key=key, name=name, type=CONTACT_TYPE_REPEATER)
)
@pytest.mark.asyncio
async def test_add_repeater_to_tracking(self, test_db):
key = "aa" * 32
await self._create_repeater(key)
result = await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=key))
assert key in result.tracked_telemetry_repeaters
assert result.names[key] == "TestRepeater"
# Verify persisted
settings = await AppSettingsRepository.get()
assert key in settings.tracked_telemetry_repeaters
@pytest.mark.asyncio
async def test_remove_repeater_from_tracking(self, test_db):
key = "bb" * 32
await self._create_repeater(key)
await AppSettingsRepository.update(tracked_telemetry_repeaters=[key])
result = await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=key))
assert key not in result.tracked_telemetry_repeaters
@pytest.mark.asyncio
async def test_rejects_non_repeater_contact(self, test_db):
key = "cc" * 32
await ContactRepository.upsert(ContactUpsert(public_key=key, name="Client", type=1))
with pytest.raises(HTTPException) as exc_info:
await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=key))
assert exc_info.value.status_code == 400
@pytest.mark.asyncio
async def test_rejects_unknown_contact(self, test_db):
with pytest.raises(HTTPException) as exc_info:
await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key="dd" * 32))
assert exc_info.value.status_code == 404
@pytest.mark.asyncio
async def test_rejects_when_limit_reached(self, test_db):
existing_keys = []
for i in range(8):
key = f"{i:02x}" * 32
await self._create_repeater(key, name=f"Repeater{i}")
existing_keys.append(key)
await AppSettingsRepository.update(tracked_telemetry_repeaters=existing_keys)
new_key = "ff" * 32
await self._create_repeater(new_key, name="NewRepeater")
with pytest.raises(HTTPException) as exc_info:
await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=new_key))
assert exc_info.value.status_code == 409
detail = exc_info.value.detail
assert len(detail["tracked_telemetry_repeaters"]) == 8
@pytest.mark.asyncio
async def test_remove_still_works_when_limit_reached(self, test_db):
"""Toggling OFF an already-tracked repeater should work even at max capacity."""
keys = []
for i in range(8):
key = f"{i:02x}" * 32
await self._create_repeater(key)
keys.append(key)
await AppSettingsRepository.update(tracked_telemetry_repeaters=keys)
result = await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=keys[0]))
assert keys[0] not in result.tracked_telemetry_repeaters
assert len(result.tracked_telemetry_repeaters) == 7