mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-06-26 04:51:21 +02:00
Testing blitz!
This commit is contained in:
@@ -0,0 +1,217 @@
|
||||
"""Tests for radio router endpoint logic."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
from meshcore import EventType
|
||||
|
||||
from app.routers.radio import (
|
||||
PrivateKeyUpdate,
|
||||
RadioConfigResponse,
|
||||
RadioConfigUpdate,
|
||||
RadioSettings,
|
||||
get_radio_config,
|
||||
reboot_radio,
|
||||
reconnect_radio,
|
||||
send_advertisement,
|
||||
set_private_key,
|
||||
update_radio_config,
|
||||
)
|
||||
|
||||
|
||||
def _radio_result(event_type=EventType.OK, payload=None):
|
||||
result = MagicMock()
|
||||
result.type = event_type
|
||||
result.payload = payload or {}
|
||||
return result
|
||||
|
||||
|
||||
def _mock_meshcore_with_info():
|
||||
mc = MagicMock()
|
||||
mc.self_info = {
|
||||
"public_key": "aa" * 32,
|
||||
"name": "NodeA",
|
||||
"adv_lat": 10.0,
|
||||
"adv_lon": 20.0,
|
||||
"tx_power": 17,
|
||||
"max_tx_power": 22,
|
||||
"radio_freq": 910.525,
|
||||
"radio_bw": 62.5,
|
||||
"radio_sf": 7,
|
||||
"radio_cr": 5,
|
||||
}
|
||||
mc.commands = MagicMock()
|
||||
mc.commands.set_name = AsyncMock()
|
||||
mc.commands.set_coords = AsyncMock()
|
||||
mc.commands.set_tx_power = AsyncMock()
|
||||
mc.commands.set_radio = AsyncMock()
|
||||
mc.commands.send_appstart = AsyncMock()
|
||||
mc.commands.import_private_key = AsyncMock(return_value=_radio_result())
|
||||
return mc
|
||||
|
||||
|
||||
class TestGetRadioConfig:
|
||||
@pytest.mark.asyncio
|
||||
async def test_maps_self_info_to_response(self):
|
||||
mc = _mock_meshcore_with_info()
|
||||
with patch("app.routers.radio.require_connected", return_value=mc):
|
||||
response = await get_radio_config()
|
||||
|
||||
assert response.public_key == "aa" * 32
|
||||
assert response.name == "NodeA"
|
||||
assert response.lat == 10.0
|
||||
assert response.lon == 20.0
|
||||
assert response.radio.freq == 910.525
|
||||
assert response.radio.cr == 5
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_503_when_self_info_missing(self):
|
||||
mc = MagicMock()
|
||||
mc.self_info = None
|
||||
with patch("app.routers.radio.require_connected", return_value=mc):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await get_radio_config()
|
||||
|
||||
assert exc.value.status_code == 503
|
||||
|
||||
|
||||
class TestUpdateRadioConfig:
|
||||
@pytest.mark.asyncio
|
||||
async def test_updates_only_requested_fields_and_refreshes_info(self):
|
||||
mc = _mock_meshcore_with_info()
|
||||
expected = RadioConfigResponse(
|
||||
public_key="aa" * 32,
|
||||
name="NodeUpdated",
|
||||
lat=1.23,
|
||||
lon=20.0,
|
||||
tx_power=17,
|
||||
max_tx_power=22,
|
||||
radio=RadioSettings(freq=910.525, bw=62.5, sf=7, cr=5),
|
||||
)
|
||||
|
||||
with (
|
||||
patch("app.routers.radio.require_connected", return_value=mc),
|
||||
patch("app.routers.radio.sync_radio_time", new_callable=AsyncMock) as mock_sync_time,
|
||||
patch(
|
||||
"app.routers.radio.get_radio_config", new_callable=AsyncMock, return_value=expected
|
||||
),
|
||||
):
|
||||
result = await update_radio_config(RadioConfigUpdate(name="NodeUpdated", lat=1.23))
|
||||
|
||||
mc.commands.set_name.assert_awaited_once_with("NodeUpdated")
|
||||
mc.commands.set_coords.assert_awaited_once_with(lat=1.23, lon=20.0)
|
||||
mc.commands.set_tx_power.assert_not_awaited()
|
||||
mc.commands.set_radio.assert_not_awaited()
|
||||
mc.commands.send_appstart.assert_awaited_once()
|
||||
mock_sync_time.assert_awaited_once()
|
||||
assert result == expected
|
||||
|
||||
|
||||
class TestPrivateKeyImport:
|
||||
@pytest.mark.asyncio
|
||||
async def test_rejects_invalid_hex(self):
|
||||
mc = _mock_meshcore_with_info()
|
||||
with patch("app.routers.radio.require_connected", return_value=mc):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await set_private_key(PrivateKeyUpdate(private_key="not-hex"))
|
||||
|
||||
assert exc.value.status_code == 400
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_500_on_radio_error(self):
|
||||
mc = _mock_meshcore_with_info()
|
||||
mc.commands.import_private_key = AsyncMock(
|
||||
return_value=_radio_result(EventType.ERROR, {"error": "failed"})
|
||||
)
|
||||
with patch("app.routers.radio.require_connected", return_value=mc):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await set_private_key(PrivateKeyUpdate(private_key="aa" * 64))
|
||||
|
||||
assert exc.value.status_code == 500
|
||||
|
||||
|
||||
class TestAdvertise:
|
||||
@pytest.mark.asyncio
|
||||
async def test_raises_when_send_fails(self):
|
||||
with (
|
||||
patch("app.routers.radio.require_connected"),
|
||||
patch(
|
||||
"app.routers.radio.do_send_advertisement",
|
||||
new_callable=AsyncMock,
|
||||
return_value=False,
|
||||
),
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await send_advertisement()
|
||||
|
||||
assert exc.value.status_code == 500
|
||||
|
||||
|
||||
class TestRebootAndReconnect:
|
||||
@pytest.mark.asyncio
|
||||
async def test_reboot_connected_sends_reboot_command(self):
|
||||
mock_rm = MagicMock()
|
||||
mock_rm.is_connected = True
|
||||
mock_rm.meshcore = MagicMock()
|
||||
mock_rm.meshcore.commands.reboot = AsyncMock()
|
||||
|
||||
with patch("app.radio.radio_manager", mock_rm):
|
||||
result = await reboot_radio()
|
||||
|
||||
assert result["status"] == "ok"
|
||||
mock_rm.meshcore.commands.reboot.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reboot_returns_pending_when_reconnect_in_progress(self):
|
||||
mock_rm = MagicMock()
|
||||
mock_rm.is_connected = False
|
||||
mock_rm.meshcore = None
|
||||
mock_rm.is_reconnecting = True
|
||||
|
||||
with patch("app.radio.radio_manager", mock_rm):
|
||||
result = await reboot_radio()
|
||||
|
||||
assert result["status"] == "pending"
|
||||
assert result["connected"] is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reboot_attempts_reconnect_when_disconnected(self):
|
||||
mock_rm = MagicMock()
|
||||
mock_rm.is_connected = False
|
||||
mock_rm.meshcore = None
|
||||
mock_rm.is_reconnecting = False
|
||||
mock_rm.reconnect = AsyncMock(return_value=True)
|
||||
mock_rm.post_connect_setup = AsyncMock()
|
||||
|
||||
with patch("app.radio.radio_manager", mock_rm):
|
||||
result = await reboot_radio()
|
||||
|
||||
assert result["status"] == "ok"
|
||||
assert result["connected"] is True
|
||||
mock_rm.reconnect.assert_awaited_once()
|
||||
mock_rm.post_connect_setup.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_returns_already_connected(self):
|
||||
mock_rm = MagicMock()
|
||||
mock_rm.is_connected = True
|
||||
|
||||
with patch("app.radio.radio_manager", mock_rm):
|
||||
result = await reconnect_radio()
|
||||
|
||||
assert result["status"] == "ok"
|
||||
assert result["connected"] is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reconnect_raises_503_on_failure(self):
|
||||
mock_rm = MagicMock()
|
||||
mock_rm.is_connected = False
|
||||
mock_rm.is_reconnecting = False
|
||||
mock_rm.reconnect = AsyncMock(return_value=False)
|
||||
|
||||
with patch("app.radio.radio_manager", mock_rm):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await reconnect_radio()
|
||||
|
||||
assert exc.value.status_code == 503
|
||||
@@ -0,0 +1,286 @@
|
||||
"""Tests for repeater-specific contacts routes (telemetry, command, trace)."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
from meshcore import EventType
|
||||
|
||||
from app.models import CommandRequest, Contact, TelemetryRequest
|
||||
from app.routers.contacts import request_telemetry, request_trace, send_repeater_command
|
||||
|
||||
KEY_A = "aa" * 32
|
||||
|
||||
|
||||
def _radio_result(event_type=EventType.OK, payload=None):
|
||||
result = MagicMock()
|
||||
result.type = event_type
|
||||
result.payload = payload or {}
|
||||
return result
|
||||
|
||||
|
||||
def _make_contact(public_key: str, contact_type: int, name: str = "Node") -> Contact:
|
||||
return Contact(public_key=public_key, name=name, type=contact_type)
|
||||
|
||||
|
||||
def _mock_mc():
|
||||
mc = MagicMock()
|
||||
mc.commands = MagicMock()
|
||||
mc.commands.req_status_sync = AsyncMock()
|
||||
mc.commands.fetch_all_neighbours = AsyncMock()
|
||||
mc.commands.req_acl_sync = AsyncMock()
|
||||
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
||||
mc.commands.get_msg = AsyncMock()
|
||||
mc.commands.add_contact = AsyncMock(return_value=_radio_result(EventType.OK))
|
||||
mc.commands.send_trace = AsyncMock(return_value=_radio_result(EventType.OK))
|
||||
mc.wait_for_event = AsyncMock()
|
||||
mc.stop_auto_message_fetching = AsyncMock()
|
||||
mc.start_auto_message_fetching = AsyncMock()
|
||||
return mc
|
||||
|
||||
|
||||
class TestTelemetryRoute:
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_404_when_contact_missing(self):
|
||||
mc = _mock_mc()
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=None,
|
||||
),
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await request_telemetry(KEY_A, TelemetryRequest(password="pw"))
|
||||
|
||||
assert exc.value.status_code == 404
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_400_for_non_repeater_contact(self):
|
||||
mc = _mock_mc()
|
||||
contact = _make_contact(KEY_A, contact_type=1, name="Client")
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=contact,
|
||||
),
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await request_telemetry(KEY_A, TelemetryRequest(password="pw"))
|
||||
|
||||
assert exc.value.status_code == 400
|
||||
assert "not a repeater" in exc.value.detail.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_retry_timeout_returns_504(self):
|
||||
mc = _mock_mc()
|
||||
contact = _make_contact(KEY_A, contact_type=2, name="Repeater")
|
||||
mc.commands.req_status_sync = AsyncMock(side_effect=[None, None, None])
|
||||
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=contact,
|
||||
),
|
||||
patch(
|
||||
"app.routers.contacts.prepare_repeater_connection",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_prepare,
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await request_telemetry(KEY_A, TelemetryRequest(password="pw"))
|
||||
|
||||
assert exc.value.status_code == 504
|
||||
assert mc.commands.req_status_sync.await_count == 3
|
||||
mock_prepare.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clock_timeout_uses_fallback_message_and_restores_auto_fetch(self):
|
||||
mc = _mock_mc()
|
||||
contact = _make_contact(KEY_A, contact_type=2, name="Repeater")
|
||||
mc.commands.req_status_sync = AsyncMock(
|
||||
return_value={
|
||||
"pubkey_pre": "aaaaaaaaaaaa",
|
||||
"bat": 3775,
|
||||
"uptime": 1234,
|
||||
}
|
||||
)
|
||||
mc.commands.fetch_all_neighbours = AsyncMock(
|
||||
return_value={"neighbours": [{"pubkey": "abc123def456", "snr": 9.0, "secs_ago": 5}]}
|
||||
)
|
||||
mc.commands.req_acl_sync = AsyncMock(return_value=[{"key": "def456abc123", "perm": 2}])
|
||||
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
||||
mc.wait_for_event = AsyncMock(side_effect=[None, None]) # two clock attempts, no response
|
||||
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=contact,
|
||||
),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=None,
|
||||
),
|
||||
patch(
|
||||
"app.routers.contacts.prepare_repeater_connection",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_prepare,
|
||||
):
|
||||
response = await request_telemetry(KEY_A, TelemetryRequest(password="pw"))
|
||||
|
||||
assert response.pubkey_prefix == "aaaaaaaaaaaa"
|
||||
assert response.battery_volts == 3.775
|
||||
assert response.clock_output is not None
|
||||
assert "unable to fetch `clock` output" in response.clock_output.lower()
|
||||
mock_prepare.assert_awaited_once()
|
||||
mc.stop_auto_message_fetching.assert_awaited_once()
|
||||
mc.start_auto_message_fetching.assert_awaited_once()
|
||||
|
||||
|
||||
class TestRepeaterCommandRoute:
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_cmd_error_raises_and_restores_auto_fetch(self):
|
||||
mc = _mock_mc()
|
||||
contact = _make_contact(KEY_A, contact_type=2, name="Repeater")
|
||||
mc.commands.send_cmd = AsyncMock(
|
||||
return_value=_radio_result(EventType.ERROR, {"err": "bad"})
|
||||
)
|
||||
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=contact,
|
||||
),
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
||||
|
||||
assert exc.value.status_code == 500
|
||||
mc.start_auto_message_fetching.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_timeout_returns_no_response_message(self):
|
||||
mc = _mock_mc()
|
||||
contact = _make_contact(KEY_A, contact_type=2, name="Repeater")
|
||||
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
||||
mc.wait_for_event = AsyncMock(return_value=None)
|
||||
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=contact,
|
||||
),
|
||||
):
|
||||
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
||||
|
||||
assert response.command == "ver"
|
||||
assert "no response" in response.response.lower()
|
||||
mc.start_auto_message_fetching.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_success_returns_command_response_text_and_timestamp(self):
|
||||
mc = _mock_mc()
|
||||
contact = _make_contact(KEY_A, contact_type=2, name="Repeater")
|
||||
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
||||
mc.wait_for_event = AsyncMock(return_value=MagicMock())
|
||||
mc.commands.get_msg = AsyncMock(
|
||||
return_value=_radio_result(
|
||||
EventType.CONTACT_MSG_RECV,
|
||||
{"text": "firmware: v1.2.3", "timestamp": 1700000000},
|
||||
)
|
||||
)
|
||||
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=contact,
|
||||
),
|
||||
):
|
||||
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
||||
|
||||
assert response.command == "ver"
|
||||
assert response.response == "firmware: v1.2.3"
|
||||
assert response.sender_timestamp == 1700000000
|
||||
|
||||
|
||||
class TestTraceRoute:
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_trace_error_returns_500(self):
|
||||
mc = _mock_mc()
|
||||
contact = _make_contact(KEY_A, contact_type=1, name="Client")
|
||||
mc.commands.send_trace = AsyncMock(
|
||||
return_value=_radio_result(EventType.ERROR, {"err": "x"})
|
||||
)
|
||||
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=contact,
|
||||
),
|
||||
patch("app.routers.contacts.random.randint", return_value=1234),
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await request_trace(KEY_A)
|
||||
|
||||
assert exc.value.status_code == 500
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_timeout_returns_504(self):
|
||||
mc = _mock_mc()
|
||||
contact = _make_contact(KEY_A, contact_type=1, name="Client")
|
||||
mc.commands.send_trace = AsyncMock(return_value=_radio_result(EventType.OK))
|
||||
mc.wait_for_event = AsyncMock(return_value=None)
|
||||
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=contact,
|
||||
),
|
||||
patch("app.routers.contacts.random.randint", return_value=1234),
|
||||
):
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await request_trace(KEY_A)
|
||||
|
||||
assert exc.value.status_code == 504
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_success_returns_remote_and_local_snr(self):
|
||||
mc = _mock_mc()
|
||||
contact = _make_contact(KEY_A, contact_type=1, name="Client")
|
||||
mc.commands.send_trace = AsyncMock(return_value=_radio_result(EventType.OK))
|
||||
mc.wait_for_event = AsyncMock(
|
||||
return_value=MagicMock(payload={"path": [{"snr": 5.5}, {"snr": 3.2}], "path_len": 2})
|
||||
)
|
||||
|
||||
with (
|
||||
patch("app.routers.contacts.require_connected", return_value=mc),
|
||||
patch(
|
||||
"app.routers.contacts.ContactRepository.get_by_key_or_prefix",
|
||||
new_callable=AsyncMock,
|
||||
return_value=contact,
|
||||
),
|
||||
patch("app.routers.contacts.random.randint", return_value=1234),
|
||||
):
|
||||
response = await request_trace(KEY_A)
|
||||
|
||||
assert response.remote_snr == 5.5
|
||||
assert response.local_snr == 3.2
|
||||
assert response.path_len == 2
|
||||
@@ -415,3 +415,100 @@ class TestMessageRepositoryGetAckCount:
|
||||
result = await MessageRepository.get_ack_count(message_id=42)
|
||||
|
||||
assert result == 0
|
||||
|
||||
|
||||
class TestAppSettingsRepository:
|
||||
"""Test AppSettingsRepository parsing and migration edge cases."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_handles_corrupted_json_and_invalid_sort_order(self):
|
||||
"""Corrupted JSON fields are recovered with safe defaults."""
|
||||
mock_conn = AsyncMock()
|
||||
mock_cursor = AsyncMock()
|
||||
mock_cursor.fetchone = AsyncMock(
|
||||
return_value={
|
||||
"max_radio_contacts": 250,
|
||||
"favorites": "{not-json",
|
||||
"auto_decrypt_dm_on_advert": 1,
|
||||
"sidebar_sort_order": "invalid",
|
||||
"last_message_times": "{also-not-json",
|
||||
"preferences_migrated": 0,
|
||||
"advert_interval": None,
|
||||
"last_advert_time": None,
|
||||
"bots": "{bad-bots-json",
|
||||
}
|
||||
)
|
||||
mock_conn.execute = AsyncMock(return_value=mock_cursor)
|
||||
mock_db = MagicMock()
|
||||
mock_db.conn = mock_conn
|
||||
|
||||
with patch("app.repository.db", mock_db):
|
||||
from app.repository import AppSettingsRepository
|
||||
|
||||
settings = await AppSettingsRepository.get()
|
||||
|
||||
assert settings.max_radio_contacts == 250
|
||||
assert settings.favorites == []
|
||||
assert settings.last_message_times == {}
|
||||
assert settings.sidebar_sort_order == "recent"
|
||||
assert settings.bots == []
|
||||
assert settings.advert_interval == 0
|
||||
assert settings.last_advert_time == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_favorite_is_idempotent(self):
|
||||
"""Adding an existing favorite does not write duplicate entries."""
|
||||
from app.models import AppSettings, Favorite
|
||||
|
||||
existing = AppSettings(favorites=[Favorite(type="contact", id="aa" * 32)])
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.repository.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=existing,
|
||||
),
|
||||
patch(
|
||||
"app.repository.AppSettingsRepository.update",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_update,
|
||||
):
|
||||
from app.repository import AppSettingsRepository
|
||||
|
||||
result = await AppSettingsRepository.add_favorite("contact", "aa" * 32)
|
||||
|
||||
assert result == existing
|
||||
mock_update.assert_not_awaited()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_migrate_preferences_uses_recent_for_invalid_sort_order(self):
|
||||
"""Migration normalizes invalid sort order to 'recent'."""
|
||||
from app.models import AppSettings
|
||||
|
||||
current = AppSettings(preferences_migrated=False)
|
||||
migrated = AppSettings(preferences_migrated=True, sidebar_sort_order="recent")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.repository.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=current,
|
||||
),
|
||||
patch(
|
||||
"app.repository.AppSettingsRepository.update",
|
||||
new_callable=AsyncMock,
|
||||
return_value=migrated,
|
||||
) as mock_update,
|
||||
):
|
||||
from app.repository import AppSettingsRepository
|
||||
|
||||
result, did_migrate = await AppSettingsRepository.migrate_preferences_from_frontend(
|
||||
favorites=[{"type": "contact", "id": "bb" * 32}],
|
||||
sort_order="weird-order",
|
||||
last_message_times={"contact-bbbbbbbbbbbb": 123},
|
||||
)
|
||||
|
||||
assert did_migrate is True
|
||||
assert result.preferences_migrated is True
|
||||
assert mock_update.call_args.kwargs["sidebar_sort_order"] == "recent"
|
||||
assert mock_update.call_args.kwargs["preferences_migrated"] is True
|
||||
|
||||
@@ -0,0 +1,194 @@
|
||||
"""Tests for settings router endpoints and validation behavior."""
|
||||
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
|
||||
from app.models import AppSettings, BotConfig, Favorite
|
||||
from app.routers.settings import (
|
||||
AppSettingsUpdate,
|
||||
FavoriteRequest,
|
||||
MigratePreferencesRequest,
|
||||
migrate_preferences,
|
||||
toggle_favorite,
|
||||
update_settings,
|
||||
)
|
||||
|
||||
|
||||
def _settings(
|
||||
*,
|
||||
favorites: list[Favorite] | None = None,
|
||||
migrated: bool = False,
|
||||
max_radio_contacts: int = 200,
|
||||
) -> AppSettings:
|
||||
return AppSettings(
|
||||
max_radio_contacts=max_radio_contacts,
|
||||
favorites=favorites or [],
|
||||
auto_decrypt_dm_on_advert=False,
|
||||
sidebar_sort_order="recent",
|
||||
last_message_times={},
|
||||
preferences_migrated=migrated,
|
||||
advert_interval=0,
|
||||
last_advert_time=0,
|
||||
bots=[],
|
||||
)
|
||||
|
||||
|
||||
class TestUpdateSettings:
|
||||
@pytest.mark.asyncio
|
||||
async def test_forwards_only_provided_fields(self):
|
||||
updated = _settings(max_radio_contacts=321)
|
||||
with patch(
|
||||
"app.routers.settings.AppSettingsRepository.update",
|
||||
new_callable=AsyncMock,
|
||||
return_value=updated,
|
||||
) as mock_update:
|
||||
result = await update_settings(
|
||||
AppSettingsUpdate(max_radio_contacts=321, advert_interval=3600)
|
||||
)
|
||||
|
||||
assert result.max_radio_contacts == 321
|
||||
assert mock_update.call_count == 1
|
||||
assert mock_update.call_args.kwargs == {
|
||||
"max_radio_contacts": 321,
|
||||
"advert_interval": 3600,
|
||||
}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_patch_returns_current_settings(self):
|
||||
current = _settings()
|
||||
with (
|
||||
patch(
|
||||
"app.routers.settings.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=current,
|
||||
) as mock_get,
|
||||
patch(
|
||||
"app.routers.settings.AppSettingsRepository.update",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_update,
|
||||
):
|
||||
result = await update_settings(AppSettingsUpdate())
|
||||
|
||||
assert result == current
|
||||
mock_get.assert_awaited_once()
|
||||
mock_update.assert_not_awaited()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_bot_syntax_returns_400(self):
|
||||
bad_bot = BotConfig(
|
||||
id="bot-1",
|
||||
name="BadBot",
|
||||
enabled=True,
|
||||
code="def bot(:\n return 'x'\n",
|
||||
)
|
||||
|
||||
with pytest.raises(HTTPException) as exc:
|
||||
await update_settings(AppSettingsUpdate(bots=[bad_bot]))
|
||||
|
||||
assert exc.value.status_code == 400
|
||||
assert "syntax error" in exc.value.detail.lower()
|
||||
|
||||
|
||||
class TestToggleFavorite:
|
||||
@pytest.mark.asyncio
|
||||
async def test_adds_when_not_favorited(self):
|
||||
initial = _settings(favorites=[])
|
||||
updated = _settings(favorites=[Favorite(type="contact", id="aa" * 32)])
|
||||
request = FavoriteRequest(type="contact", id="aa" * 32)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.routers.settings.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=initial,
|
||||
),
|
||||
patch(
|
||||
"app.routers.settings.AppSettingsRepository.add_favorite",
|
||||
new_callable=AsyncMock,
|
||||
return_value=updated,
|
||||
) as mock_add,
|
||||
patch(
|
||||
"app.routers.settings.AppSettingsRepository.remove_favorite",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_remove,
|
||||
):
|
||||
result = await toggle_favorite(request)
|
||||
|
||||
assert result.favorites == updated.favorites
|
||||
mock_add.assert_awaited_once_with("contact", "aa" * 32)
|
||||
mock_remove.assert_not_awaited()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_removes_when_already_favorited(self):
|
||||
initial = _settings(favorites=[Favorite(type="channel", id="ABCD")])
|
||||
updated = _settings(favorites=[])
|
||||
request = FavoriteRequest(type="channel", id="ABCD")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.routers.settings.AppSettingsRepository.get",
|
||||
new_callable=AsyncMock,
|
||||
return_value=initial,
|
||||
),
|
||||
patch(
|
||||
"app.routers.settings.AppSettingsRepository.remove_favorite",
|
||||
new_callable=AsyncMock,
|
||||
return_value=updated,
|
||||
) as mock_remove,
|
||||
patch(
|
||||
"app.routers.settings.AppSettingsRepository.add_favorite",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_add,
|
||||
):
|
||||
result = await toggle_favorite(request)
|
||||
|
||||
assert result.favorites == []
|
||||
mock_remove.assert_awaited_once_with("channel", "ABCD")
|
||||
mock_add.assert_not_awaited()
|
||||
|
||||
|
||||
class TestMigratePreferences:
|
||||
@pytest.mark.asyncio
|
||||
async def test_maps_frontend_payload_and_returns_migrated_true(self):
|
||||
request = MigratePreferencesRequest(
|
||||
favorites=[FavoriteRequest(type="contact", id="aa" * 32)],
|
||||
sort_order="alpha",
|
||||
last_message_times={"contact-aaaaaaaaaaaa": 123},
|
||||
)
|
||||
settings = _settings(favorites=[Favorite(type="contact", id="aa" * 32)], migrated=True)
|
||||
|
||||
with patch(
|
||||
"app.routers.settings.AppSettingsRepository.migrate_preferences_from_frontend",
|
||||
new_callable=AsyncMock,
|
||||
return_value=(settings, True),
|
||||
) as mock_migrate:
|
||||
response = await migrate_preferences(request)
|
||||
|
||||
assert response.migrated is True
|
||||
assert response.settings == settings
|
||||
assert mock_migrate.call_args.kwargs == {
|
||||
"favorites": [{"type": "contact", "id": "aa" * 32}],
|
||||
"sort_order": "alpha",
|
||||
"last_message_times": {"contact-aaaaaaaaaaaa": 123},
|
||||
}
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_migrated_false_when_already_done(self):
|
||||
request = MigratePreferencesRequest(
|
||||
favorites=[],
|
||||
sort_order="recent",
|
||||
last_message_times={},
|
||||
)
|
||||
settings = _settings(migrated=True)
|
||||
|
||||
with patch(
|
||||
"app.routers.settings.AppSettingsRepository.migrate_preferences_from_frontend",
|
||||
new_callable=AsyncMock,
|
||||
return_value=(settings, False),
|
||||
):
|
||||
response = await migrate_preferences(request)
|
||||
|
||||
assert response.migrated is False
|
||||
assert response.settings.preferences_migrated is True
|
||||
Reference in New Issue
Block a user