mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
1193 lines
44 KiB
Python
1193 lines
44 KiB
Python
"""Tests for repeater-specific contacts routes (telemetry, command, trace)."""
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
from fastapi import HTTPException
|
|
from meshcore import EventType
|
|
|
|
from app.models import CommandRequest, Contact, RepeaterLoginRequest
|
|
from app.radio import radio_manager
|
|
from app.repository import ContactRepository
|
|
from app.routers.contacts import request_trace
|
|
from app.routers.repeaters import (
|
|
_batch_cli_fetch,
|
|
_fetch_repeater_response,
|
|
repeater_acl,
|
|
repeater_advert_intervals,
|
|
repeater_login,
|
|
repeater_lpp_telemetry,
|
|
repeater_neighbors,
|
|
repeater_owner_info,
|
|
repeater_radio_settings,
|
|
repeater_status,
|
|
send_repeater_command,
|
|
)
|
|
|
|
KEY_A = "aa" * 32
|
|
|
|
# Patch target for the wall-clock wrapper used by _fetch_repeater_response.
|
|
# We patch _monotonic (not time.monotonic) to avoid breaking the asyncio event loop.
|
|
_MONOTONIC = "app.routers.repeaters._monotonic"
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def _reset_radio_state():
|
|
"""Save/restore radio_manager state so tests don't leak."""
|
|
prev = radio_manager._meshcore
|
|
prev_lock = radio_manager._operation_lock
|
|
yield
|
|
radio_manager._meshcore = prev
|
|
radio_manager._operation_lock = prev_lock
|
|
|
|
|
|
def _radio_result(event_type=EventType.OK, payload=None):
|
|
result = MagicMock()
|
|
result.type = event_type
|
|
result.payload = payload or {}
|
|
return result
|
|
|
|
|
|
async def _insert_contact(public_key: str, name: str = "Node", contact_type: int = 0):
|
|
"""Insert a contact into the test database."""
|
|
await ContactRepository.upsert(
|
|
{
|
|
"public_key": public_key,
|
|
"name": name,
|
|
"type": contact_type,
|
|
"flags": 0,
|
|
"last_path": None,
|
|
"last_path_len": -1,
|
|
"last_advert": None,
|
|
"lat": None,
|
|
"lon": None,
|
|
"last_seen": None,
|
|
"on_radio": False,
|
|
"last_contacted": None,
|
|
"first_seen": None,
|
|
}
|
|
)
|
|
|
|
|
|
def _mock_mc():
|
|
mc = MagicMock()
|
|
mc.commands = MagicMock()
|
|
mc.commands.req_status_sync = AsyncMock()
|
|
mc.commands.fetch_all_neighbours = AsyncMock()
|
|
mc.commands.req_acl_sync = AsyncMock()
|
|
mc.commands.req_telemetry_sync = AsyncMock()
|
|
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.commands.get_msg = AsyncMock()
|
|
mc.commands.add_contact = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.commands.send_trace = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.wait_for_event = AsyncMock()
|
|
mc.stop_auto_message_fetching = AsyncMock()
|
|
mc.start_auto_message_fetching = AsyncMock()
|
|
return mc
|
|
|
|
|
|
def _advancing_clock(start=0.0, step=0.1):
|
|
"""Return a callable for _monotonic that advances by `step` each call."""
|
|
t = start
|
|
|
|
def _tick():
|
|
nonlocal t
|
|
val = t
|
|
t += step
|
|
return val
|
|
|
|
return _tick
|
|
|
|
|
|
class TestFetchRepeaterResponse:
|
|
"""Tests for the _fetch_repeater_response helper."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_returns_matching_cli_response(self):
|
|
mc = _mock_mc()
|
|
mc.commands.get_msg = AsyncMock(
|
|
return_value=_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "aaaaaaaaaaaa", "text": "ok", "txt_type": 1},
|
|
)
|
|
)
|
|
|
|
with patch(_MONOTONIC, side_effect=_advancing_clock()):
|
|
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
|
|
|
|
assert result is not None
|
|
assert result.payload["text"] == "ok"
|
|
mc.commands.get_msg.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_rejects_same_sender_non_cli_message(self):
|
|
"""A txt_type=0 message from the target repeater is NOT accepted as the CLI response."""
|
|
mc = _mock_mc()
|
|
non_cli = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "aaaaaaaaaaaa", "text": "chat msg", "txt_type": 0},
|
|
)
|
|
cli_response = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "aaaaaaaaaaaa", "text": "ver 1.0", "txt_type": 1},
|
|
)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[non_cli, cli_response])
|
|
|
|
with patch(_MONOTONIC, side_effect=_advancing_clock()):
|
|
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
|
|
|
|
assert result is not None
|
|
assert result.payload["text"] == "ver 1.0"
|
|
assert mc.commands.get_msg.await_count == 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unrelated_dm_is_skipped(self):
|
|
"""Unrelated DMs are skipped (dispatcher already handled them)."""
|
|
mc = _mock_mc()
|
|
unrelated = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "bbbbbbbbbbbb", "text": "hello", "txt_type": 0},
|
|
)
|
|
expected = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "aaaaaaaaaaaa", "text": "ver 1.0", "txt_type": 1},
|
|
)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[unrelated, expected])
|
|
|
|
with patch(_MONOTONIC, side_effect=_advancing_clock()):
|
|
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
|
|
|
|
assert result is not None
|
|
assert result.payload["text"] == "ver 1.0"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_message_is_skipped(self):
|
|
mc = _mock_mc()
|
|
channel_msg = _radio_result(
|
|
EventType.CHANNEL_MSG_RECV,
|
|
{"channel_idx": 0, "text": "flood msg"},
|
|
)
|
|
expected = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "aaaaaaaaaaaa", "text": "ok", "txt_type": 1},
|
|
)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[channel_msg, expected])
|
|
|
|
with patch(_MONOTONIC, side_effect=_advancing_clock()):
|
|
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
|
|
|
|
assert result is not None
|
|
assert result.payload["text"] == "ok"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_more_msgs_retries_then_succeeds(self):
|
|
mc = _mock_mc()
|
|
no_msgs = _radio_result(EventType.NO_MORE_MSGS)
|
|
expected = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "aaaaaaaaaaaa", "text": "ok", "txt_type": 1},
|
|
)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[no_msgs, expected])
|
|
|
|
with (
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
|
|
|
|
assert result is not None
|
|
assert result.payload["text"] == "ok"
|
|
assert mc.commands.get_msg.await_count == 2
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_returns_none_after_deadline(self):
|
|
"""Returns None when wall-clock deadline expires."""
|
|
mc = _mock_mc()
|
|
mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS))
|
|
|
|
# Start at 100.0, jump past deadline (timeout=2.0) after 2 get_msg calls
|
|
times = iter([100.0, 100.5, 101.0, 103.0])
|
|
|
|
with (
|
|
patch(_MONOTONIC, side_effect=times),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=2.0)
|
|
|
|
assert result is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_error_retries_then_succeeds(self):
|
|
mc = _mock_mc()
|
|
error = _radio_result(EventType.ERROR, {"err": "busy"})
|
|
expected = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "aaaaaaaaaaaa", "text": "ok", "txt_type": 1},
|
|
)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[error, expected])
|
|
|
|
with (
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=5.0)
|
|
|
|
assert result is not None
|
|
assert result.payload["text"] == "ok"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_high_traffic_does_not_exhaust_budget(self):
|
|
"""Many unrelated messages don't prevent eventual success (wall-clock deadline)."""
|
|
mc = _mock_mc()
|
|
# 20 unrelated DMs followed by the expected CLI response
|
|
unrelated = [
|
|
_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": f"{i:012x}", "text": f"msg {i}", "txt_type": 0},
|
|
)
|
|
for i in range(20)
|
|
]
|
|
expected = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "aaaaaaaaaaaa", "text": "ver 1.0", "txt_type": 1},
|
|
)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[*unrelated, expected])
|
|
|
|
with patch(_MONOTONIC, side_effect=_advancing_clock()):
|
|
result = await _fetch_repeater_response(mc, "aaaaaaaaaaaa", timeout=30.0)
|
|
|
|
assert result is not None
|
|
assert result.payload["text"] == "ver 1.0"
|
|
assert mc.commands.get_msg.await_count == 21
|
|
|
|
|
|
class TestRepeaterCommandRoute:
|
|
@pytest.mark.asyncio
|
|
async def test_send_cmd_error_raises_and_restores_auto_fetch(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.send_cmd = AsyncMock(
|
|
return_value=_radio_result(EventType.ERROR, {"err": "bad"})
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
|
|
|
assert exc.value.status_code == 500
|
|
mc.start_auto_message_fetching.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_timeout_returns_no_response_message(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS))
|
|
|
|
# Expire the deadline after a couple of ticks
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=[0.0, 5.0, 25.0]),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
|
|
|
assert response.command == "ver"
|
|
assert "no response" in response.response.lower()
|
|
mc.start_auto_message_fetching.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_success_returns_command_response_text_and_sender_timestamp(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.commands.get_msg = AsyncMock(
|
|
return_value=_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{
|
|
"pubkey_prefix": KEY_A[:12],
|
|
"text": "firmware: v1.2.3",
|
|
"sender_timestamp": 1700000000,
|
|
"txt_type": 1,
|
|
},
|
|
)
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
):
|
|
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
|
|
|
assert response.command == "ver"
|
|
assert response.response == "firmware: v1.2.3"
|
|
assert response.sender_timestamp == 1700000000
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_response_strips_firmware_prompt_prefix(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.commands.get_msg = AsyncMock(
|
|
return_value=_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{
|
|
"pubkey_prefix": KEY_A[:12],
|
|
"text": "> firmware: v1.2.3",
|
|
"sender_timestamp": 1700000000,
|
|
"txt_type": 1,
|
|
},
|
|
)
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
):
|
|
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
|
|
|
assert response.response == "firmware: v1.2.3"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_success_falls_back_to_legacy_timestamp_field(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.commands.get_msg = AsyncMock(
|
|
return_value=_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{
|
|
"pubkey_prefix": KEY_A[:12],
|
|
"text": "firmware: v1.2.3",
|
|
"timestamp": 1700000000,
|
|
"txt_type": 1,
|
|
},
|
|
)
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
):
|
|
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
|
|
|
assert response.command == "ver"
|
|
assert response.response == "firmware: v1.2.3"
|
|
assert response.sender_timestamp == 1700000000
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unrelated_dm_during_command_does_not_prevent_success(self, test_db):
|
|
"""Unrelated DMs arriving during command wait are skipped; correct response returned."""
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
|
|
unrelated = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": "bbbbbbbbbbbb", "text": "hello from someone", "txt_type": 0},
|
|
)
|
|
expected = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": KEY_A[:12], "text": "ver 1.0", "txt_type": 1},
|
|
)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[unrelated, expected])
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
):
|
|
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
|
|
|
assert response.command == "ver"
|
|
assert response.response == "ver 1.0"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_channel_message_during_command_is_skipped(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
|
|
channel_msg = _radio_result(
|
|
EventType.CHANNEL_MSG_RECV,
|
|
{"channel_idx": 0, "text": "flood msg"},
|
|
)
|
|
expected = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": KEY_A[:12], "text": "ok", "txt_type": 1},
|
|
)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[channel_msg, expected])
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
):
|
|
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
|
|
|
assert response.command == "ver"
|
|
assert response.response == "ok"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_more_msgs_then_response_succeeds(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
|
|
no_msgs = _radio_result(EventType.NO_MORE_MSGS)
|
|
expected = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": KEY_A[:12], "text": "done", "txt_type": 1},
|
|
)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[no_msgs, expected])
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
response = await send_repeater_command(KEY_A, CommandRequest(command="ver"))
|
|
|
|
assert response.command == "ver"
|
|
assert response.response == "done"
|
|
|
|
|
|
class TestTraceRoute:
|
|
@pytest.mark.asyncio
|
|
async def test_send_trace_error_returns_500(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Client", contact_type=1)
|
|
mc.commands.send_trace = AsyncMock(
|
|
return_value=_radio_result(EventType.ERROR, {"err": "x"})
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.contacts.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.contacts.random.randint", return_value=1234),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await request_trace(KEY_A)
|
|
|
|
assert exc.value.status_code == 500
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wait_timeout_returns_504(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Client", contact_type=1)
|
|
mc.commands.send_trace = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.wait_for_event = AsyncMock(return_value=None)
|
|
|
|
with (
|
|
patch("app.routers.contacts.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.contacts.random.randint", return_value=1234),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await request_trace(KEY_A)
|
|
|
|
assert exc.value.status_code == 504
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_success_returns_remote_and_local_snr(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Client", contact_type=1)
|
|
mc.commands.send_trace = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.wait_for_event = AsyncMock(
|
|
return_value=MagicMock(payload={"path": [{"snr": 5.5}, {"snr": 3.2}], "path_len": 2})
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.contacts.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.contacts.random.randint", return_value=1234),
|
|
):
|
|
response = await request_trace(KEY_A)
|
|
|
|
assert response.remote_snr == 5.5
|
|
assert response.local_snr == 3.2
|
|
assert response.path_len == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests for new granular repeater endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRepeaterLogin:
|
|
@pytest.mark.asyncio
|
|
async def test_success(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(
|
|
"app.routers.repeaters.prepare_repeater_connection",
|
|
new_callable=AsyncMock,
|
|
) as mock_prepare,
|
|
):
|
|
response = await repeater_login(KEY_A, RepeaterLoginRequest(password="secret"))
|
|
|
|
assert response.status == "ok"
|
|
mock_prepare.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_404_missing_contact(self, test_db):
|
|
mc = _mock_mc()
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_login(KEY_A, RepeaterLoginRequest(password="pw"))
|
|
assert exc.value.status_code == 404
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_400_not_repeater(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Client", contact_type=1)
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_login(KEY_A, RepeaterLoginRequest(password="pw"))
|
|
assert exc.value.status_code == 400
|
|
assert "not a repeater" in exc.value.detail.lower()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_login_error_raises(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
|
|
async def _prepare_fail(*args, **kwargs):
|
|
raise HTTPException(status_code=401, detail="Login failed")
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch("app.routers.repeaters.prepare_repeater_connection", side_effect=_prepare_fail),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_login(KEY_A, RepeaterLoginRequest(password="bad"))
|
|
assert exc.value.status_code == 401
|
|
|
|
|
|
class TestRepeaterStatus:
|
|
@pytest.mark.asyncio
|
|
async def test_success_with_field_mapping(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.req_status_sync = AsyncMock(
|
|
return_value={
|
|
"bat": 4200,
|
|
"tx_queue_len": 2,
|
|
"noise_floor": -120,
|
|
"last_rssi": -85,
|
|
"last_snr": 7.5,
|
|
"nb_recv": 1000,
|
|
"nb_sent": 500,
|
|
"airtime": 3600,
|
|
"rx_airtime": 7200,
|
|
"uptime": 86400,
|
|
"sent_flood": 100,
|
|
"sent_direct": 400,
|
|
"recv_flood": 300,
|
|
"recv_direct": 700,
|
|
"flood_dups": 10,
|
|
"direct_dups": 5,
|
|
"full_evts": 0,
|
|
}
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
response = await repeater_status(KEY_A)
|
|
|
|
assert response.battery_volts == 4.2
|
|
assert response.tx_queue_len == 2
|
|
assert response.noise_floor_dbm == -120
|
|
assert response.last_rssi_dbm == -85
|
|
assert response.last_snr_db == 7.5
|
|
assert response.packets_received == 1000
|
|
assert response.packets_sent == 500
|
|
assert response.uptime_seconds == 86400
|
|
assert response.sent_flood == 100
|
|
assert response.recv_direct == 700
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_504_on_timeout(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.req_status_sync = AsyncMock(return_value=None)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_status(KEY_A)
|
|
assert exc.value.status_code == 504
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_400_not_repeater(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Client", contact_type=1)
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_status(KEY_A)
|
|
assert exc.value.status_code == 400
|
|
|
|
|
|
class TestRepeaterLppTelemetry:
|
|
@pytest.mark.asyncio
|
|
async def test_success_with_sensors(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.req_telemetry_sync = AsyncMock(
|
|
return_value=[
|
|
{"channel": 0, "type": "temperature", "value": 24.5},
|
|
{"channel": 1, "type": "humidity", "value": 62.0},
|
|
{
|
|
"channel": 2,
|
|
"type": "gps",
|
|
"value": {"latitude": 37.7, "longitude": -122.4, "altitude": 15.0},
|
|
},
|
|
]
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
response = await repeater_lpp_telemetry(KEY_A)
|
|
|
|
assert len(response.sensors) == 3
|
|
assert response.sensors[0].channel == 0
|
|
assert response.sensors[0].type_name == "temperature"
|
|
assert response.sensors[0].value == 24.5
|
|
assert response.sensors[1].type_name == "humidity"
|
|
assert response.sensors[1].value == 62.0
|
|
assert response.sensors[2].type_name == "gps"
|
|
assert isinstance(response.sensors[2].value, dict)
|
|
assert response.sensors[2].value["latitude"] == 37.7
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_sensors(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.req_telemetry_sync = AsyncMock(return_value=[])
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
response = await repeater_lpp_telemetry(KEY_A)
|
|
|
|
assert response.sensors == []
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_504_on_timeout(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.req_telemetry_sync = AsyncMock(return_value=None)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_lpp_telemetry(KEY_A)
|
|
assert exc.value.status_code == 504
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_400_not_repeater(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Client", contact_type=1)
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_lpp_telemetry(KEY_A)
|
|
assert exc.value.status_code == 400
|
|
|
|
|
|
class TestRepeaterNeighbors:
|
|
@pytest.mark.asyncio
|
|
async def test_success_with_name_resolution(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
neighbor_key = "bb" * 32
|
|
await _insert_contact(neighbor_key, name="NeighborNode", contact_type=1)
|
|
|
|
mc.commands.fetch_all_neighbours = AsyncMock(
|
|
return_value={
|
|
"neighbours": [
|
|
{"pubkey": neighbor_key[:12], "snr": 9.0, "secs_ago": 5},
|
|
{"pubkey": "cccccccccccc", "snr": 3.0, "secs_ago": 120},
|
|
]
|
|
}
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
response = await repeater_neighbors(KEY_A)
|
|
|
|
assert len(response.neighbors) == 2
|
|
assert response.neighbors[0].name == "NeighborNode"
|
|
assert response.neighbors[0].snr == 9.0
|
|
assert response.neighbors[1].name is None
|
|
assert response.neighbors[1].last_heard_seconds == 120
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_neighbors(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.fetch_all_neighbours = AsyncMock(return_value={"neighbours": []})
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
response = await repeater_neighbors(KEY_A)
|
|
|
|
assert response.neighbors == []
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_timeout_returns_empty(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.fetch_all_neighbours = AsyncMock(return_value=None)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
response = await repeater_neighbors(KEY_A)
|
|
|
|
assert response.neighbors == []
|
|
|
|
|
|
class TestRepeaterAcl:
|
|
@pytest.mark.asyncio
|
|
async def test_success_with_permission_mapping(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
neighbor_key = "bb" * 32
|
|
await _insert_contact(neighbor_key, name="Admin User", contact_type=1)
|
|
|
|
mc.commands.req_acl_sync = AsyncMock(
|
|
return_value=[
|
|
{"key": neighbor_key[:12], "perm": 3},
|
|
{"key": "dddddddddddd", "perm": 0},
|
|
]
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
response = await repeater_acl(KEY_A)
|
|
|
|
assert len(response.acl) == 2
|
|
assert response.acl[0].name == "Admin User"
|
|
assert response.acl[0].permission_name == "Admin"
|
|
assert response.acl[1].name is None
|
|
assert response.acl[1].permission_name == "Guest"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_empty_acl(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.req_acl_sync = AsyncMock(return_value=[])
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
response = await repeater_acl(KEY_A)
|
|
|
|
assert response.acl == []
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_timeout_returns_empty(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.req_acl_sync = AsyncMock(return_value=None)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
response = await repeater_acl(KEY_A)
|
|
|
|
assert response.acl == []
|
|
|
|
|
|
class TestRepeaterRadioSettings:
|
|
@pytest.mark.asyncio
|
|
async def test_full_success(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
|
|
# Build responses for all 10 commands
|
|
responses = [
|
|
"v2.1.0", # ver
|
|
"915.0,250,7,5", # get radio
|
|
"20", # get tx
|
|
"0", # get af
|
|
"1", # get repeat
|
|
"3", # get flood.max
|
|
"MyRepeater", # get name
|
|
"40.7128", # get lat
|
|
"-74.0060", # get lon
|
|
"2025-02-25 14:30:00", # clock
|
|
]
|
|
get_msg_results = [
|
|
_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": KEY_A[:12], "text": text, "txt_type": 1},
|
|
)
|
|
for text in responses
|
|
]
|
|
mc.commands.get_msg = AsyncMock(side_effect=get_msg_results)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
):
|
|
response = await repeater_radio_settings(KEY_A)
|
|
|
|
assert response.firmware_version == "v2.1.0"
|
|
assert response.radio == "915.0,250,7,5"
|
|
assert response.tx_power == "20"
|
|
assert response.airtime_factor == "0"
|
|
assert response.repeat_enabled == "1"
|
|
assert response.flood_max == "3"
|
|
assert response.name == "MyRepeater"
|
|
assert response.lat == "40.7128"
|
|
assert response.lon == "-74.0060"
|
|
assert response.clock_utc == "2025-02-25 14:30:00"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_partial_failure(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
|
|
# First command succeeds, rest timeout
|
|
first_response = _radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": KEY_A[:12], "text": "v2.0.0", "txt_type": 1},
|
|
)
|
|
no_msgs = _radio_result(EventType.NO_MORE_MSGS)
|
|
mc.commands.get_msg = AsyncMock(side_effect=[first_response] + [no_msgs] * 50)
|
|
|
|
# Provide clock ticks: first command succeeds quickly, others expire
|
|
clock_ticks = [0.0, 0.1] # First fetch succeeds
|
|
for i in range(9):
|
|
base = 100.0 * (i + 1)
|
|
clock_ticks.extend([base, base + 5.0, base + 11.0])
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=clock_ticks),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
response = await repeater_radio_settings(KEY_A)
|
|
|
|
assert response.firmware_version == "v2.0.0"
|
|
assert response.radio is None
|
|
assert response.tx_power is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_400_not_repeater(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Client", contact_type=1)
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_radio_settings(KEY_A)
|
|
assert exc.value.status_code == 400
|
|
|
|
|
|
class TestRepeaterAdvertIntervals:
|
|
@pytest.mark.asyncio
|
|
async def test_success(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
|
|
responses = [
|
|
_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": KEY_A[:12], "text": "30", "txt_type": 1},
|
|
),
|
|
_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": KEY_A[:12], "text": "120", "txt_type": 1},
|
|
),
|
|
]
|
|
mc.commands.get_msg = AsyncMock(side_effect=responses)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
):
|
|
response = await repeater_advert_intervals(KEY_A)
|
|
|
|
assert response.advert_interval == "30"
|
|
assert response.flood_advert_interval == "120"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_timeout_returns_none_fields(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS))
|
|
|
|
clock_ticks = []
|
|
for i in range(2):
|
|
base = 100.0 * i
|
|
clock_ticks.extend([base, base + 5.0, base + 11.0])
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=clock_ticks),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
response = await repeater_advert_intervals(KEY_A)
|
|
|
|
assert response.advert_interval is None
|
|
assert response.flood_advert_interval is None
|
|
|
|
|
|
class TestRepeaterOwnerInfo:
|
|
@pytest.mark.asyncio
|
|
async def test_success(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
|
|
responses = [
|
|
_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{
|
|
"pubkey_prefix": KEY_A[:12],
|
|
"text": "John Doe - Contact: john@example.com",
|
|
"txt_type": 1,
|
|
},
|
|
),
|
|
_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": KEY_A[:12], "text": "guestpw123", "txt_type": 1},
|
|
),
|
|
]
|
|
mc.commands.get_msg = AsyncMock(side_effect=responses)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
):
|
|
response = await repeater_owner_info(KEY_A)
|
|
|
|
assert response.owner_info == "John Doe - Contact: john@example.com"
|
|
assert response.guest_password == "guestpw123"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_timeout_returns_none_fields(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS))
|
|
|
|
clock_ticks = []
|
|
for i in range(2):
|
|
base = 100.0 * i
|
|
clock_ticks.extend([base, base + 5.0, base + 11.0])
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=clock_ticks),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
response = await repeater_owner_info(KEY_A)
|
|
|
|
assert response.owner_info is None
|
|
assert response.guest_password is None
|
|
|
|
|
|
def _make_contact(
|
|
public_key: str = KEY_A, name: str = "Repeater", contact_type: int = 2
|
|
) -> Contact:
|
|
"""Create a Contact model instance for testing."""
|
|
return Contact(public_key=public_key, name=name, type=contact_type)
|
|
|
|
|
|
class TestBatchCliFetch:
|
|
"""Tests for the _batch_cli_fetch helper."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_add_contact_error_raises_500(self):
|
|
mc = _mock_mc()
|
|
mc.commands.add_contact = AsyncMock(
|
|
return_value=_radio_result(EventType.ERROR, {"err": "radio busy"})
|
|
)
|
|
|
|
contact = _make_contact()
|
|
|
|
with patch.object(radio_manager, "_meshcore", mc):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await _batch_cli_fetch(contact, "test_op", [("ver", "firmware_version")])
|
|
|
|
assert exc.value.status_code == 500
|
|
assert "Failed to add contact to radio" in exc.value.detail
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_cmd_error_skips_field(self):
|
|
mc = _mock_mc()
|
|
mc.commands.add_contact = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
|
|
# First command fails, second succeeds
|
|
mc.commands.send_cmd = AsyncMock(
|
|
side_effect=[
|
|
_radio_result(EventType.ERROR, {"err": "bad cmd"}),
|
|
_radio_result(EventType.OK),
|
|
]
|
|
)
|
|
mc.commands.get_msg = AsyncMock(
|
|
return_value=_radio_result(
|
|
EventType.CONTACT_MSG_RECV,
|
|
{"pubkey_prefix": KEY_A[:12], "text": "result2", "txt_type": 1},
|
|
)
|
|
)
|
|
|
|
contact = _make_contact()
|
|
|
|
with (
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=_advancing_clock()),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
results = await _batch_cli_fetch(
|
|
contact, "test_op", [("bad_cmd", "field_a"), ("good_cmd", "field_b")]
|
|
)
|
|
|
|
assert results["field_a"] is None # skipped due to send error
|
|
assert results["field_b"] == "result2"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_response_leaves_field_none(self):
|
|
mc = _mock_mc()
|
|
mc.commands.add_contact = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.commands.send_cmd = AsyncMock(return_value=_radio_result(EventType.OK))
|
|
mc.commands.get_msg = AsyncMock(return_value=_radio_result(EventType.NO_MORE_MSGS))
|
|
|
|
contact = _make_contact()
|
|
|
|
with (
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
patch(_MONOTONIC, side_effect=[0.0, 5.0, 11.0]),
|
|
patch("app.routers.repeaters.asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
results = await _batch_cli_fetch(contact, "test_op", [("clock", "clock_output")])
|
|
|
|
assert results["clock_output"] is None
|
|
|
|
|
|
class TestRepeaterAddContactError:
|
|
"""Test that repeater endpoints raise 500 when add_contact fails."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_add_contact_error(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.add_contact = AsyncMock(
|
|
return_value=_radio_result(EventType.ERROR, {"err": "radio busy"})
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_status(KEY_A)
|
|
|
|
assert exc.value.status_code == 500
|
|
assert "Failed to add contact to radio" in exc.value.detail
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_lpp_telemetry_add_contact_error(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.add_contact = AsyncMock(
|
|
return_value=_radio_result(EventType.ERROR, {"err": "radio busy"})
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_lpp_telemetry(KEY_A)
|
|
|
|
assert exc.value.status_code == 500
|
|
assert "Failed to add contact to radio" in exc.value.detail
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_neighbors_add_contact_error(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.add_contact = AsyncMock(
|
|
return_value=_radio_result(EventType.ERROR, {"err": "radio busy"})
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_neighbors(KEY_A)
|
|
|
|
assert exc.value.status_code == 500
|
|
assert "Failed to add contact to radio" in exc.value.detail
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_acl_add_contact_error(self, test_db):
|
|
mc = _mock_mc()
|
|
await _insert_contact(KEY_A, name="Repeater", contact_type=2)
|
|
mc.commands.add_contact = AsyncMock(
|
|
return_value=_radio_result(EventType.ERROR, {"err": "radio busy"})
|
|
)
|
|
|
|
with (
|
|
patch("app.routers.repeaters.require_connected", return_value=mc),
|
|
patch.object(radio_manager, "_meshcore", mc),
|
|
):
|
|
with pytest.raises(HTTPException) as exc:
|
|
await repeater_acl(KEY_A)
|
|
|
|
assert exc.value.status_code == 500
|
|
assert "Failed to add contact to radio" in exc.value.detail
|