mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
223 lines
7.6 KiB
Python
223 lines
7.6 KiB
Python
"""Tests for shared radio operation locking behavior."""
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from app.radio import RadioDisconnectedError, RadioOperationBusyError, radio_manager
|
|
from app.radio_sync import is_polling_paused
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def reset_radio_operation_state():
|
|
"""Reset shared radio operation lock state before/after each test."""
|
|
prev_meshcore = radio_manager._meshcore
|
|
radio_manager._operation_lock = None
|
|
# Default to a non-None MagicMock so radio_operation() doesn't raise
|
|
# RadioDisconnectedError for tests that only exercise locking.
|
|
radio_manager._meshcore = MagicMock()
|
|
|
|
import app.radio_sync as radio_sync
|
|
|
|
radio_sync._polling_pause_count = 0
|
|
yield
|
|
radio_manager._operation_lock = None
|
|
radio_manager._meshcore = prev_meshcore
|
|
radio_sync._polling_pause_count = 0
|
|
|
|
|
|
class TestRadioOperationLock:
|
|
"""Validate shared radio operation lock semantics."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_non_blocking_fails_when_lock_held_by_other_task(self):
|
|
started = asyncio.Event()
|
|
release = asyncio.Event()
|
|
|
|
async def holder():
|
|
async with radio_manager.radio_operation("holder"):
|
|
started.set()
|
|
await release.wait()
|
|
|
|
holder_task = asyncio.create_task(holder())
|
|
await started.wait()
|
|
|
|
with pytest.raises(RadioOperationBusyError):
|
|
async with radio_manager.radio_operation("contender", blocking=False):
|
|
pass
|
|
|
|
release.set()
|
|
await holder_task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_blocking_waits_and_acquires_after_release(self):
|
|
holder_entered = asyncio.Event()
|
|
holder_release = asyncio.Event()
|
|
contender_entered = asyncio.Event()
|
|
order: list[str] = []
|
|
|
|
async def holder():
|
|
async with radio_manager.radio_operation("holder"):
|
|
order.append("holder_enter")
|
|
holder_entered.set()
|
|
await holder_release.wait()
|
|
order.append("holder_exit")
|
|
|
|
async def contender():
|
|
await holder_entered.wait()
|
|
async with radio_manager.radio_operation("contender"):
|
|
order.append("contender_enter")
|
|
contender_entered.set()
|
|
|
|
holder_task = asyncio.create_task(holder())
|
|
contender_task = asyncio.create_task(contender())
|
|
|
|
await holder_entered.wait()
|
|
await asyncio.sleep(0.02)
|
|
assert not contender_entered.is_set()
|
|
|
|
holder_release.set()
|
|
await asyncio.wait_for(contender_entered.wait(), timeout=1.0)
|
|
|
|
await holder_task
|
|
await contender_task
|
|
assert order == ["holder_enter", "holder_exit", "contender_enter"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_suspend_auto_fetch_stops_and_restarts(self):
|
|
mc = MagicMock()
|
|
mc.stop_auto_message_fetching = AsyncMock()
|
|
mc.start_auto_message_fetching = AsyncMock()
|
|
radio_manager._meshcore = mc
|
|
|
|
async with radio_manager.radio_operation(
|
|
"auto_fetch_toggle",
|
|
suspend_auto_fetch=True,
|
|
):
|
|
pass
|
|
|
|
mc.stop_auto_message_fetching.assert_awaited_once()
|
|
mc.start_auto_message_fetching.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_lock_released_when_auto_fetch_restart_is_cancelled(self):
|
|
mc = MagicMock()
|
|
mc.stop_auto_message_fetching = AsyncMock()
|
|
mc.start_auto_message_fetching = AsyncMock(side_effect=asyncio.CancelledError())
|
|
radio_manager._meshcore = mc
|
|
|
|
with pytest.raises(asyncio.CancelledError):
|
|
async with radio_manager.radio_operation(
|
|
"cancelled_restart",
|
|
suspend_auto_fetch=True,
|
|
):
|
|
pass
|
|
|
|
async with radio_manager.radio_operation("after_cancel", blocking=False):
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_pause_polling_toggles_state(self):
|
|
assert not is_polling_paused()
|
|
|
|
async with radio_manager.radio_operation("pause_polling", pause_polling=True):
|
|
assert is_polling_paused()
|
|
|
|
assert not is_polling_paused()
|
|
|
|
|
|
class TestRadioOperationYield:
|
|
"""Validate that radio_operation() yields the current meshcore instance."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_radio_operation_yields_current_meshcore(self):
|
|
"""The yielded value is the current _meshcore at lock-acquisition time."""
|
|
mc = MagicMock()
|
|
radio_manager._meshcore = mc
|
|
|
|
async with radio_manager.radio_operation("test_yield") as yielded:
|
|
assert yielded is mc
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_radio_operation_raises_when_disconnected_after_lock(self):
|
|
"""RadioDisconnectedError is raised when _meshcore is None after acquiring the lock."""
|
|
radio_manager._meshcore = None
|
|
|
|
with pytest.raises(RadioDisconnectedError):
|
|
async with radio_manager.radio_operation("test_disconnected"):
|
|
pass # pragma: no cover
|
|
|
|
# Lock must be released even after the error
|
|
radio_manager._meshcore = MagicMock()
|
|
async with radio_manager.radio_operation("after_error", blocking=False):
|
|
pass
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_radio_operation_yields_fresh_reference_after_swap(self):
|
|
"""If _meshcore is swapped between pre-check and lock acquisition,
|
|
the yielded value is the new (current) instance, not the old one."""
|
|
old_mc = MagicMock(name="old")
|
|
new_mc = MagicMock(name="new")
|
|
|
|
# Start with old_mc
|
|
radio_manager._meshcore = old_mc
|
|
|
|
# Simulate a reconnect swapping _meshcore before the caller enters the block
|
|
radio_manager._meshcore = new_mc
|
|
|
|
async with radio_manager.radio_operation("test_swap") as yielded:
|
|
assert yielded is new_mc
|
|
assert yielded is not old_mc
|
|
|
|
|
|
class TestRequireConnected:
|
|
"""Test the require_connected() FastAPI dependency."""
|
|
|
|
def test_raises_503_when_setup_in_progress(self):
|
|
"""HTTPException 503 is raised when radio is connected but setup is still in progress."""
|
|
from fastapi import HTTPException
|
|
|
|
from app.dependencies import require_connected
|
|
|
|
with patch("app.dependencies.radio_manager") as mock_rm:
|
|
mock_rm.is_connected = True
|
|
mock_rm.meshcore = MagicMock()
|
|
mock_rm.is_setup_in_progress = True
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
require_connected()
|
|
|
|
assert exc_info.value.status_code == 503
|
|
assert "initializing" in exc_info.value.detail.lower()
|
|
|
|
def test_raises_503_when_not_connected(self):
|
|
"""HTTPException 503 is raised when radio is not connected."""
|
|
from fastapi import HTTPException
|
|
|
|
from app.dependencies import require_connected
|
|
|
|
with patch("app.dependencies.radio_manager") as mock_rm:
|
|
mock_rm.is_setup_in_progress = False
|
|
mock_rm.is_connected = False
|
|
mock_rm.meshcore = None
|
|
|
|
with pytest.raises(HTTPException) as exc_info:
|
|
require_connected()
|
|
|
|
assert exc_info.value.status_code == 503
|
|
|
|
def test_returns_meshcore_when_connected_and_setup_complete(self):
|
|
"""Returns meshcore instance when radio is connected and setup is complete."""
|
|
from app.dependencies import require_connected
|
|
|
|
mock_mc = MagicMock()
|
|
with patch("app.dependencies.radio_manager") as mock_rm:
|
|
mock_rm.is_setup_in_progress = False
|
|
mock_rm.is_connected = True
|
|
mock_rm.meshcore = mock_mc
|
|
|
|
result = require_connected()
|
|
|
|
assert result is mock_mc
|