extract radio lifecycle service

This commit is contained in:
Jack Kingsman
2026-03-09 18:02:58 -07:00
parent 0d671f361d
commit 344cee5508
5 changed files with 332 additions and 221 deletions

View File

@@ -37,13 +37,14 @@ logger = logging.getLogger(__name__)
async def _startup_radio_connect_and_setup() -> None:
"""Connect/setup the radio in the background so HTTP serving can start immediately."""
try:
connected = await radio_manager.reconnect(broadcast_on_success=False)
if connected:
await radio_manager.post_connect_setup()
from app.websocket import broadcast_health
from app.services.radio_lifecycle import reconnect_and_prepare_radio
broadcast_health(True, radio_manager.connection_info)
try:
connected = await reconnect_and_prepare_radio(
radio_manager,
broadcast_on_success=True,
)
if connected:
logger.info("Connected to radio")
else:
logger.warning("Failed to connect to radio on startup")

View File

@@ -217,146 +217,10 @@ class RadioManager:
self._release_operation_lock(name)
async def post_connect_setup(self) -> None:
"""Full post-connection setup: handlers, key export, sync, advertisements, polling.
"""Run shared post-connection orchestration after transport setup succeeds."""
from app.services.radio_lifecycle import run_post_connect_setup
Called after every successful connection or reconnection.
Idempotent — safe to call repeatedly (periodic tasks have start guards).
"""
from app.event_handlers import register_event_handlers
from app.keystore import export_and_store_private_key
from app.radio_sync import (
drain_pending_messages,
send_advertisement,
start_message_polling,
start_periodic_advert,
start_periodic_sync,
sync_and_offload_all,
sync_radio_time,
)
if not self._meshcore:
return
if self._setup_lock is None:
self._setup_lock = asyncio.Lock()
async with self._setup_lock:
if not self._meshcore:
return
self._setup_in_progress = True
self._setup_complete = False
mc = self._meshcore
try:
# Register event handlers (no radio I/O, just callback setup)
register_event_handlers(mc)
# Hold the operation lock for all radio I/O during setup.
# This prevents user-initiated operations (send message, etc.)
# from interleaving commands on the serial link.
await self._acquire_operation_lock("post_connect_setup", blocking=True)
try:
await export_and_store_private_key(mc)
# Sync radio clock with system time
await sync_radio_time(mc)
# Apply flood scope from settings (best-effort; older firmware
# may not support set_flood_scope)
from app.region_scope import normalize_region_scope
from app.repository import AppSettingsRepository
app_settings = await AppSettingsRepository.get()
scope = normalize_region_scope(app_settings.flood_scope)
try:
await mc.commands.set_flood_scope(scope if scope else "")
logger.info("Applied flood_scope=%r", scope or "(disabled)")
except Exception as exc:
logger.warning(
"set_flood_scope failed (firmware may not support it): %s", exc
)
# Query path hash mode support (best-effort; older firmware won't report it).
# If the library's parsed payload is missing path_hash_mode (e.g. stale
# .pyc on WSL2 Windows mounts), fall back to raw-frame extraction.
reader = mc._reader
_original_handle_rx = reader.handle_rx
_captured_frame: list[bytes] = []
async def _capture_handle_rx(data: bytearray) -> None:
from meshcore.packets import PacketType
if len(data) > 0 and data[0] == PacketType.DEVICE_INFO.value:
_captured_frame.append(bytes(data))
return await _original_handle_rx(data)
reader.handle_rx = _capture_handle_rx
self.path_hash_mode = 0
self.path_hash_mode_supported = False
try:
device_query = await mc.commands.send_device_query()
if device_query and "path_hash_mode" in device_query.payload:
self.path_hash_mode = device_query.payload["path_hash_mode"]
self.path_hash_mode_supported = True
elif _captured_frame:
# Raw-frame fallback: byte 1 = fw_ver, byte 81 = path_hash_mode
raw = _captured_frame[-1]
fw_ver = raw[1] if len(raw) > 1 else 0
if fw_ver >= 10 and len(raw) >= 82:
self.path_hash_mode = raw[81]
self.path_hash_mode_supported = True
logger.warning(
"path_hash_mode=%d extracted from raw frame "
"(stale .pyc? try: rm %s)",
self.path_hash_mode,
getattr(
__import__("meshcore.reader", fromlist=["reader"]),
"__cached__",
"meshcore __pycache__/reader.*.pyc",
),
)
if self.path_hash_mode_supported:
logger.info("Path hash mode: %d (supported)", self.path_hash_mode)
else:
logger.debug("Firmware does not report path_hash_mode")
except Exception as exc:
logger.debug("Failed to query path_hash_mode: %s", exc)
finally:
reader.handle_rx = _original_handle_rx
# Sync contacts/channels from radio to DB and clear radio
logger.info("Syncing and offloading radio data...")
result = await sync_and_offload_all(mc)
logger.info("Sync complete: %s", result)
# Send advertisement to announce our presence (if enabled and not throttled)
if await send_advertisement(mc):
logger.info("Advertisement sent")
else:
logger.debug("Advertisement skipped (disabled or throttled)")
# Drain any messages that were queued before we connected.
# This must happen BEFORE starting auto-fetch, otherwise both
# compete on get_msg() with interleaved radio I/O.
drained = await drain_pending_messages(mc)
if drained > 0:
logger.info("Drained %d pending message(s)", drained)
await mc.start_auto_message_fetching()
logger.info("Auto message fetching started")
finally:
self._release_operation_lock("post_connect_setup")
# Start background tasks AFTER releasing the operation lock.
# These tasks acquire their own locks when they need radio access.
start_periodic_sync()
start_periodic_advert()
start_message_polling()
self._setup_complete = True
finally:
self._setup_in_progress = False
logger.info("Post-connect setup complete")
await run_post_connect_setup(self)
@property
def meshcore(self) -> MeshCore | None:
@@ -516,77 +380,12 @@ class RadioManager:
async def start_connection_monitor(self) -> None:
"""Start background task to monitor connection and auto-reconnect."""
from app.services.radio_lifecycle import connection_monitor_loop
if self._reconnect_task is not None:
return
async def monitor_loop():
from app.websocket import broadcast_health
CHECK_INTERVAL_SECONDS = 5
UNRESPONSIVE_THRESHOLD = 3
consecutive_setup_failures = 0
while True:
try:
await asyncio.sleep(CHECK_INTERVAL_SECONDS)
current_connected = self.is_connected
# Detect status change
if self._last_connected and not current_connected:
# Connection lost
logger.warning("Radio connection lost, broadcasting status change")
broadcast_health(False, self._connection_info)
self._last_connected = False
consecutive_setup_failures = 0
if not current_connected:
# Attempt reconnection on every loop while disconnected
if not self.is_reconnecting and await self.reconnect(
broadcast_on_success=False
):
await self.post_connect_setup()
broadcast_health(True, self._connection_info)
self._last_connected = True
consecutive_setup_failures = 0
elif not self._last_connected and current_connected:
# Connection restored (might have reconnected automatically).
# Always run setup before reporting healthy.
logger.info("Radio connection restored")
await self.post_connect_setup()
broadcast_health(True, self._connection_info)
self._last_connected = True
consecutive_setup_failures = 0
elif current_connected and not self._setup_complete:
# Transport connected but setup incomplete — retry
logger.info("Retrying post-connect setup...")
await self.post_connect_setup()
broadcast_health(True, self._connection_info)
consecutive_setup_failures = 0
except asyncio.CancelledError:
# Task is being cancelled, exit cleanly
break
except Exception as e:
consecutive_setup_failures += 1
if consecutive_setup_failures == UNRESPONSIVE_THRESHOLD:
logger.error(
"Post-connect setup has failed %d times in a row. "
"The radio port appears open but the radio is not "
"responding to commands. Common causes: another "
"process has the serial port open (check for other "
"RemoteTerm instances, serial monitors, etc.), the "
"firmware is in repeater mode (not client), or the "
"radio needs a power cycle. Will keep retrying.",
consecutive_setup_failures,
)
elif consecutive_setup_failures < UNRESPONSIVE_THRESHOLD:
logger.exception("Error in connection monitor, continuing: %s", e)
# After the threshold, silently retry (avoid log spam)
self._reconnect_task = asyncio.create_task(monitor_loop())
self._reconnect_task = asyncio.create_task(connection_monitor_loop(self))
logger.info("Radio connection monitor started")
async def stop_connection_monitor(self) -> None:

View File

@@ -207,6 +207,8 @@ async def send_advertisement() -> dict:
async def _attempt_reconnect() -> dict:
"""Shared reconnection logic for reboot and reconnect endpoints."""
from app.services.radio_lifecycle import reconnect_and_prepare_radio
if radio_manager.is_reconnecting:
return {
"status": "pending",
@@ -214,14 +216,11 @@ async def _attempt_reconnect() -> dict:
"connected": False,
}
success = await radio_manager.reconnect()
if not success:
raise HTTPException(
status_code=503, detail="Failed to reconnect. Check radio connection and power."
)
try:
await radio_manager.post_connect_setup()
success = await reconnect_and_prepare_radio(
radio_manager,
broadcast_on_success=True,
)
except Exception as e:
logger.exception("Post-connect setup failed after reconnect")
raise HTTPException(
@@ -229,6 +228,11 @@ async def _attempt_reconnect() -> dict:
detail=f"Radio connected but setup failed: {e}",
) from e
if not success:
raise HTTPException(
status_code=503, detail="Failed to reconnect. Check radio connection and power."
)
return {"status": "ok", "message": "Reconnected successfully", "connected": True}
@@ -260,13 +264,15 @@ async def reconnect_radio() -> dict:
if no specific port is configured. Useful when the radio has been disconnected
or power-cycled.
"""
from app.services.radio_lifecycle import prepare_connected_radio
if radio_manager.is_connected:
if radio_manager.is_setup_complete:
return {"status": "ok", "message": "Already connected", "connected": True}
logger.info("Radio connected but setup incomplete, retrying setup")
try:
await radio_manager.post_connect_setup()
await prepare_connected_radio(radio_manager, broadcast_on_success=True)
return {"status": "ok", "message": "Setup completed", "connected": True}
except Exception as e:
logger.exception("Post-connect setup failed")

View File

@@ -0,0 +1,221 @@
import asyncio
import logging
logger = logging.getLogger(__name__)
async def run_post_connect_setup(radio_manager) -> None:
"""Run shared radio initialization after a transport connection succeeds."""
from app.event_handlers import register_event_handlers
from app.keystore import export_and_store_private_key
from app.radio_sync import (
drain_pending_messages,
send_advertisement,
start_message_polling,
start_periodic_advert,
start_periodic_sync,
sync_and_offload_all,
sync_radio_time,
)
if not radio_manager.meshcore:
return
if radio_manager._setup_lock is None:
radio_manager._setup_lock = asyncio.Lock()
async with radio_manager._setup_lock:
if not radio_manager.meshcore:
return
radio_manager._setup_in_progress = True
radio_manager._setup_complete = False
mc = radio_manager.meshcore
try:
# Register event handlers (no radio I/O, just callback setup)
register_event_handlers(mc)
# Hold the operation lock for all radio I/O during setup.
# This prevents user-initiated operations (send message, etc.)
# from interleaving commands on the serial link.
await radio_manager._acquire_operation_lock("post_connect_setup", blocking=True)
try:
await export_and_store_private_key(mc)
# Sync radio clock with system time
await sync_radio_time(mc)
# Apply flood scope from settings (best-effort; older firmware
# may not support set_flood_scope)
from app.region_scope import normalize_region_scope
from app.repository import AppSettingsRepository
app_settings = await AppSettingsRepository.get()
scope = normalize_region_scope(app_settings.flood_scope)
try:
await mc.commands.set_flood_scope(scope if scope else "")
logger.info("Applied flood_scope=%r", scope or "(disabled)")
except Exception as exc:
logger.warning("set_flood_scope failed (firmware may not support it): %s", exc)
# Query path hash mode support (best-effort; older firmware won't report it).
# If the library's parsed payload is missing path_hash_mode (e.g. stale
# .pyc on WSL2 Windows mounts), fall back to raw-frame extraction.
reader = mc._reader
_original_handle_rx = reader.handle_rx
_captured_frame: list[bytes] = []
async def _capture_handle_rx(data: bytearray) -> None:
from meshcore.packets import PacketType
if len(data) > 0 and data[0] == PacketType.DEVICE_INFO.value:
_captured_frame.append(bytes(data))
return await _original_handle_rx(data)
reader.handle_rx = _capture_handle_rx
radio_manager.path_hash_mode = 0
radio_manager.path_hash_mode_supported = False
try:
device_query = await mc.commands.send_device_query()
if device_query and "path_hash_mode" in device_query.payload:
radio_manager.path_hash_mode = device_query.payload["path_hash_mode"]
radio_manager.path_hash_mode_supported = True
elif _captured_frame:
# Raw-frame fallback: byte 1 = fw_ver, byte 81 = path_hash_mode
raw = _captured_frame[-1]
fw_ver = raw[1] if len(raw) > 1 else 0
if fw_ver >= 10 and len(raw) >= 82:
radio_manager.path_hash_mode = raw[81]
radio_manager.path_hash_mode_supported = True
logger.warning(
"path_hash_mode=%d extracted from raw frame "
"(stale .pyc? try: rm %s)",
radio_manager.path_hash_mode,
getattr(
__import__("meshcore.reader", fromlist=["reader"]),
"__cached__",
"meshcore __pycache__/reader.*.pyc",
),
)
if radio_manager.path_hash_mode_supported:
logger.info("Path hash mode: %d (supported)", radio_manager.path_hash_mode)
else:
logger.debug("Firmware does not report path_hash_mode")
except Exception as exc:
logger.debug("Failed to query path_hash_mode: %s", exc)
finally:
reader.handle_rx = _original_handle_rx
# Sync contacts/channels from radio to DB and clear radio
logger.info("Syncing and offloading radio data...")
result = await sync_and_offload_all(mc)
logger.info("Sync complete: %s", result)
# Send advertisement to announce our presence (if enabled and not throttled)
if await send_advertisement(mc):
logger.info("Advertisement sent")
else:
logger.debug("Advertisement skipped (disabled or throttled)")
# Drain any messages that were queued before we connected.
# This must happen BEFORE starting auto-fetch, otherwise both
# compete on get_msg() with interleaved radio I/O.
drained = await drain_pending_messages(mc)
if drained > 0:
logger.info("Drained %d pending message(s)", drained)
await mc.start_auto_message_fetching()
logger.info("Auto message fetching started")
finally:
radio_manager._release_operation_lock("post_connect_setup")
# Start background tasks AFTER releasing the operation lock.
# These tasks acquire their own locks when they need radio access.
start_periodic_sync()
start_periodic_advert()
start_message_polling()
radio_manager._setup_complete = True
finally:
radio_manager._setup_in_progress = False
logger.info("Post-connect setup complete")
async def prepare_connected_radio(radio_manager, *, broadcast_on_success: bool = True) -> None:
"""Finish setup for an already-connected radio and optionally broadcast health."""
from app.websocket import broadcast_health
await radio_manager.post_connect_setup()
radio_manager._last_connected = True
if broadcast_on_success:
broadcast_health(True, radio_manager.connection_info)
async def reconnect_and_prepare_radio(
radio_manager,
*,
broadcast_on_success: bool = True,
) -> bool:
"""Reconnect the transport, then run post-connect setup before reporting healthy."""
connected = await radio_manager.reconnect(broadcast_on_success=False)
if not connected:
return False
await prepare_connected_radio(radio_manager, broadcast_on_success=broadcast_on_success)
return True
async def connection_monitor_loop(radio_manager) -> None:
"""Monitor radio health and keep transport/setup state converged."""
from app.websocket import broadcast_health
check_interval_seconds = 5
unresponsive_threshold = 3
consecutive_setup_failures = 0
while True:
try:
await asyncio.sleep(check_interval_seconds)
current_connected = radio_manager.is_connected
if radio_manager._last_connected and not current_connected:
logger.warning("Radio connection lost, broadcasting status change")
broadcast_health(False, radio_manager.connection_info)
radio_manager._last_connected = False
consecutive_setup_failures = 0
if not current_connected:
if not radio_manager.is_reconnecting and await reconnect_and_prepare_radio(
radio_manager,
broadcast_on_success=True,
):
consecutive_setup_failures = 0
elif not radio_manager._last_connected and current_connected:
logger.info("Radio connection restored")
await prepare_connected_radio(radio_manager, broadcast_on_success=True)
consecutive_setup_failures = 0
elif current_connected and not radio_manager.is_setup_complete:
logger.info("Retrying post-connect setup...")
await prepare_connected_radio(radio_manager, broadcast_on_success=True)
consecutive_setup_failures = 0
except asyncio.CancelledError:
break
except Exception as e:
consecutive_setup_failures += 1
if consecutive_setup_failures == unresponsive_threshold:
logger.error(
"Post-connect setup has failed %d times in a row. "
"The radio port appears open but the radio is not "
"responding to commands. Common causes: another "
"process has the serial port open (check for other "
"RemoteTerm instances, serial monitors, etc.), the "
"firmware is in repeater mode (not client), or the "
"radio needs a power cycle. Will keep retrying.",
consecutive_setup_failures,
)
elif consecutive_setup_failures < unresponsive_threshold:
logger.exception("Error in connection monitor, continuing: %s", e)

View File

@@ -0,0 +1,84 @@
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.services.radio_lifecycle import prepare_connected_radio, reconnect_and_prepare_radio
class TestPrepareConnectedRadio:
@pytest.mark.asyncio
async def test_runs_setup_then_broadcasts_health(self):
radio_manager = MagicMock()
radio_manager._last_connected = False
radio_manager.connection_info = "TCP: test:4000"
call_order: list[str] = []
async def _setup():
call_order.append("setup")
radio_manager.post_connect_setup = AsyncMock(side_effect=_setup)
with patch("app.websocket.broadcast_health") as mock_broadcast:
await prepare_connected_radio(radio_manager, broadcast_on_success=True)
assert call_order == ["setup"]
assert radio_manager._last_connected is True
mock_broadcast.assert_called_once_with(True, "TCP: test:4000")
@pytest.mark.asyncio
async def test_can_skip_broadcast(self):
radio_manager = MagicMock()
radio_manager._last_connected = False
radio_manager.connection_info = "TCP: test:4000"
radio_manager.post_connect_setup = AsyncMock()
with patch("app.websocket.broadcast_health") as mock_broadcast:
await prepare_connected_radio(radio_manager, broadcast_on_success=False)
assert radio_manager._last_connected is True
mock_broadcast.assert_not_called()
class TestReconnectAndPrepareRadio:
@pytest.mark.asyncio
async def test_reconnects_without_early_health_broadcast(self):
radio_manager = MagicMock()
radio_manager._last_connected = False
radio_manager.connection_info = "Serial: /dev/ttyUSB0"
reconnect_calls: list[bool] = []
call_order: list[str] = []
async def _reconnect(*, broadcast_on_success: bool):
reconnect_calls.append(broadcast_on_success)
call_order.append("reconnect")
return True
async def _setup():
call_order.append("setup")
radio_manager.reconnect = AsyncMock(side_effect=_reconnect)
radio_manager.post_connect_setup = AsyncMock(side_effect=_setup)
with patch("app.websocket.broadcast_health") as mock_broadcast:
result = await reconnect_and_prepare_radio(radio_manager, broadcast_on_success=True)
assert result is True
assert reconnect_calls == [False]
assert call_order == ["reconnect", "setup"]
assert radio_manager._last_connected is True
mock_broadcast.assert_called_once_with(True, "Serial: /dev/ttyUSB0")
@pytest.mark.asyncio
async def test_returns_false_without_running_setup_when_reconnect_fails(self):
radio_manager = MagicMock()
radio_manager.reconnect = AsyncMock(return_value=False)
radio_manager.post_connect_setup = AsyncMock()
with patch("app.websocket.broadcast_health") as mock_broadcast:
result = await reconnect_and_prepare_radio(radio_manager, broadcast_on_success=True)
assert result is False
radio_manager.post_connect_setup.assert_not_awaited()
mock_broadcast.assert_not_called()