forked from iarv/Remote-Terminal-for-MeshCore
Fix event handler rereg, monitor crash, and polling pause ugliness
This commit is contained in:
@@ -113,6 +113,8 @@ broadcast_health(radio_connected=True, serial_port="/dev/ttyUSB0")
|
||||
- Checks connection every 5 seconds
|
||||
- Broadcasts `health` event on status change
|
||||
- Attempts automatic reconnection when connection lost
|
||||
- **Re-registers event handlers after successful auto-reconnect** (critical for message delivery)
|
||||
- Resilient to transient errors (logs and continues rather than crashing)
|
||||
- Supports manual reconnection via `POST /api/radio/reconnect`
|
||||
|
||||
```python
|
||||
@@ -126,6 +128,33 @@ await radio_manager.start_connection_monitor()
|
||||
await radio_manager.stop_connection_monitor()
|
||||
```
|
||||
|
||||
### Message Polling
|
||||
|
||||
Periodic message polling serves as a fallback for platforms where push events are unreliable.
|
||||
Use `pause_polling()` to temporarily suspend polling during operations that need exclusive
|
||||
radio access (e.g., repeater CLI commands):
|
||||
|
||||
```python
|
||||
from app.radio_sync import pause_polling, is_polling_paused
|
||||
|
||||
# Pause polling during sensitive operations (supports nesting)
|
||||
async with pause_polling():
|
||||
# Polling is paused here
|
||||
await do_repeater_operation()
|
||||
|
||||
async with pause_polling():
|
||||
# Still paused (nested)
|
||||
await do_another_operation()
|
||||
|
||||
# Still paused (outer context active)
|
||||
|
||||
# Polling resumes when all contexts exit
|
||||
|
||||
# Check current state
|
||||
if is_polling_paused():
|
||||
print("Polling is currently paused")
|
||||
```
|
||||
|
||||
## Database Schema
|
||||
|
||||
```sql
|
||||
|
||||
46
app/radio.py
46
app/radio.py
@@ -210,26 +210,40 @@ class RadioManager:
|
||||
from app.websocket import broadcast_health
|
||||
|
||||
while True:
|
||||
await asyncio.sleep(5) # Check every 5 seconds
|
||||
try:
|
||||
await asyncio.sleep(5) # Check every 5 seconds
|
||||
|
||||
current_connected = self.is_connected
|
||||
current_connected = self.is_connected
|
||||
|
||||
# Detect status change
|
||||
if self._last_connected and not current_connected:
|
||||
# Connection lost
|
||||
logger.warning("Radio connection lost, broadcasting status change")
|
||||
broadcast_health(False, self._port)
|
||||
self._last_connected = False
|
||||
# Detect status change
|
||||
if self._last_connected and not current_connected:
|
||||
# Connection lost
|
||||
logger.warning("Radio connection lost, broadcasting status change")
|
||||
broadcast_health(False, self._port)
|
||||
self._last_connected = False
|
||||
|
||||
# Attempt reconnection
|
||||
await asyncio.sleep(3) # Wait a bit before trying
|
||||
await self.reconnect()
|
||||
# Attempt reconnection
|
||||
await asyncio.sleep(3) # Wait a bit before trying
|
||||
if await self.reconnect():
|
||||
# Re-register event handlers after successful reconnect
|
||||
from app.event_handlers import register_event_handlers
|
||||
if self._meshcore:
|
||||
register_event_handlers(self._meshcore)
|
||||
await self._meshcore.start_auto_message_fetching()
|
||||
logger.info("Event handlers re-registered after auto-reconnect")
|
||||
|
||||
elif not self._last_connected and current_connected:
|
||||
# Connection restored (might have reconnected automatically)
|
||||
logger.info("Radio connection restored")
|
||||
broadcast_health(True, self._port)
|
||||
self._last_connected = True
|
||||
elif not self._last_connected and current_connected:
|
||||
# Connection restored (might have reconnected automatically)
|
||||
logger.info("Radio connection restored")
|
||||
broadcast_health(True, self._port)
|
||||
self._last_connected = True
|
||||
|
||||
except asyncio.CancelledError:
|
||||
# Task is being cancelled, exit cleanly
|
||||
break
|
||||
except Exception as e:
|
||||
# Log error but continue monitoring - don't let the monitor die
|
||||
logger.exception("Error in connection monitor, continuing: %s", e)
|
||||
|
||||
self._reconnect_task = asyncio.create_task(monitor_loop())
|
||||
logger.info("Radio connection monitor started")
|
||||
|
||||
@@ -29,19 +29,27 @@ _message_poll_task: asyncio.Task | None = None
|
||||
# Message poll interval in seconds
|
||||
MESSAGE_POLL_INTERVAL = 5
|
||||
|
||||
# Flag to pause polling during repeater operations
|
||||
_polling_paused: bool = False
|
||||
# Counter to pause polling during repeater operations (supports nested pauses)
|
||||
_polling_pause_count: int = 0
|
||||
|
||||
|
||||
def is_polling_paused() -> bool:
|
||||
"""Check if polling is currently paused."""
|
||||
return _polling_pause_count > 0
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def pause_polling():
|
||||
"""Context manager to pause message polling during repeater operations."""
|
||||
global _polling_paused
|
||||
_polling_paused = True
|
||||
"""Context manager to pause message polling during repeater operations.
|
||||
|
||||
Supports nested pauses - polling only resumes when all pause contexts have exited.
|
||||
"""
|
||||
global _polling_pause_count
|
||||
_polling_pause_count += 1
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
_polling_paused = False
|
||||
_polling_pause_count -= 1
|
||||
|
||||
# Background task handle
|
||||
_sync_task: asyncio.Task | None = None
|
||||
@@ -291,7 +299,7 @@ async def _message_poll_loop():
|
||||
try:
|
||||
await asyncio.sleep(MESSAGE_POLL_INTERVAL)
|
||||
|
||||
if radio_manager.is_connected and not _polling_paused:
|
||||
if radio_manager.is_connected and not is_polling_paused():
|
||||
await poll_for_messages()
|
||||
|
||||
except asyncio.CancelledError:
|
||||
|
||||
115
tests/test_radio_sync.py
Normal file
115
tests/test_radio_sync.py
Normal file
@@ -0,0 +1,115 @@
|
||||
"""Tests for radio_sync module.
|
||||
|
||||
These tests verify the polling pause mechanism that prevents
|
||||
message polling from interfering with repeater CLI operations.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from app.radio_sync import (
|
||||
_polling_pause_count,
|
||||
is_polling_paused,
|
||||
pause_polling,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_polling_state():
|
||||
"""Reset polling pause state before and after each test."""
|
||||
import app.radio_sync as radio_sync
|
||||
radio_sync._polling_pause_count = 0
|
||||
yield
|
||||
radio_sync._polling_pause_count = 0
|
||||
|
||||
|
||||
class TestPollingPause:
|
||||
"""Test the polling pause mechanism."""
|
||||
|
||||
def test_initially_not_paused(self):
|
||||
"""Polling is not paused by default."""
|
||||
assert not is_polling_paused()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pause_polling_pauses(self):
|
||||
"""pause_polling context manager pauses polling."""
|
||||
assert not is_polling_paused()
|
||||
|
||||
async with pause_polling():
|
||||
assert is_polling_paused()
|
||||
|
||||
assert not is_polling_paused()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nested_pause_stays_paused(self):
|
||||
"""Nested pause_polling contexts keep polling paused until all exit."""
|
||||
assert not is_polling_paused()
|
||||
|
||||
async with pause_polling():
|
||||
assert is_polling_paused()
|
||||
|
||||
async with pause_polling():
|
||||
assert is_polling_paused()
|
||||
|
||||
# Still paused - outer context active
|
||||
assert is_polling_paused()
|
||||
|
||||
# Now unpaused - all contexts exited
|
||||
assert not is_polling_paused()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_triple_nested_pause(self):
|
||||
"""Three levels of nesting work correctly."""
|
||||
async with pause_polling():
|
||||
async with pause_polling():
|
||||
async with pause_polling():
|
||||
assert is_polling_paused()
|
||||
assert is_polling_paused()
|
||||
assert is_polling_paused()
|
||||
assert not is_polling_paused()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pause_resumes_on_exception(self):
|
||||
"""Polling resumes even if exception occurs in context."""
|
||||
try:
|
||||
async with pause_polling():
|
||||
assert is_polling_paused()
|
||||
raise ValueError("Test error")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Should be unpaused despite exception
|
||||
assert not is_polling_paused()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_nested_pause_resumes_correctly_on_inner_exception(self):
|
||||
"""Nested contexts handle exceptions correctly."""
|
||||
async with pause_polling():
|
||||
try:
|
||||
async with pause_polling():
|
||||
assert is_polling_paused()
|
||||
raise ValueError("Inner error")
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
# Outer context still active
|
||||
assert is_polling_paused()
|
||||
|
||||
# All contexts exited
|
||||
assert not is_polling_paused()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_counter_increments_and_decrements(self):
|
||||
"""Counter correctly tracks pause depth."""
|
||||
import app.radio_sync as radio_sync
|
||||
|
||||
assert radio_sync._polling_pause_count == 0
|
||||
|
||||
async with pause_polling():
|
||||
assert radio_sync._polling_pause_count == 1
|
||||
|
||||
async with pause_polling():
|
||||
assert radio_sync._polling_pause_count == 2
|
||||
|
||||
assert radio_sync._polling_pause_count == 1
|
||||
|
||||
assert radio_sync._polling_pause_count == 0
|
||||
Reference in New Issue
Block a user