Compare commits

...

1 Commits

Author SHA1 Message Date
Jack Kingsman 0d6d287aa9 Overhaul radio responsibility 2026-04-10 16:48:20 -07:00
5 changed files with 543 additions and 88 deletions
+2 -2
View File
@@ -41,7 +41,7 @@ app/
│ ├── radio_lifecycle.py # Post-connect setup and reconnect/setup helpers
│ ├── radio_commands.py # Radio config/private-key command workflows
│ ├── radio_stats.py # In-memory local radio stats sampling and noise-floor history
│ └── radio_runtime.py # Router/dependency seam over the global RadioManager
│ └── radio_runtime.py # Explicit router/dependency seam over the live radio manager + runtime state
├── radio.py # RadioManager transport/session state + lock management
├── radio_sync.py # Polling, sync, periodic advertisement loop
├── decoder.py # Packet parsing/decryption
@@ -95,7 +95,7 @@ app/
- `RadioManager.start_connection_monitor()` checks health every 5s.
- `RadioManager.post_connect_setup()` delegates to `services/radio_lifecycle.py`.
- Routers, startup/lifespan code, fanout helpers, and `radio_sync.py` should reach radio state through `services/radio_runtime.py`, not by importing `app.radio.radio_manager` directly.
- Routers, startup/lifespan code, fanout helpers, and `radio_sync.py` should reach radio state through `services/radio_runtime.py`, not by importing `app.radio.radio_manager` directly. `RadioManager` owns transport/session operations; mutable runtime metadata and caches now live in its composed runtime-state object.
- Shared reconnect/setup helpers in `services/radio_lifecycle.py` are used by startup, the monitor, and manual reconnect/reboot flows before broadcasting healthy state.
- Setup still includes handler registration, key export, time sync, contact/channel sync, and advertisement tasks. The message-poll task always starts: by default it runs as a low-frequency hourly audit, and `MESHCORE_ENABLE_MESSAGE_POLL_FALLBACK=true` switches it to aggressive 10-second polling. That audit checks both missed-radio-message drift and channel-slot cache drift; cache mismatches are logged, toasted, and the send-slot cache is reset.
- Post-connect setup is timeout-bounded. If initial radio offload/setup hangs too long, the backend logs the failure and broadcasts an `error` toast telling the operator to reboot the radio and restart the server.
+206 -66
View File
@@ -3,7 +3,6 @@ import glob
import logging
import platform
import re
from collections import OrderedDict
from contextlib import asynccontextmanager, nullcontext
from pathlib import Path
@@ -12,22 +11,23 @@ from serial.serialutil import SerialException
from app.config import settings
from app.keystore import clear_keys
from app.radio_runtime_state import (
RadioDisconnectedError,
RadioOperationBusyError,
RadioOperationError,
RadioRuntimeState,
)
logger = logging.getLogger(__name__)
MAX_FRONTEND_RECONNECT_ERROR_BROADCASTS = 3
_SERIAL_PORT_ERROR_RE = re.compile(r"could not open port (?P<port>.+?):")
class RadioOperationError(RuntimeError):
"""Base class for shared radio operation lock errors."""
class RadioOperationBusyError(RadioOperationError):
"""Raised when a non-blocking radio operation cannot acquire the lock."""
class RadioDisconnectedError(RadioOperationError):
"""Raised when the radio disconnects between pre-check and lock acquisition."""
__all__ = [
"RadioDisconnectedError",
"RadioManager",
"RadioOperationBusyError",
"RadioOperationError",
"radio_manager",
]
def detect_serial_devices() -> list[str]:
@@ -154,29 +154,189 @@ async def find_radio_port(baudrate: int) -> str | None:
class RadioManager:
"""Manages the MeshCore radio connection."""
def __init__(self):
def __init__(self, runtime_state: RadioRuntimeState | None = None):
self._meshcore: MeshCore | None = None
self._connection_info: str | None = None
self._connection_desired: bool = True
self._reconnect_task: asyncio.Task | None = None
self._last_connected: bool = False
self._reconnect_lock: asyncio.Lock | None = None
self._operation_lock: asyncio.Lock | None = None
self._setup_lock: asyncio.Lock | None = None
self._setup_in_progress: bool = False
self._setup_complete: bool = False
self._frontend_reconnect_error_broadcasts: int = 0
self.device_info_loaded: bool = False
self.max_contacts: int | None = None
self.device_model: str | None = None
self.firmware_build: str | None = None
self.firmware_version: str | None = None
self.max_channels: int = 40
self.path_hash_mode: int = 0
self.path_hash_mode_supported: bool = False
self._channel_slot_by_key: OrderedDict[str, int] = OrderedDict()
self._channel_key_by_slot: dict[int, str] = {}
self._pending_message_channel_key_by_slot: dict[int, str] = {}
self._state = runtime_state or RadioRuntimeState()
@property
def state(self) -> RadioRuntimeState:
return self._state
@property
def _connection_info(self) -> str | None:
return self._state.connection_info
@_connection_info.setter
def _connection_info(self, value: str | None) -> None:
self._state.connection_info = value
@property
def _connection_desired(self) -> bool:
return self._state.connection_desired
@_connection_desired.setter
def _connection_desired(self, value: bool) -> None:
self._state.connection_desired = value
@property
def _reconnect_task(self) -> asyncio.Task | None:
return self._state.reconnect_task
@_reconnect_task.setter
def _reconnect_task(self, value: asyncio.Task | None) -> None:
self._state.reconnect_task = value
@property
def _last_connected(self) -> bool:
return self._state.last_connected
@_last_connected.setter
def _last_connected(self, value: bool) -> None:
self._state.last_connected = value
@property
def _reconnect_lock(self) -> asyncio.Lock | None:
return self._state.reconnect_lock
@_reconnect_lock.setter
def _reconnect_lock(self, value: asyncio.Lock | None) -> None:
self._state.reconnect_lock = value
@property
def _operation_lock(self) -> asyncio.Lock | None:
return self._state.operation_lock
@_operation_lock.setter
def _operation_lock(self, value: asyncio.Lock | None) -> None:
self._state.operation_lock = value
@property
def _setup_lock(self) -> asyncio.Lock | None:
return self._state.setup_lock
@_setup_lock.setter
def _setup_lock(self, value: asyncio.Lock | None) -> None:
self._state.setup_lock = value
@property
def _setup_in_progress(self) -> bool:
return self._state.setup_in_progress
@_setup_in_progress.setter
def _setup_in_progress(self, value: bool) -> None:
self._state.setup_in_progress = value
@property
def _setup_complete(self) -> bool:
return self._state.setup_complete
@_setup_complete.setter
def _setup_complete(self, value: bool) -> None:
self._state.setup_complete = value
@property
def _frontend_reconnect_error_broadcasts(self) -> int:
return self._state.frontend_reconnect_error_broadcasts
@_frontend_reconnect_error_broadcasts.setter
def _frontend_reconnect_error_broadcasts(self, value: int) -> None:
self._state.frontend_reconnect_error_broadcasts = value
@property
def device_info_loaded(self) -> bool:
return self._state.device_info_loaded
@device_info_loaded.setter
def device_info_loaded(self, value: bool) -> None:
self._state.device_info_loaded = value
@property
def max_contacts(self) -> int | None:
return self._state.max_contacts
@max_contacts.setter
def max_contacts(self, value: int | None) -> None:
self._state.max_contacts = value
@property
def device_model(self) -> str | None:
return self._state.device_model
@device_model.setter
def device_model(self, value: str | None) -> None:
self._state.device_model = value
@property
def firmware_build(self) -> str | None:
return self._state.firmware_build
@firmware_build.setter
def firmware_build(self, value: str | None) -> None:
self._state.firmware_build = value
@property
def firmware_version(self) -> str | None:
return self._state.firmware_version
@firmware_version.setter
def firmware_version(self, value: str | None) -> None:
self._state.firmware_version = value
@property
def max_channels(self) -> int:
return self._state.max_channels
@max_channels.setter
def max_channels(self, value: int) -> None:
self._state.max_channels = value
@property
def path_hash_mode(self) -> int:
return self._state.path_hash_mode
@path_hash_mode.setter
def path_hash_mode(self, value: int) -> None:
self._state.path_hash_mode = value
@path_hash_mode.deleter
def path_hash_mode(self) -> None:
self._state.path_hash_mode = 0
@property
def path_hash_mode_supported(self) -> bool:
return self._state.path_hash_mode_supported
@path_hash_mode_supported.setter
def path_hash_mode_supported(self, value: bool) -> None:
self._state.path_hash_mode_supported = value
@path_hash_mode_supported.deleter
def path_hash_mode_supported(self) -> None:
self._state.path_hash_mode_supported = False
@property
def _channel_slot_by_key(self):
return self._state.channel_slot_by_key
@_channel_slot_by_key.setter
def _channel_slot_by_key(self, value) -> None:
self._state.channel_slot_by_key = value
@property
def _channel_key_by_slot(self):
return self._state.channel_key_by_slot
@_channel_key_by_slot.setter
def _channel_key_by_slot(self, value) -> None:
self._state.channel_key_by_slot = value
@property
def _pending_message_channel_key_by_slot(self):
return self._state.pending_message_channel_key_by_slot
@_pending_message_channel_key_by_slot.setter
def _pending_message_channel_key_by_slot(self, value) -> None:
self._state.pending_message_channel_key_by_slot = value
async def _acquire_operation_lock(
self,
@@ -185,43 +345,23 @@ class RadioManager:
blocking: bool,
) -> None:
"""Acquire the shared radio operation lock."""
if self._operation_lock is None:
self._operation_lock = asyncio.Lock()
if not blocking:
if self._operation_lock.locked():
raise RadioOperationBusyError(f"Radio is busy (operation: {name})")
# In single-threaded asyncio the lock cannot be acquired between the
# check above and the await below (no other coroutine runs until we
# yield). The await returns immediately for an uncontested lock.
await self._operation_lock.acquire()
else:
await self._operation_lock.acquire()
logger.debug("Acquired radio operation lock (%s)", name)
await self._state.acquire_operation_lock(name, blocking=blocking)
def _release_operation_lock(self, name: str) -> None:
"""Release the shared radio operation lock."""
if self._operation_lock and self._operation_lock.locked():
self._operation_lock.release()
logger.debug("Released radio operation lock (%s)", name)
else:
logger.error("Attempted to release unlocked radio operation lock (%s)", name)
self._state.release_operation_lock(name)
async def acquire_operation_lock(self, name: str, *, blocking: bool = True) -> None:
"""Acquire the shared radio operation lock."""
await self._acquire_operation_lock(name, blocking=blocking)
def release_operation_lock(self, name: str) -> None:
"""Release the shared radio operation lock."""
self._release_operation_lock(name)
def _reset_connected_runtime_state(self) -> None:
"""Clear cached runtime state after a transport teardown completes."""
self._setup_complete = False
self.device_info_loaded = False
self.max_contacts = None
self.device_model = None
self.firmware_build = None
self.firmware_version = None
self.max_channels = 40
self.path_hash_mode = 0
self.path_hash_mode_supported = False
self.reset_channel_send_cache()
self.clear_pending_message_channel_slots()
self._state.reset_connected_runtime_state()
@asynccontextmanager
async def radio_operation(
+187
View File
@@ -0,0 +1,187 @@
import asyncio
import logging
from collections import OrderedDict
from app.config import settings
logger = logging.getLogger(__name__)
class RadioOperationError(RuntimeError):
    """Common ancestor for errors raised around the shared radio operation lock."""


class RadioOperationBusyError(RadioOperationError):
    """Signals that a non-blocking caller found the operation lock already held."""


class RadioDisconnectedError(RadioOperationError):
    """Signals that the radio went away between the pre-check and taking the lock."""
class RadioRuntimeState:
"""Mutable runtime state for one live radio session manager."""
def __init__(self) -> None:
self.connection_info: str | None = None
self.connection_desired: bool = True
self.reconnect_task: asyncio.Task | None = None
self.last_connected: bool = False
self.reconnect_lock: asyncio.Lock | None = None
self.operation_lock: asyncio.Lock | None = None
self.setup_lock: asyncio.Lock | None = None
self.setup_in_progress: bool = False
self.setup_complete: bool = False
self.frontend_reconnect_error_broadcasts: int = 0
self.device_info_loaded: bool = False
self.max_contacts: int | None = None
self.device_model: str | None = None
self.firmware_build: str | None = None
self.firmware_version: str | None = None
self.max_channels: int = 40
self.path_hash_mode: int = 0
self.path_hash_mode_supported: bool = False
self.channel_slot_by_key: OrderedDict[str, int] = OrderedDict()
self.channel_key_by_slot: dict[int, str] = {}
self.pending_message_channel_key_by_slot: dict[int, str] = {}
@property
def is_reconnecting(self) -> bool:
return self.reconnect_lock is not None and self.reconnect_lock.locked()
async def acquire_operation_lock(self, name: str, *, blocking: bool) -> None:
if self.operation_lock is None:
self.operation_lock = asyncio.Lock()
if not blocking:
if self.operation_lock.locked():
raise RadioOperationBusyError(f"Radio is busy (operation: {name})")
# No coroutine can acquire the lock between the check above and
# this await because we have not yielded yet.
await self.operation_lock.acquire()
else:
await self.operation_lock.acquire()
logger.debug("Acquired radio operation lock (%s)", name)
def release_operation_lock(self, name: str) -> None:
if self.operation_lock and self.operation_lock.locked():
self.operation_lock.release()
logger.debug("Released radio operation lock (%s)", name)
else:
logger.error("Attempted to release unlocked radio operation lock (%s)", name)
def reset_connected_runtime_state(self) -> None:
self.setup_complete = False
self.device_info_loaded = False
self.max_contacts = None
self.device_model = None
self.firmware_build = None
self.firmware_version = None
self.max_channels = 40
self.path_hash_mode = 0
self.path_hash_mode_supported = False
self.reset_channel_send_cache()
self.clear_pending_message_channel_slots()
def reset_channel_send_cache(self) -> None:
self.channel_slot_by_key.clear()
self.channel_key_by_slot.clear()
def remember_pending_message_channel_slot(self, channel_key: str, slot: int) -> None:
self.pending_message_channel_key_by_slot[slot] = channel_key.upper()
def get_pending_message_channel_key(self, slot: int) -> str | None:
return self.pending_message_channel_key_by_slot.get(slot)
def clear_pending_message_channel_slots(self) -> None:
self.pending_message_channel_key_by_slot.clear()
def channel_slot_reuse_enabled(self) -> bool:
if settings.force_channel_slot_reconfigure:
return False
if self.connection_info:
return not self.connection_info.startswith("TCP:")
return settings.connection_type != "tcp"
def get_channel_send_cache_capacity(self) -> int:
try:
return max(1, int(self.max_channels))
except (TypeError, ValueError):
return 1
def get_cached_channel_slot(self, channel_key: str) -> int | None:
return self.channel_slot_by_key.get(channel_key.upper())
def plan_channel_send_slot(
self,
channel_key: str,
*,
preferred_slot: int = 0,
) -> tuple[int, bool, str | None]:
if not self.channel_slot_reuse_enabled():
return preferred_slot, True, None
normalized_key = channel_key.upper()
cached_slot = self.channel_slot_by_key.get(normalized_key)
if cached_slot is not None:
return cached_slot, False, None
capacity = self.get_channel_send_cache_capacity()
if len(self.channel_slot_by_key) < capacity:
slot = self._find_first_free_channel_slot(capacity, preferred_slot)
return slot, True, None
evicted_key, slot = next(iter(self.channel_slot_by_key.items()))
return slot, True, evicted_key
def note_channel_slot_loaded(self, channel_key: str, slot: int) -> None:
if not self.channel_slot_reuse_enabled():
return
normalized_key = channel_key.upper()
previous_slot = self.channel_slot_by_key.pop(normalized_key, None)
if previous_slot is not None and previous_slot != slot:
self.channel_key_by_slot.pop(previous_slot, None)
displaced_key = self.channel_key_by_slot.get(slot)
if displaced_key is not None and displaced_key != normalized_key:
self.channel_slot_by_key.pop(displaced_key, None)
self.channel_key_by_slot[slot] = normalized_key
self.channel_slot_by_key[normalized_key] = slot
def note_channel_slot_used(self, channel_key: str) -> None:
if not self.channel_slot_reuse_enabled():
return
normalized_key = channel_key.upper()
slot = self.channel_slot_by_key.get(normalized_key)
if slot is None:
return
self.channel_slot_by_key.move_to_end(normalized_key)
self.channel_key_by_slot[slot] = normalized_key
def invalidate_cached_channel_slot(self, channel_key: str) -> None:
normalized_key = channel_key.upper()
slot = self.channel_slot_by_key.pop(normalized_key, None)
if slot is None:
return
if self.channel_key_by_slot.get(slot) == normalized_key:
self.channel_key_by_slot.pop(slot, None)
def get_channel_send_cache_snapshot(self) -> list[tuple[str, int]]:
return list(self.channel_slot_by_key.items())
def reset_reconnect_error_broadcasts(self) -> None:
self.frontend_reconnect_error_broadcasts = 0
def _find_first_free_channel_slot(self, capacity: int, preferred_slot: int) -> int:
if preferred_slot < capacity and preferred_slot not in self.channel_key_by_slot:
return preferred_slot
for slot in range(capacity):
if slot not in self.channel_key_by_slot:
return slot
return preferred_slot
+133 -20
View File
@@ -1,8 +1,8 @@
"""Shared access seam over the global RadioManager instance.
"""Shared access seam over the process-global radio runtime.
This module deliberately keeps behavior thin and forwarding-only. The goal is
to reduce direct `app.radio.radio_manager` imports across routers and helpers
without changing radio lifecycle, lock, or connection semantics.
The runtime object is the public boundary for application code. It exposes the
current manager plus its mutable session state through an explicit API instead
of forwarding arbitrary attribute access to the manager instance.
"""
from collections.abc import Callable
@@ -15,7 +15,7 @@ import app.radio as radio_module
class RadioRuntime:
"""Thin forwarding wrapper around the process-global RadioManager."""
"""Explicit access seam over the process-global RadioManager."""
def __init__(self, manager_or_getter=None):
if manager_or_getter is None:
@@ -30,24 +30,90 @@ class RadioRuntime:
return self._manager_getter()
def __getattr__(self, name: str) -> Any:
"""Forward unknown attributes to the current global manager."""
return getattr(self.manager, name)
raise AttributeError(
f"{type(self).__name__!s} does not expose attribute {name!r}. "
"Use an explicit RadioRuntime property or method."
)
@staticmethod
def _is_local_runtime_attr(name: str) -> bool:
return name.startswith("_") or hasattr(RadioRuntime, name)
@property
def state(self) -> Any:
return self.manager.state
def __setattr__(self, name: str, value: Any) -> None:
if self._is_local_runtime_attr(name):
object.__setattr__(self, name, value)
return
setattr(self.manager, name, value)
@property
def meshcore(self) -> Any:
return self.manager.meshcore
def __delattr__(self, name: str) -> None:
if self._is_local_runtime_attr(name):
object.__delattr__(self, name)
return
delattr(self.manager, name)
@property
def connection_info(self) -> str | None:
return self.manager.connection_info
@property
def is_connected(self) -> bool:
return self.manager.is_connected
@property
def is_reconnecting(self) -> bool:
return self.manager.is_reconnecting
@property
def is_setup_in_progress(self) -> bool:
return self.manager.is_setup_in_progress
@property
def is_setup_complete(self) -> bool:
return self.manager.is_setup_complete
@property
def connection_desired(self) -> bool:
return self.manager.connection_desired
@property
def max_contacts(self) -> int | None:
return self.state.max_contacts
@max_contacts.setter
def max_contacts(self, value: int | None) -> None:
self.state.max_contacts = value
@property
def max_channels(self) -> int:
return self.state.max_channels
@max_channels.setter
def max_channels(self, value: int) -> None:
self.state.max_channels = value
@property
def path_hash_mode(self) -> int:
return self.state.path_hash_mode
@path_hash_mode.setter
def path_hash_mode(self, value: int) -> None:
self.state.path_hash_mode = value
@property
def path_hash_mode_supported(self) -> bool:
return self.state.path_hash_mode_supported
@path_hash_mode_supported.setter
def path_hash_mode_supported(self, value: bool) -> None:
self.state.path_hash_mode_supported = value
@property
def device_info_loaded(self) -> bool:
return self.state.device_info_loaded
@property
def device_model(self) -> str | None:
return self.state.device_model
@property
def firmware_build(self) -> str | None:
return self.state.firmware_build
@property
def firmware_version(self) -> str | None:
return self.state.firmware_version
def require_connected(self):
"""Return MeshCore when available, mirroring existing HTTP semantics."""
@@ -89,5 +155,52 @@ class RadioRuntime:
broadcast_on_success=broadcast_on_success,
)
def reset_channel_send_cache(self) -> None:
self.state.reset_channel_send_cache()
def remember_pending_message_channel_slot(self, channel_key: str, slot: int) -> None:
self.state.remember_pending_message_channel_slot(channel_key, slot)
def get_pending_message_channel_key(self, slot: int) -> str | None:
return self.state.get_pending_message_channel_key(slot)
def clear_pending_message_channel_slots(self) -> None:
self.state.clear_pending_message_channel_slots()
def channel_slot_reuse_enabled(self) -> bool:
return self.state.channel_slot_reuse_enabled()
def get_channel_send_cache_capacity(self) -> int:
return self.state.get_channel_send_cache_capacity()
def get_cached_channel_slot(self, channel_key: str) -> int | None:
return self.state.get_cached_channel_slot(channel_key)
def plan_channel_send_slot(
self,
channel_key: str,
*,
preferred_slot: int = 0,
) -> tuple[int, bool, str | None]:
return self.state.plan_channel_send_slot(channel_key, preferred_slot=preferred_slot)
def note_channel_slot_loaded(self, channel_key: str, slot: int) -> None:
self.state.note_channel_slot_loaded(channel_key, slot)
def note_channel_slot_used(self, channel_key: str) -> None:
self.state.note_channel_slot_used(channel_key)
def invalidate_cached_channel_slot(self, channel_key: str) -> None:
self.state.invalidate_cached_channel_slot(channel_key)
def get_channel_send_cache_snapshot(self) -> list[tuple[str, int]]:
return self.state.get_channel_send_cache_snapshot()
def resume_connection(self) -> None:
self.manager.resume_connection()
async def pause_connection(self) -> None:
await self.manager.pause_connection()
radio_runtime = RadioRuntime()
+15
View File
@@ -4,6 +4,7 @@ from unittest.mock import AsyncMock
import pytest
from fastapi import HTTPException
from app.radio_runtime_state import RadioRuntimeState
from app.services.radio_runtime import RadioRuntime
@@ -114,3 +115,17 @@ async def test_lifecycle_passthrough_methods_delegate_to_current_manager():
manager.start_connection_monitor.assert_awaited_once()
manager.stop_connection_monitor.assert_awaited_once()
manager.disconnect.assert_awaited_once()
def test_explicit_runtime_state_api_replaces_attribute_forwarding():
    """Explicit state properties read/write through; unknown attributes raise."""
    fake_manager = _Manager(meshcore="meshcore", is_connected=True)
    fake_manager.state = RadioRuntimeState()
    fake_manager.state.path_hash_mode = 2

    runtime = RadioRuntime(fake_manager)

    # Reads come from the manager's composed state object.
    assert runtime.path_hash_mode == 2

    # Writes land on that same state object.
    runtime.path_hash_mode = 1
    assert fake_manager.state.path_hash_mode == 1

    # Anything without an explicit property/method is rejected.
    with pytest.raises(AttributeError, match="does not expose attribute"):
        _ = runtime.some_random_attr