diff --git a/app/AGENTS.md b/app/AGENTS.md index 6021c85..a30c3f4 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -41,7 +41,7 @@ app/ │ ├── radio_lifecycle.py # Post-connect setup and reconnect/setup helpers │ ├── radio_commands.py # Radio config/private-key command workflows │ ├── radio_stats.py # In-memory local radio stats sampling and noise-floor history -│ └── radio_runtime.py # Router/dependency seam over the global RadioManager +│ └── radio_runtime.py # Explicit router/dependency seam over the live radio manager + runtime state ├── radio.py # RadioManager transport/session state + lock management ├── radio_sync.py # Polling, sync, periodic advertisement loop ├── decoder.py # Packet parsing/decryption @@ -95,7 +95,7 @@ app/ - `RadioManager.start_connection_monitor()` checks health every 5s. - `RadioManager.post_connect_setup()` delegates to `services/radio_lifecycle.py`. -- Routers, startup/lifespan code, fanout helpers, and `radio_sync.py` should reach radio state through `services/radio_runtime.py`, not by importing `app.radio.radio_manager` directly. +- Routers, startup/lifespan code, fanout helpers, and `radio_sync.py` should reach radio state through `services/radio_runtime.py`, not by importing `app.radio.radio_manager` directly. `RadioManager` owns transport/session operations; mutable runtime metadata and caches now live in its composed runtime-state object. - Shared reconnect/setup helpers in `services/radio_lifecycle.py` are used by startup, the monitor, and manual reconnect/reboot flows before broadcasting healthy state. - Setup still includes handler registration, key export, time sync, contact/channel sync, and advertisement tasks. The message-poll task always starts: by default it runs as a low-frequency hourly audit, and `MESHCORE_ENABLE_MESSAGE_POLL_FALLBACK=true` switches it to aggressive 10-second polling. That audit checks both missed-radio-message drift and channel-slot cache drift; cache mismatches are logged, toasted, and the send-slot cache is reset. - Post-connect setup is timeout-bounded. If initial radio offload/setup hangs too long, the backend logs the failure and broadcasts an `error` toast telling the operator to reboot the radio and restart the server. diff --git a/app/radio.py b/app/radio.py index a9b19e2..3903375 100644 --- a/app/radio.py +++ b/app/radio.py @@ -3,7 +3,6 @@ import glob import logging import platform import re -from collections import OrderedDict from contextlib import asynccontextmanager, nullcontext from pathlib import Path @@ -12,22 +11,23 @@ from serial.serialutil import SerialException from app.config import settings from app.keystore import clear_keys +from app.radio_runtime_state import ( + RadioDisconnectedError, + RadioOperationBusyError, + RadioOperationError, + RadioRuntimeState, +) logger = logging.getLogger(__name__) MAX_FRONTEND_RECONNECT_ERROR_BROADCASTS = 3 _SERIAL_PORT_ERROR_RE = re.compile(r"could not open port (?P.+?):") - - -class RadioOperationError(RuntimeError): - """Base class for shared radio operation lock errors.""" - - -class RadioOperationBusyError(RadioOperationError): - """Raised when a non-blocking radio operation cannot acquire the lock.""" - - -class RadioDisconnectedError(RadioOperationError): - """Raised when the radio disconnects between pre-check and lock acquisition.""" +__all__ = [ + "RadioDisconnectedError", + "RadioManager", + "RadioOperationBusyError", + "RadioOperationError", + "radio_manager", +] def detect_serial_devices() -> list[str]: @@ -154,29 +154,189 @@ async def find_radio_port(baudrate: int) -> str | None: class RadioManager: """Manages the MeshCore radio connection.""" - def __init__(self): + def __init__(self, runtime_state: RadioRuntimeState | None = None): self._meshcore: MeshCore | None = None - self._connection_info: str | None = None - self._connection_desired: bool = True - self._reconnect_task: asyncio.Task | None = None - self._last_connected: bool = False - self._reconnect_lock: asyncio.Lock | None = None - self._operation_lock: asyncio.Lock | None = None - self._setup_lock: asyncio.Lock | None = None - self._setup_in_progress: bool = False - self._setup_complete: bool = False - self._frontend_reconnect_error_broadcasts: int = 0 - self.device_info_loaded: bool = False - self.max_contacts: int | None = None - self.device_model: str | None = None - self.firmware_build: str | None = None - self.firmware_version: str | None = None - self.max_channels: int = 40 - self.path_hash_mode: int = 0 - self.path_hash_mode_supported: bool = False - self._channel_slot_by_key: OrderedDict[str, int] = OrderedDict() - self._channel_key_by_slot: dict[int, str] = {} - self._pending_message_channel_key_by_slot: dict[int, str] = {} + self._state = runtime_state or RadioRuntimeState() + + @property + def state(self) -> RadioRuntimeState: + return self._state + + @property + def _connection_info(self) -> str | None: + return self._state.connection_info + + @_connection_info.setter + def _connection_info(self, value: str | None) -> None: + self._state.connection_info = value + + @property + def _connection_desired(self) -> bool: + return self._state.connection_desired + + @_connection_desired.setter + def _connection_desired(self, value: bool) -> None: + self._state.connection_desired = value + + @property + def _reconnect_task(self) -> asyncio.Task | None: + return self._state.reconnect_task + + @_reconnect_task.setter + def _reconnect_task(self, value: asyncio.Task | None) -> None: + self._state.reconnect_task = value + + @property + def _last_connected(self) -> bool: + return self._state.last_connected + + @_last_connected.setter + def _last_connected(self, value: bool) -> None: + self._state.last_connected = value + + @property + def _reconnect_lock(self) -> asyncio.Lock | None: + return self._state.reconnect_lock + + @_reconnect_lock.setter + def _reconnect_lock(self, value: asyncio.Lock | None) -> None: + self._state.reconnect_lock = value + + @property + def _operation_lock(self) -> asyncio.Lock | None: + return self._state.operation_lock + + @_operation_lock.setter + def _operation_lock(self, value: asyncio.Lock | None) -> None: + self._state.operation_lock = value + + @property + def _setup_lock(self) -> asyncio.Lock | None: + return self._state.setup_lock + + @_setup_lock.setter + def _setup_lock(self, value: asyncio.Lock | None) -> None: + self._state.setup_lock = value + + @property + def _setup_in_progress(self) -> bool: + return self._state.setup_in_progress + + @_setup_in_progress.setter + def _setup_in_progress(self, value: bool) -> None: + self._state.setup_in_progress = value + + @property + def _setup_complete(self) -> bool: + return self._state.setup_complete + + @_setup_complete.setter + def _setup_complete(self, value: bool) -> None: + self._state.setup_complete = value + + @property + def _frontend_reconnect_error_broadcasts(self) -> int: + return self._state.frontend_reconnect_error_broadcasts + + @_frontend_reconnect_error_broadcasts.setter + def _frontend_reconnect_error_broadcasts(self, value: int) -> None: + self._state.frontend_reconnect_error_broadcasts = value + + @property + def device_info_loaded(self) -> bool: + return self._state.device_info_loaded + + @device_info_loaded.setter + def device_info_loaded(self, value: bool) -> None: + self._state.device_info_loaded = value + + @property + def max_contacts(self) -> int | None: + return self._state.max_contacts + + @max_contacts.setter + def max_contacts(self, value: int | None) -> None: + self._state.max_contacts = value + + @property + def device_model(self) -> str | None: + return self._state.device_model + + @device_model.setter + def device_model(self, value: str | None) -> None: + self._state.device_model = value + + @property + def firmware_build(self) -> str | None: + return self._state.firmware_build + + @firmware_build.setter + def firmware_build(self, value: str | None) -> None: + self._state.firmware_build = value + + @property + def firmware_version(self) -> str | None: + return self._state.firmware_version + + @firmware_version.setter + def firmware_version(self, value: str | None) -> None: + self._state.firmware_version = value + + @property + def max_channels(self) -> int: + return self._state.max_channels + + @max_channels.setter + def max_channels(self, value: int) -> None: + self._state.max_channels = value + + @property + def path_hash_mode(self) -> int: + return self._state.path_hash_mode + + @path_hash_mode.setter + def path_hash_mode(self, value: int) -> None: + self._state.path_hash_mode = value + + @path_hash_mode.deleter + def path_hash_mode(self) -> None: + self._state.path_hash_mode = 0 + + @property + def path_hash_mode_supported(self) -> bool: + return self._state.path_hash_mode_supported + + @path_hash_mode_supported.setter + def path_hash_mode_supported(self, value: bool) -> None: + self._state.path_hash_mode_supported = value + + @path_hash_mode_supported.deleter + def path_hash_mode_supported(self) -> None: + self._state.path_hash_mode_supported = False + + @property + def _channel_slot_by_key(self): + return self._state.channel_slot_by_key + + @_channel_slot_by_key.setter + def _channel_slot_by_key(self, value) -> None: + self._state.channel_slot_by_key = value + + @property + def _channel_key_by_slot(self): + return self._state.channel_key_by_slot + + @_channel_key_by_slot.setter + def _channel_key_by_slot(self, value) -> None: + self._state.channel_key_by_slot = value + + @property + def _pending_message_channel_key_by_slot(self): + return self._state.pending_message_channel_key_by_slot + + @_pending_message_channel_key_by_slot.setter + def _pending_message_channel_key_by_slot(self, value) -> None: + self._state.pending_message_channel_key_by_slot = value async def _acquire_operation_lock( self, @@ -185,43 +345,23 @@ class RadioManager: blocking: bool, ) -> None: """Acquire the shared radio operation lock.""" - - if self._operation_lock is None: - self._operation_lock = asyncio.Lock() - - if not blocking: - if self._operation_lock.locked(): - raise RadioOperationBusyError(f"Radio is busy (operation: {name})") - # In single-threaded asyncio the lock cannot be acquired between the - # check above and the await below (no other coroutine runs until we - # yield). The await returns immediately for an uncontested lock. - await self._operation_lock.acquire() - else: - await self._operation_lock.acquire() - - logger.debug("Acquired radio operation lock (%s)", name) + await self._state.acquire_operation_lock(name, blocking=blocking) def _release_operation_lock(self, name: str) -> None: """Release the shared radio operation lock.""" - if self._operation_lock and self._operation_lock.locked(): - self._operation_lock.release() - logger.debug("Released radio operation lock (%s)", name) - else: - logger.error("Attempted to release unlocked radio operation lock (%s)", name) + self._state.release_operation_lock(name) + + async def acquire_operation_lock(self, name: str, *, blocking: bool = True) -> None: + """Acquire the shared radio operation lock.""" + await self._acquire_operation_lock(name, blocking=blocking) + + def release_operation_lock(self, name: str) -> None: + """Release the shared radio operation lock.""" + self._release_operation_lock(name) def _reset_connected_runtime_state(self) -> None: """Clear cached runtime state after a transport teardown completes.""" - self._setup_complete = False - self.device_info_loaded = False - self.max_contacts = None - self.device_model = None - self.firmware_build = None - self.firmware_version = None - self.max_channels = 40 - self.path_hash_mode = 0 - self.path_hash_mode_supported = False - self.reset_channel_send_cache() - self.clear_pending_message_channel_slots() + self._state.reset_connected_runtime_state() @asynccontextmanager async def radio_operation( diff --git a/app/radio_runtime_state.py b/app/radio_runtime_state.py new file mode 100644 index 0000000..1ab0829 --- /dev/null +++ b/app/radio_runtime_state.py @@ -0,0 +1,187 @@ +import asyncio +import logging +from collections import OrderedDict + +from app.config import settings + +logger = logging.getLogger(__name__) + + +class RadioOperationError(RuntimeError): + """Base class for shared radio operation lock errors.""" + + +class RadioOperationBusyError(RadioOperationError): + """Raised when a non-blocking radio operation cannot acquire the lock.""" + + +class RadioDisconnectedError(RadioOperationError): + """Raised when the radio disconnects between pre-check and lock acquisition.""" + + +class RadioRuntimeState: + """Mutable runtime state for one live radio session manager.""" + + def __init__(self) -> None: + self.connection_info: str | None = None + self.connection_desired: bool = True + self.reconnect_task: asyncio.Task | None = None + self.last_connected: bool = False + self.reconnect_lock: asyncio.Lock | None = None + self.operation_lock: asyncio.Lock | None = None + self.setup_lock: asyncio.Lock | None = None + self.setup_in_progress: bool = False + self.setup_complete: bool = False + self.frontend_reconnect_error_broadcasts: int = 0 + self.device_info_loaded: bool = False + self.max_contacts: int | None = None + self.device_model: str | None = None + self.firmware_build: str | None = None + self.firmware_version: str | None = None + self.max_channels: int = 40 + self.path_hash_mode: int = 0 + self.path_hash_mode_supported: bool = False + self.channel_slot_by_key: OrderedDict[str, int] = OrderedDict() + self.channel_key_by_slot: dict[int, str] = {} + self.pending_message_channel_key_by_slot: dict[int, str] = {} + + @property + def is_reconnecting(self) -> bool: + return self.reconnect_lock is not None and self.reconnect_lock.locked() + + async def acquire_operation_lock(self, name: str, *, blocking: bool) -> None: + if self.operation_lock is None: + self.operation_lock = asyncio.Lock() + + if not blocking: + if self.operation_lock.locked(): + raise RadioOperationBusyError(f"Radio is busy (operation: {name})") + # No coroutine can acquire the lock between the check above and + # this await because we have not yielded yet. + await self.operation_lock.acquire() + else: + await self.operation_lock.acquire() + + logger.debug("Acquired radio operation lock (%s)", name) + + def release_operation_lock(self, name: str) -> None: + if self.operation_lock and self.operation_lock.locked(): + self.operation_lock.release() + logger.debug("Released radio operation lock (%s)", name) + else: + logger.error("Attempted to release unlocked radio operation lock (%s)", name) + + def reset_connected_runtime_state(self) -> None: + self.setup_complete = False + self.device_info_loaded = False + self.max_contacts = None + self.device_model = None + self.firmware_build = None + self.firmware_version = None + self.max_channels = 40 + self.path_hash_mode = 0 + self.path_hash_mode_supported = False + self.reset_channel_send_cache() + self.clear_pending_message_channel_slots() + + def reset_channel_send_cache(self) -> None: + self.channel_slot_by_key.clear() + self.channel_key_by_slot.clear() + + def remember_pending_message_channel_slot(self, channel_key: str, slot: int) -> None: + self.pending_message_channel_key_by_slot[slot] = channel_key.upper() + + def get_pending_message_channel_key(self, slot: int) -> str | None: + return self.pending_message_channel_key_by_slot.get(slot) + + def clear_pending_message_channel_slots(self) -> None: + self.pending_message_channel_key_by_slot.clear() + + def channel_slot_reuse_enabled(self) -> bool: + if settings.force_channel_slot_reconfigure: + return False + if self.connection_info: + return not self.connection_info.startswith("TCP:") + return settings.connection_type != "tcp" + + def get_channel_send_cache_capacity(self) -> int: + try: + return max(1, int(self.max_channels)) + except (TypeError, ValueError): + return 1 + + def get_cached_channel_slot(self, channel_key: str) -> int | None: + return self.channel_slot_by_key.get(channel_key.upper()) + + def plan_channel_send_slot( + self, + channel_key: str, + *, + preferred_slot: int = 0, + ) -> tuple[int, bool, str | None]: + if not self.channel_slot_reuse_enabled(): + return preferred_slot, True, None + + normalized_key = channel_key.upper() + cached_slot = self.channel_slot_by_key.get(normalized_key) + if cached_slot is not None: + return cached_slot, False, None + + capacity = self.get_channel_send_cache_capacity() + if len(self.channel_slot_by_key) < capacity: + slot = self._find_first_free_channel_slot(capacity, preferred_slot) + return slot, True, None + + evicted_key, slot = next(iter(self.channel_slot_by_key.items())) + return slot, True, evicted_key + + def note_channel_slot_loaded(self, channel_key: str, slot: int) -> None: + if not self.channel_slot_reuse_enabled(): + return + + normalized_key = channel_key.upper() + previous_slot = self.channel_slot_by_key.pop(normalized_key, None) + if previous_slot is not None and previous_slot != slot: + self.channel_key_by_slot.pop(previous_slot, None) + + displaced_key = self.channel_key_by_slot.get(slot) + if displaced_key is not None and displaced_key != normalized_key: + self.channel_slot_by_key.pop(displaced_key, None) + + self.channel_key_by_slot[slot] = normalized_key + self.channel_slot_by_key[normalized_key] = slot + + def note_channel_slot_used(self, channel_key: str) -> None: + if not self.channel_slot_reuse_enabled(): + return + + normalized_key = channel_key.upper() + slot = self.channel_slot_by_key.get(normalized_key) + if slot is None: + return + self.channel_slot_by_key.move_to_end(normalized_key) + self.channel_key_by_slot[slot] = normalized_key + + def invalidate_cached_channel_slot(self, channel_key: str) -> None: + normalized_key = channel_key.upper() + slot = self.channel_slot_by_key.pop(normalized_key, None) + if slot is None: + return + if self.channel_key_by_slot.get(slot) == normalized_key: + self.channel_key_by_slot.pop(slot, None) + + def get_channel_send_cache_snapshot(self) -> list[tuple[str, int]]: + return list(self.channel_slot_by_key.items()) + + def reset_reconnect_error_broadcasts(self) -> None: + self.frontend_reconnect_error_broadcasts = 0 + + def _find_first_free_channel_slot(self, capacity: int, preferred_slot: int) -> int: + if preferred_slot < capacity and preferred_slot not in self.channel_key_by_slot: + return preferred_slot + + for slot in range(capacity): + if slot not in self.channel_key_by_slot: + return slot + + return preferred_slot diff --git a/app/services/radio_runtime.py b/app/services/radio_runtime.py index a5649c5..aebe360 100644 --- a/app/services/radio_runtime.py +++ b/app/services/radio_runtime.py @@ -1,8 +1,8 @@ -"""Shared access seam over the global RadioManager instance. +"""Shared access seam over the process-global radio runtime. -This module deliberately keeps behavior thin and forwarding-only. The goal is -to reduce direct `app.radio.radio_manager` imports across routers and helpers -without changing radio lifecycle, lock, or connection semantics. +The runtime object is the public boundary for application code. It exposes the +current manager plus its mutable session state through an explicit API instead +of forwarding arbitrary attribute access to the manager instance. """ from collections.abc import Callable @@ -15,7 +15,7 @@ import app.radio as radio_module class RadioRuntime: - """Thin forwarding wrapper around the process-global RadioManager.""" + """Explicit access seam over the process-global RadioManager.""" def __init__(self, manager_or_getter=None): if manager_or_getter is None: @@ -30,24 +30,90 @@ class RadioRuntime: return self._manager_getter() def __getattr__(self, name: str) -> Any: - """Forward unknown attributes to the current global manager.""" - return getattr(self.manager, name) + raise AttributeError( + f"{type(self).__name__!s} does not expose attribute {name!r}. " + "Use an explicit RadioRuntime property or method." + ) - @staticmethod - def _is_local_runtime_attr(name: str) -> bool: - return name.startswith("_") or hasattr(RadioRuntime, name) + @property + def state(self) -> Any: + return self.manager.state - def __setattr__(self, name: str, value: Any) -> None: - if self._is_local_runtime_attr(name): - object.__setattr__(self, name, value) - return - setattr(self.manager, name, value) + @property + def meshcore(self) -> Any: + return self.manager.meshcore - def __delattr__(self, name: str) -> None: - if self._is_local_runtime_attr(name): - object.__delattr__(self, name) - return - delattr(self.manager, name) + @property + def connection_info(self) -> str | None: + return self.manager.connection_info + + @property + def is_connected(self) -> bool: + return self.manager.is_connected + + @property + def is_reconnecting(self) -> bool: + return self.manager.is_reconnecting + + @property + def is_setup_in_progress(self) -> bool: + return self.manager.is_setup_in_progress + + @property + def is_setup_complete(self) -> bool: + return self.manager.is_setup_complete + + @property + def connection_desired(self) -> bool: + return self.manager.connection_desired + + @property + def max_contacts(self) -> int | None: + return self.state.max_contacts + + @max_contacts.setter + def max_contacts(self, value: int | None) -> None: + self.state.max_contacts = value + + @property + def max_channels(self) -> int: + return self.state.max_channels + + @max_channels.setter + def max_channels(self, value: int) -> None: + self.state.max_channels = value + + @property + def path_hash_mode(self) -> int: + return self.state.path_hash_mode + + @path_hash_mode.setter + def path_hash_mode(self, value: int) -> None: + self.state.path_hash_mode = value + + @property + def path_hash_mode_supported(self) -> bool: + return self.state.path_hash_mode_supported + + @path_hash_mode_supported.setter + def path_hash_mode_supported(self, value: bool) -> None: + self.state.path_hash_mode_supported = value + + @property + def device_info_loaded(self) -> bool: + return self.state.device_info_loaded + + @property + def device_model(self) -> str | None: + return self.state.device_model + + @property + def firmware_build(self) -> str | None: + return self.state.firmware_build + + @property + def firmware_version(self) -> str | None: + return self.state.firmware_version def require_connected(self): """Return MeshCore when available, mirroring existing HTTP semantics.""" @@ -89,5 +155,52 @@ class RadioRuntime: broadcast_on_success=broadcast_on_success, ) + def reset_channel_send_cache(self) -> None: + self.state.reset_channel_send_cache() + + def remember_pending_message_channel_slot(self, channel_key: str, slot: int) -> None: + self.state.remember_pending_message_channel_slot(channel_key, slot) + + def get_pending_message_channel_key(self, slot: int) -> str | None: + return self.state.get_pending_message_channel_key(slot) + + def clear_pending_message_channel_slots(self) -> None: + self.state.clear_pending_message_channel_slots() + + def channel_slot_reuse_enabled(self) -> bool: + return self.state.channel_slot_reuse_enabled() + + def get_channel_send_cache_capacity(self) -> int: + return self.state.get_channel_send_cache_capacity() + + def get_cached_channel_slot(self, channel_key: str) -> int | None: + return self.state.get_cached_channel_slot(channel_key) + + def plan_channel_send_slot( + self, + channel_key: str, + *, + preferred_slot: int = 0, + ) -> tuple[int, bool, str | None]: + return self.state.plan_channel_send_slot(channel_key, preferred_slot=preferred_slot) + + def note_channel_slot_loaded(self, channel_key: str, slot: int) -> None: + self.state.note_channel_slot_loaded(channel_key, slot) + + def note_channel_slot_used(self, channel_key: str) -> None: + self.state.note_channel_slot_used(channel_key) + + def invalidate_cached_channel_slot(self, channel_key: str) -> None: + self.state.invalidate_cached_channel_slot(channel_key) + + def get_channel_send_cache_snapshot(self) -> list[tuple[str, int]]: + return self.state.get_channel_send_cache_snapshot() + + def resume_connection(self) -> None: + self.manager.resume_connection() + + async def pause_connection(self) -> None: + await self.manager.pause_connection() + radio_runtime = RadioRuntime() diff --git a/tests/test_radio_runtime_service.py b/tests/test_radio_runtime_service.py index 41ae42e..cb6e5ed 100644 --- a/tests/test_radio_runtime_service.py +++ b/tests/test_radio_runtime_service.py @@ -4,6 +4,7 @@ from unittest.mock import AsyncMock import pytest from fastapi import HTTPException +from app.radio_runtime_state import RadioRuntimeState from app.services.radio_runtime import RadioRuntime @@ -114,3 +115,17 @@ async def test_lifecycle_passthrough_methods_delegate_to_current_manager(): manager.start_connection_monitor.assert_awaited_once() manager.stop_connection_monitor.assert_awaited_once() manager.disconnect.assert_awaited_once() + + +def test_explicit_runtime_state_api_replaces_attribute_forwarding(): + manager = _Manager(meshcore="meshcore", is_connected=True) + manager.state = RadioRuntimeState() + manager.state.path_hash_mode = 2 + runtime = RadioRuntime(manager) + + assert runtime.path_hash_mode == 2 + runtime.path_hash_mode = 1 + assert manager.state.path_hash_mode == 1 + + with pytest.raises(AttributeError, match="does not expose attribute"): + _ = runtime.some_random_attr