diff --git a/app/main.py b/app/main.py index 4becab6..e174f0c 100644 --- a/app/main.py +++ b/app/main.py @@ -37,13 +37,14 @@ logger = logging.getLogger(__name__) async def _startup_radio_connect_and_setup() -> None: """Connect/setup the radio in the background so HTTP serving can start immediately.""" - try: - connected = await radio_manager.reconnect(broadcast_on_success=False) - if connected: - await radio_manager.post_connect_setup() - from app.websocket import broadcast_health + from app.services.radio_lifecycle import reconnect_and_prepare_radio - broadcast_health(True, radio_manager.connection_info) + try: + connected = await reconnect_and_prepare_radio( + radio_manager, + broadcast_on_success=True, + ) + if connected: logger.info("Connected to radio") else: logger.warning("Failed to connect to radio on startup") diff --git a/app/radio.py b/app/radio.py index ed86958..7ede1f4 100644 --- a/app/radio.py +++ b/app/radio.py @@ -217,146 +217,10 @@ class RadioManager: self._release_operation_lock(name) async def post_connect_setup(self) -> None: - """Full post-connection setup: handlers, key export, sync, advertisements, polling. + """Run shared post-connection orchestration after transport setup succeeds.""" + from app.services.radio_lifecycle import run_post_connect_setup - Called after every successful connection or reconnection. - Idempotent — safe to call repeatedly (periodic tasks have start guards). - """ - from app.event_handlers import register_event_handlers - from app.keystore import export_and_store_private_key - from app.radio_sync import ( - drain_pending_messages, - send_advertisement, - start_message_polling, - start_periodic_advert, - start_periodic_sync, - sync_and_offload_all, - sync_radio_time, - ) - - if not self._meshcore: - return - - if self._setup_lock is None: - self._setup_lock = asyncio.Lock() - - async with self._setup_lock: - if not self._meshcore: - return - self._setup_in_progress = True - self._setup_complete = False - mc = self._meshcore - try: - # Register event handlers (no radio I/O, just callback setup) - register_event_handlers(mc) - - # Hold the operation lock for all radio I/O during setup. - # This prevents user-initiated operations (send message, etc.) - # from interleaving commands on the serial link. - await self._acquire_operation_lock("post_connect_setup", blocking=True) - try: - await export_and_store_private_key(mc) - - # Sync radio clock with system time - await sync_radio_time(mc) - - # Apply flood scope from settings (best-effort; older firmware - # may not support set_flood_scope) - from app.region_scope import normalize_region_scope - from app.repository import AppSettingsRepository - - app_settings = await AppSettingsRepository.get() - scope = normalize_region_scope(app_settings.flood_scope) - try: - await mc.commands.set_flood_scope(scope if scope else "") - logger.info("Applied flood_scope=%r", scope or "(disabled)") - except Exception as exc: - logger.warning( - "set_flood_scope failed (firmware may not support it): %s", exc - ) - - # Query path hash mode support (best-effort; older firmware won't report it). - # If the library's parsed payload is missing path_hash_mode (e.g. stale - # .pyc on WSL2 Windows mounts), fall back to raw-frame extraction. - reader = mc._reader - _original_handle_rx = reader.handle_rx - _captured_frame: list[bytes] = [] - - async def _capture_handle_rx(data: bytearray) -> None: - from meshcore.packets import PacketType - - if len(data) > 0 and data[0] == PacketType.DEVICE_INFO.value: - _captured_frame.append(bytes(data)) - return await _original_handle_rx(data) - - reader.handle_rx = _capture_handle_rx - self.path_hash_mode = 0 - self.path_hash_mode_supported = False - try: - device_query = await mc.commands.send_device_query() - if device_query and "path_hash_mode" in device_query.payload: - self.path_hash_mode = device_query.payload["path_hash_mode"] - self.path_hash_mode_supported = True - elif _captured_frame: - # Raw-frame fallback: byte 1 = fw_ver, byte 81 = path_hash_mode - raw = _captured_frame[-1] - fw_ver = raw[1] if len(raw) > 1 else 0 - if fw_ver >= 10 and len(raw) >= 82: - self.path_hash_mode = raw[81] - self.path_hash_mode_supported = True - logger.warning( - "path_hash_mode=%d extracted from raw frame " - "(stale .pyc? try: rm %s)", - self.path_hash_mode, - getattr( - __import__("meshcore.reader", fromlist=["reader"]), - "__cached__", - "meshcore __pycache__/reader.*.pyc", - ), - ) - if self.path_hash_mode_supported: - logger.info("Path hash mode: %d (supported)", self.path_hash_mode) - else: - logger.debug("Firmware does not report path_hash_mode") - except Exception as exc: - logger.debug("Failed to query path_hash_mode: %s", exc) - finally: - reader.handle_rx = _original_handle_rx - - # Sync contacts/channels from radio to DB and clear radio - logger.info("Syncing and offloading radio data...") - result = await sync_and_offload_all(mc) - logger.info("Sync complete: %s", result) - - # Send advertisement to announce our presence (if enabled and not throttled) - if await send_advertisement(mc): - logger.info("Advertisement sent") - else: - logger.debug("Advertisement skipped (disabled or throttled)") - - # Drain any messages that were queued before we connected. - # This must happen BEFORE starting auto-fetch, otherwise both - # compete on get_msg() with interleaved radio I/O. - drained = await drain_pending_messages(mc) - if drained > 0: - logger.info("Drained %d pending message(s)", drained) - - await mc.start_auto_message_fetching() - logger.info("Auto message fetching started") - finally: - self._release_operation_lock("post_connect_setup") - - # Start background tasks AFTER releasing the operation lock. - # These tasks acquire their own locks when they need radio access. - start_periodic_sync() - start_periodic_advert() - start_message_polling() - - self._setup_complete = True - finally: - self._setup_in_progress = False - - logger.info("Post-connect setup complete") + await run_post_connect_setup(self) @property def meshcore(self) -> MeshCore | None: @@ -516,77 +380,12 @@ class RadioManager: async def start_connection_monitor(self) -> None: """Start background task to monitor connection and auto-reconnect.""" + from app.services.radio_lifecycle import connection_monitor_loop + if self._reconnect_task is not None: return - async def monitor_loop(): - from app.websocket import broadcast_health - - CHECK_INTERVAL_SECONDS = 5 - UNRESPONSIVE_THRESHOLD = 3 - consecutive_setup_failures = 0 - - while True: - try: - await asyncio.sleep(CHECK_INTERVAL_SECONDS) - - current_connected = self.is_connected - - # Detect status change - if self._last_connected and not current_connected: - # Connection lost - logger.warning("Radio connection lost, broadcasting status change") - broadcast_health(False, self._connection_info) - self._last_connected = False - consecutive_setup_failures = 0 - - if not current_connected: - # Attempt reconnection on every loop while disconnected - if not self.is_reconnecting and await self.reconnect( - broadcast_on_success=False - ): - await self.post_connect_setup() - broadcast_health(True, self._connection_info) - self._last_connected = True - consecutive_setup_failures = 0 - - elif not self._last_connected and current_connected: - # Connection restored (might have reconnected automatically). - # Always run setup before reporting healthy. - logger.info("Radio connection restored") - await self.post_connect_setup() - broadcast_health(True, self._connection_info) - self._last_connected = True - consecutive_setup_failures = 0 - - elif current_connected and not self._setup_complete: - # Transport connected but setup incomplete — retry - logger.info("Retrying post-connect setup...") - await self.post_connect_setup() - broadcast_health(True, self._connection_info) - consecutive_setup_failures = 0 - - except asyncio.CancelledError: - # Task is being cancelled, exit cleanly - break - except Exception as e: - consecutive_setup_failures += 1 - if consecutive_setup_failures == UNRESPONSIVE_THRESHOLD: - logger.error( - "Post-connect setup has failed %d times in a row. " - "The radio port appears open but the radio is not " - "responding to commands. Common causes: another " - "process has the serial port open (check for other " - "RemoteTerm instances, serial monitors, etc.), the " - "firmware is in repeater mode (not client), or the " - "radio needs a power cycle. Will keep retrying.", - consecutive_setup_failures, - ) - elif consecutive_setup_failures < UNRESPONSIVE_THRESHOLD: - logger.exception("Error in connection monitor, continuing: %s", e) - # After the threshold, silently retry (avoid log spam) - - self._reconnect_task = asyncio.create_task(monitor_loop()) + self._reconnect_task = asyncio.create_task(connection_monitor_loop(self)) logger.info("Radio connection monitor started") async def stop_connection_monitor(self) -> None: diff --git a/app/routers/radio.py b/app/routers/radio.py index 56b052a..934372d 100644 --- a/app/routers/radio.py +++ b/app/routers/radio.py @@ -207,6 +207,8 @@ async def send_advertisement() -> dict: async def _attempt_reconnect() -> dict: """Shared reconnection logic for reboot and reconnect endpoints.""" + from app.services.radio_lifecycle import reconnect_and_prepare_radio + if radio_manager.is_reconnecting: return { "status": "pending", @@ -214,14 +216,11 @@ async def _attempt_reconnect() -> dict: "connected": False, } - success = await radio_manager.reconnect() - if not success: - raise HTTPException( - status_code=503, detail="Failed to reconnect. Check radio connection and power." - ) - try: - await radio_manager.post_connect_setup() + success = await reconnect_and_prepare_radio( + radio_manager, + broadcast_on_success=True, + ) except Exception as e: logger.exception("Post-connect setup failed after reconnect") raise HTTPException( @@ -229,6 +228,11 @@ async def _attempt_reconnect() -> dict: detail=f"Radio connected but setup failed: {e}", ) from e + if not success: + raise HTTPException( + status_code=503, detail="Failed to reconnect. Check radio connection and power." + ) + return {"status": "ok", "message": "Reconnected successfully", "connected": True} @@ -260,13 +264,15 @@ async def reconnect_radio() -> dict: if no specific port is configured. Useful when the radio has been disconnected or power-cycled. """ + from app.services.radio_lifecycle import prepare_connected_radio + if radio_manager.is_connected: if radio_manager.is_setup_complete: return {"status": "ok", "message": "Already connected", "connected": True} logger.info("Radio connected but setup incomplete, retrying setup") try: - await radio_manager.post_connect_setup() + await prepare_connected_radio(radio_manager, broadcast_on_success=True) return {"status": "ok", "message": "Setup completed", "connected": True} except Exception as e: logger.exception("Post-connect setup failed") diff --git a/app/services/radio_lifecycle.py b/app/services/radio_lifecycle.py new file mode 100644 index 0000000..0133d14 --- /dev/null +++ b/app/services/radio_lifecycle.py @@ -0,0 +1,221 @@ +import asyncio +import logging + +logger = logging.getLogger(__name__) + + +async def run_post_connect_setup(radio_manager) -> None: + """Run shared radio initialization after a transport connection succeeds.""" + from app.event_handlers import register_event_handlers + from app.keystore import export_and_store_private_key + from app.radio_sync import ( + drain_pending_messages, + send_advertisement, + start_message_polling, + start_periodic_advert, + start_periodic_sync, + sync_and_offload_all, + sync_radio_time, + ) + + if not radio_manager.meshcore: + return + + if radio_manager._setup_lock is None: + radio_manager._setup_lock = asyncio.Lock() + + async with radio_manager._setup_lock: + if not radio_manager.meshcore: + return + radio_manager._setup_in_progress = True + radio_manager._setup_complete = False + mc = radio_manager.meshcore + try: + # Register event handlers (no radio I/O, just callback setup) + register_event_handlers(mc) + + # Hold the operation lock for all radio I/O during setup. + # This prevents user-initiated operations (send message, etc.) + # from interleaving commands on the serial link. + await radio_manager._acquire_operation_lock("post_connect_setup", blocking=True) + try: + await export_and_store_private_key(mc) + + # Sync radio clock with system time + await sync_radio_time(mc) + + # Apply flood scope from settings (best-effort; older firmware + # may not support set_flood_scope) + from app.region_scope import normalize_region_scope + from app.repository import AppSettingsRepository + + app_settings = await AppSettingsRepository.get() + scope = normalize_region_scope(app_settings.flood_scope) + try: + await mc.commands.set_flood_scope(scope if scope else "") + logger.info("Applied flood_scope=%r", scope or "(disabled)") + except Exception as exc: + logger.warning("set_flood_scope failed (firmware may not support it): %s", exc) + + # Query path hash mode support (best-effort; older firmware won't report it). + # If the library's parsed payload is missing path_hash_mode (e.g. stale + # .pyc on WSL2 Windows mounts), fall back to raw-frame extraction. + reader = mc._reader + _original_handle_rx = reader.handle_rx + _captured_frame: list[bytes] = [] + + async def _capture_handle_rx(data: bytearray) -> None: + from meshcore.packets import PacketType + + if len(data) > 0 and data[0] == PacketType.DEVICE_INFO.value: + _captured_frame.append(bytes(data)) + return await _original_handle_rx(data) + + reader.handle_rx = _capture_handle_rx + radio_manager.path_hash_mode = 0 + radio_manager.path_hash_mode_supported = False + try: + device_query = await mc.commands.send_device_query() + if device_query and "path_hash_mode" in device_query.payload: + radio_manager.path_hash_mode = device_query.payload["path_hash_mode"] + radio_manager.path_hash_mode_supported = True + elif _captured_frame: + # Raw-frame fallback: byte 1 = fw_ver, byte 81 = path_hash_mode + raw = _captured_frame[-1] + fw_ver = raw[1] if len(raw) > 1 else 0 + if fw_ver >= 10 and len(raw) >= 82: + radio_manager.path_hash_mode = raw[81] + radio_manager.path_hash_mode_supported = True + logger.warning( + "path_hash_mode=%d extracted from raw frame " + "(stale .pyc? try: rm %s)", + radio_manager.path_hash_mode, + getattr( + __import__("meshcore.reader", fromlist=["reader"]), + "__cached__", + "meshcore __pycache__/reader.*.pyc", + ), + ) + if radio_manager.path_hash_mode_supported: + logger.info("Path hash mode: %d (supported)", radio_manager.path_hash_mode) + else: + logger.debug("Firmware does not report path_hash_mode") + except Exception as exc: + logger.debug("Failed to query path_hash_mode: %s", exc) + finally: + reader.handle_rx = _original_handle_rx + + # Sync contacts/channels from radio to DB and clear radio + logger.info("Syncing and offloading radio data...") + result = await sync_and_offload_all(mc) + logger.info("Sync complete: %s", result) + + # Send advertisement to announce our presence (if enabled and not throttled) + if await send_advertisement(mc): + logger.info("Advertisement sent") + else: + logger.debug("Advertisement skipped (disabled or throttled)") + + # Drain any messages that were queued before we connected. + # This must happen BEFORE starting auto-fetch, otherwise both + # compete on get_msg() with interleaved radio I/O. + drained = await drain_pending_messages(mc) + if drained > 0: + logger.info("Drained %d pending message(s)", drained) + + await mc.start_auto_message_fetching() + logger.info("Auto message fetching started") + finally: + radio_manager._release_operation_lock("post_connect_setup") + + # Start background tasks AFTER releasing the operation lock. + # These tasks acquire their own locks when they need radio access. + start_periodic_sync() + start_periodic_advert() + start_message_polling() + + radio_manager._setup_complete = True + finally: + radio_manager._setup_in_progress = False + + logger.info("Post-connect setup complete") + + +async def prepare_connected_radio(radio_manager, *, broadcast_on_success: bool = True) -> None: + """Finish setup for an already-connected radio and optionally broadcast health.""" + from app.websocket import broadcast_health + + await radio_manager.post_connect_setup() + radio_manager._last_connected = True + if broadcast_on_success: + broadcast_health(True, radio_manager.connection_info) + + +async def reconnect_and_prepare_radio( + radio_manager, + *, + broadcast_on_success: bool = True, +) -> bool: + """Reconnect the transport, then run post-connect setup before reporting healthy.""" + connected = await radio_manager.reconnect(broadcast_on_success=False) + if not connected: + return False + + await prepare_connected_radio(radio_manager, broadcast_on_success=broadcast_on_success) + return True + + +async def connection_monitor_loop(radio_manager) -> None: + """Monitor radio health and keep transport/setup state converged.""" + from app.websocket import broadcast_health + + check_interval_seconds = 5 + unresponsive_threshold = 3 + consecutive_setup_failures = 0 + + while True: + try: + await asyncio.sleep(check_interval_seconds) + + current_connected = radio_manager.is_connected + + if radio_manager._last_connected and not current_connected: + logger.warning("Radio connection lost, broadcasting status change") + broadcast_health(False, radio_manager.connection_info) + radio_manager._last_connected = False + consecutive_setup_failures = 0 + + if not current_connected: + if not radio_manager.is_reconnecting and await reconnect_and_prepare_radio( + radio_manager, + broadcast_on_success=True, + ): + consecutive_setup_failures = 0 + + elif not radio_manager._last_connected and current_connected: + logger.info("Radio connection restored") + await prepare_connected_radio(radio_manager, broadcast_on_success=True) + consecutive_setup_failures = 0 + + elif current_connected and not radio_manager.is_setup_complete: + logger.info("Retrying post-connect setup...") + await prepare_connected_radio(radio_manager, broadcast_on_success=True) + consecutive_setup_failures = 0 + + except asyncio.CancelledError: + break + except Exception as e: + consecutive_setup_failures += 1 + if consecutive_setup_failures == unresponsive_threshold: + logger.error( + "Post-connect setup has failed %d times in a row. " + "The radio port appears open but the radio is not " + "responding to commands. Common causes: another " + "process has the serial port open (check for other " + "RemoteTerm instances, serial monitors, etc.), the " + "firmware is in repeater mode (not client), or the " + "radio needs a power cycle. Will keep retrying.", + consecutive_setup_failures, + ) + elif consecutive_setup_failures < unresponsive_threshold: + logger.exception("Error in connection monitor, continuing: %s", e) diff --git a/tests/test_radio_lifecycle_service.py b/tests/test_radio_lifecycle_service.py new file mode 100644 index 0000000..6b96234 --- /dev/null +++ b/tests/test_radio_lifecycle_service.py @@ -0,0 +1,84 @@ +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from app.services.radio_lifecycle import prepare_connected_radio, reconnect_and_prepare_radio + + +class TestPrepareConnectedRadio: + @pytest.mark.asyncio + async def test_runs_setup_then_broadcasts_health(self): + radio_manager = MagicMock() + radio_manager._last_connected = False + radio_manager.connection_info = "TCP: test:4000" + + call_order: list[str] = [] + + async def _setup(): + call_order.append("setup") + + radio_manager.post_connect_setup = AsyncMock(side_effect=_setup) + + with patch("app.websocket.broadcast_health") as mock_broadcast: + await prepare_connected_radio(radio_manager, broadcast_on_success=True) + + assert call_order == ["setup"] + assert radio_manager._last_connected is True + mock_broadcast.assert_called_once_with(True, "TCP: test:4000") + + @pytest.mark.asyncio + async def test_can_skip_broadcast(self): + radio_manager = MagicMock() + radio_manager._last_connected = False + radio_manager.connection_info = "TCP: test:4000" + radio_manager.post_connect_setup = AsyncMock() + + with patch("app.websocket.broadcast_health") as mock_broadcast: + await prepare_connected_radio(radio_manager, broadcast_on_success=False) + + assert radio_manager._last_connected is True + mock_broadcast.assert_not_called() + + +class TestReconnectAndPrepareRadio: + @pytest.mark.asyncio + async def test_reconnects_without_early_health_broadcast(self): + radio_manager = MagicMock() + radio_manager._last_connected = False + radio_manager.connection_info = "Serial: /dev/ttyUSB0" + + reconnect_calls: list[bool] = [] + call_order: list[str] = [] + + async def _reconnect(*, broadcast_on_success: bool): + reconnect_calls.append(broadcast_on_success) + call_order.append("reconnect") + return True + + async def _setup(): + call_order.append("setup") + + radio_manager.reconnect = AsyncMock(side_effect=_reconnect) + radio_manager.post_connect_setup = AsyncMock(side_effect=_setup) + + with patch("app.websocket.broadcast_health") as mock_broadcast: + result = await reconnect_and_prepare_radio(radio_manager, broadcast_on_success=True) + + assert result is True + assert reconnect_calls == [False] + assert call_order == ["reconnect", "setup"] + assert radio_manager._last_connected is True + mock_broadcast.assert_called_once_with(True, "Serial: /dev/ttyUSB0") + + @pytest.mark.asyncio + async def test_returns_false_without_running_setup_when_reconnect_fails(self): + radio_manager = MagicMock() + radio_manager.reconnect = AsyncMock(return_value=False) + radio_manager.post_connect_setup = AsyncMock() + + with patch("app.websocket.broadcast_health") as mock_broadcast: + result = await reconnect_and_prepare_radio(radio_manager, broadcast_on_success=True) + + assert result is False + radio_manager.post_connect_setup.assert_not_awaited() + mock_broadcast.assert_not_called()