mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-05-01 11:02:56 +02:00
247 lines
9.4 KiB
Python
247 lines
9.4 KiB
Python
"""Tests for settings router endpoints and validation behavior."""
|
|
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
|
|
from app.models import CONTACT_TYPE_REPEATER, AppSettings, ContactUpsert
|
|
from app.repository import AppSettingsRepository, ContactRepository
|
|
from app.routers.settings import (
|
|
AppSettingsUpdate,
|
|
FavoriteRequest,
|
|
TrackedTelemetryRequest,
|
|
toggle_favorite,
|
|
toggle_tracked_telemetry,
|
|
update_settings,
|
|
)
|
|
|
|
|
|
class TestUpdateSettings:
|
|
@pytest.mark.asyncio
|
|
async def test_forwards_only_provided_fields(self, test_db):
|
|
result = await update_settings(
|
|
AppSettingsUpdate(
|
|
max_radio_contacts=321,
|
|
advert_interval=3600,
|
|
)
|
|
)
|
|
|
|
assert result.max_radio_contacts == 321
|
|
assert result.advert_interval == 3600
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_advert_interval_below_minimum_is_clamped_to_one_hour(self, test_db):
|
|
result = await update_settings(AppSettingsUpdate(advert_interval=600))
|
|
assert result.advert_interval == 3600
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_advert_interval_zero_stays_disabled(self, test_db):
|
|
result = await update_settings(AppSettingsUpdate(advert_interval=0))
|
|
assert result.advert_interval == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_advert_interval_above_minimum_is_preserved(self, test_db):
|
|
result = await update_settings(AppSettingsUpdate(advert_interval=86400))
|
|
assert result.advert_interval == 86400
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_patch_returns_current_settings(self, test_db):
|
|
result = await update_settings(AppSettingsUpdate())
|
|
|
|
# Should return default settings without error
|
|
assert isinstance(result, AppSettings)
|
|
assert result.max_radio_contacts == 200 # default
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flood_scope_round_trip(self, test_db):
|
|
"""Flood scope should be saved and retrieved correctly."""
|
|
result = await update_settings(AppSettingsUpdate(flood_scope="MyRegion"))
|
|
assert result.flood_scope == "#MyRegion"
|
|
|
|
fresh = await AppSettingsRepository.get()
|
|
assert fresh.flood_scope == "#MyRegion"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flood_scope_default_empty(self, test_db):
|
|
"""Fresh DB should have flood_scope as empty string."""
|
|
settings = await AppSettingsRepository.get()
|
|
assert settings.flood_scope == ""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flood_scope_whitespace_stripped(self, test_db):
|
|
"""Flood scope should be stripped of whitespace."""
|
|
result = await update_settings(AppSettingsUpdate(flood_scope=" MyRegion "))
|
|
assert result.flood_scope == "#MyRegion"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flood_scope_existing_hash_is_not_doubled(self, test_db):
|
|
"""Existing leading hash should be preserved for backward compatibility."""
|
|
result = await update_settings(AppSettingsUpdate(flood_scope="#MyRegion"))
|
|
assert result.flood_scope == "#MyRegion"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flood_scope_applies_to_radio(self, test_db):
|
|
"""When radio is connected, setting flood_scope calls set_flood_scope on radio."""
|
|
mock_mc = AsyncMock()
|
|
mock_mc.commands.set_flood_scope = AsyncMock()
|
|
|
|
mock_rm = AsyncMock()
|
|
mock_rm.is_connected = True
|
|
mock_rm.meshcore = mock_mc
|
|
|
|
from contextlib import asynccontextmanager
|
|
|
|
@asynccontextmanager
|
|
async def mock_radio_op(name):
|
|
yield mock_mc
|
|
|
|
mock_rm.radio_operation = mock_radio_op
|
|
|
|
with patch("app.radio.radio_manager", mock_rm):
|
|
await update_settings(AppSettingsUpdate(flood_scope="TestRegion"))
|
|
|
|
mock_mc.commands.set_flood_scope.assert_awaited_once_with("#TestRegion")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_flood_scope_empty_resets_radio(self, test_db):
|
|
"""Setting flood_scope to empty calls set_flood_scope("") on radio."""
|
|
# First set a non-empty scope
|
|
await update_settings(AppSettingsUpdate(flood_scope="#TestRegion"))
|
|
|
|
mock_mc = AsyncMock()
|
|
mock_mc.commands.set_flood_scope = AsyncMock()
|
|
|
|
mock_rm = AsyncMock()
|
|
mock_rm.is_connected = True
|
|
mock_rm.meshcore = mock_mc
|
|
|
|
from contextlib import asynccontextmanager
|
|
|
|
@asynccontextmanager
|
|
async def mock_radio_op(name):
|
|
yield mock_mc
|
|
|
|
mock_rm.radio_operation = mock_radio_op
|
|
|
|
with patch("app.radio.radio_manager", mock_rm):
|
|
await update_settings(AppSettingsUpdate(flood_scope=""))
|
|
|
|
mock_mc.commands.set_flood_scope.assert_awaited_once_with("")
|
|
|
|
|
|
class TestToggleFavorite:
|
|
@pytest.mark.asyncio
|
|
async def test_adds_when_not_favorited(self, test_db):
|
|
await ContactRepository.upsert(ContactUpsert(public_key="aa" * 32, name="Alice"))
|
|
request = FavoriteRequest(type="contact", id="aa" * 32)
|
|
with (
|
|
patch("app.radio_sync.ensure_contact_on_radio", new_callable=AsyncMock) as mock_sync,
|
|
patch("app.routers.settings.asyncio.create_task") as mock_create_task,
|
|
):
|
|
mock_create_task.side_effect = lambda coro: coro.close()
|
|
result = await toggle_favorite(request)
|
|
|
|
assert result.favorite is True
|
|
assert result.type == "contact"
|
|
assert result.id == "aa" * 32
|
|
mock_sync.assert_called_once_with("aa" * 32, force=True)
|
|
mock_create_task.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_removes_when_already_favorited(self, test_db):
|
|
await ContactRepository.upsert(ContactUpsert(public_key="aa" * 32, name="Alice"))
|
|
await ContactRepository.set_favorite("aa" * 32, True)
|
|
|
|
request = FavoriteRequest(type="contact", id="aa" * 32)
|
|
with (
|
|
patch("app.radio_sync.ensure_contact_on_radio", new_callable=AsyncMock) as mock_sync,
|
|
patch("app.routers.settings.asyncio.create_task") as mock_create_task,
|
|
):
|
|
mock_create_task.side_effect = lambda coro: coro.close()
|
|
result = await toggle_favorite(request)
|
|
|
|
assert result.favorite is False
|
|
mock_sync.assert_not_called()
|
|
mock_create_task.assert_not_called()
|
|
|
|
|
|
class TestToggleTrackedTelemetry:
|
|
"""Tests for POST /settings/tracked-telemetry/toggle."""
|
|
|
|
async def _create_repeater(self, key: str, name: str = "TestRepeater") -> None:
|
|
await ContactRepository.upsert(
|
|
ContactUpsert(public_key=key, name=name, type=CONTACT_TYPE_REPEATER)
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_repeater_to_tracking(self, test_db):
|
|
key = "aa" * 32
|
|
await self._create_repeater(key)
|
|
|
|
result = await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=key))
|
|
|
|
assert key in result.tracked_telemetry_repeaters
|
|
assert result.names[key] == "TestRepeater"
|
|
|
|
# Verify persisted
|
|
settings = await AppSettingsRepository.get()
|
|
assert key in settings.tracked_telemetry_repeaters
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_remove_repeater_from_tracking(self, test_db):
|
|
key = "bb" * 32
|
|
await self._create_repeater(key)
|
|
await AppSettingsRepository.update(tracked_telemetry_repeaters=[key])
|
|
|
|
result = await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=key))
|
|
|
|
assert key not in result.tracked_telemetry_repeaters
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rejects_non_repeater_contact(self, test_db):
|
|
key = "cc" * 32
|
|
await ContactRepository.upsert(ContactUpsert(public_key=key, name="Client", type=1))
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=key))
|
|
assert exc_info.value.status_code == 400
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rejects_unknown_contact(self, test_db):
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key="dd" * 32))
|
|
assert exc_info.value.status_code == 404
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rejects_when_limit_reached(self, test_db):
|
|
existing_keys = []
|
|
for i in range(8):
|
|
key = f"{i:02x}" * 32
|
|
await self._create_repeater(key, name=f"Repeater{i}")
|
|
existing_keys.append(key)
|
|
await AppSettingsRepository.update(tracked_telemetry_repeaters=existing_keys)
|
|
|
|
new_key = "ff" * 32
|
|
await self._create_repeater(new_key, name="NewRepeater")
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=new_key))
|
|
assert exc_info.value.status_code == 409
|
|
detail = exc_info.value.detail
|
|
assert len(detail["tracked_telemetry_repeaters"]) == 8
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_remove_still_works_when_limit_reached(self, test_db):
|
|
"""Toggling OFF an already-tracked repeater should work even at max capacity."""
|
|
keys = []
|
|
for i in range(8):
|
|
key = f"{i:02x}" * 32
|
|
await self._create_repeater(key)
|
|
keys.append(key)
|
|
await AppSettingsRepository.update(tracked_telemetry_repeaters=keys)
|
|
|
|
result = await toggle_tracked_telemetry(TrackedTelemetryRequest(public_key=keys[0]))
|
|
assert keys[0] not in result.tracked_telemetry_repeaters
|
|
assert len(result.tracked_telemetry_repeaters) == 7
|