import asyncio import glob import logging import platform from pathlib import Path from meshcore import MeshCore from app.config import settings logger = logging.getLogger(__name__) def detect_serial_devices() -> list[str]: """Detect available serial devices based on platform.""" devices: list[str] = [] system = platform.system() if system == "Darwin": # macOS: Use /dev/cu.* devices (callout devices, preferred over tty.*) patterns = [ "/dev/cu.usb*", "/dev/cu.wchusbserial*", "/dev/cu.SLAB_USBtoUART*", ] for pattern in patterns: devices.extend(glob.glob(pattern)) devices.sort() else: # Linux: Prefer /dev/serial/by-id/ for persistent naming by_id_path = Path("/dev/serial/by-id") if by_id_path.is_dir(): devices.extend(str(p) for p in by_id_path.iterdir()) # Also check /dev/ttyACM* and /dev/ttyUSB* as fallback resolved_paths = set() for dev in devices: try: resolved_paths.add(str(Path(dev).resolve())) except OSError: pass for pattern in ["/dev/ttyACM*", "/dev/ttyUSB*"]: for dev in glob.glob(pattern): try: if str(Path(dev).resolve()) not in resolved_paths: devices.append(dev) except OSError: devices.append(dev) devices.sort() return devices async def test_serial_device(port: str, baudrate: int, timeout: float = 3.0) -> bool: """Test if a MeshCore radio responds on the given serial port.""" try: logger.debug("Testing serial device %s", port) mc = await asyncio.wait_for( MeshCore.create_serial(port=port, baudrate=baudrate), timeout=timeout, ) # Check if we got valid self_info (indicates successful communication) if mc.is_connected and mc.self_info: logger.debug("Device %s responded with valid self_info", port) await mc.disconnect() return True await mc.disconnect() return False except asyncio.TimeoutError: logger.debug("Device %s timed out", port) return False except Exception as e: logger.debug("Device %s failed: %s", port, e) return False async def find_radio_port(baudrate: int) -> str | None: """Find the first serial port with a responding MeshCore radio.""" devices = detect_serial_devices() if not devices: logger.warning("No serial devices found") return None logger.info("Found %d serial device(s), testing for MeshCore radio...", len(devices)) for device in devices: if await test_serial_device(device, baudrate): logger.info("Found MeshCore radio at %s", device) return device logger.warning("No MeshCore radio found on any serial device") return None class RadioManager: """Manages the MeshCore radio connection.""" def __init__(self): self._meshcore: MeshCore | None = None self._port: str | None = None self._reconnect_task: asyncio.Task | None = None self._last_connected: bool = False self._reconnect_lock: asyncio.Lock | None = None async def post_connect_setup(self) -> None: """Full post-connection setup: handlers, key export, sync, advertisements, polling. Called after every successful connection or reconnection. Idempotent — safe to call repeatedly (periodic tasks have start guards). """ from app.event_handlers import register_event_handlers from app.keystore import export_and_store_private_key from app.radio_sync import ( drain_pending_messages, send_advertisement, start_message_polling, start_periodic_advert, start_periodic_sync, sync_and_offload_all, sync_radio_time, ) if not self._meshcore: return register_event_handlers(self._meshcore) await export_and_store_private_key(self._meshcore) # Sync radio clock with system time await sync_radio_time() # Sync contacts/channels from radio to DB and clear radio logger.info("Syncing and offloading radio data...") result = await sync_and_offload_all() logger.info("Sync complete: %s", result) # Start periodic sync (idempotent) start_periodic_sync() # Send advertisement to announce our presence (if enabled and not throttled) if await send_advertisement(): logger.info("Advertisement sent") else: logger.debug("Advertisement skipped (disabled or throttled)") # Start periodic advertisement (idempotent) start_periodic_advert() await self._meshcore.start_auto_message_fetching() logger.info("Auto message fetching started") # Drain any messages that were queued before we connected drained = await drain_pending_messages() if drained > 0: logger.info("Drained %d pending message(s)", drained) # Start periodic message polling as fallback (idempotent) start_message_polling() logger.info("Post-connect setup complete") @property def meshcore(self) -> MeshCore | None: return self._meshcore @property def port(self) -> str | None: return self._port @property def is_connected(self) -> bool: return self._meshcore is not None and self._meshcore.is_connected @property def is_reconnecting(self) -> bool: return self._reconnect_lock is not None and self._reconnect_lock.locked() async def connect(self) -> None: """Connect to the radio over serial.""" if self._meshcore is not None: await self.disconnect() port = settings.serial_port # Auto-detect if no port specified if not port: logger.info("No serial port specified, auto-detecting...") port = await find_radio_port(settings.serial_baudrate) if not port: raise RuntimeError("No MeshCore radio found. Please specify MESHCORE_SERIAL_PORT.") logger.debug( "Connecting to radio at %s (baud %d)", port, settings.serial_baudrate, ) self._meshcore = await MeshCore.create_serial( port=port, baudrate=settings.serial_baudrate, auto_reconnect=True, max_reconnect_attempts=10, ) self._port = port self._last_connected = True logger.debug("Serial connection established") async def disconnect(self) -> None: """Disconnect from the radio.""" if self._meshcore is not None: logger.debug("Disconnecting from radio") await self._meshcore.disconnect() self._meshcore = None logger.debug("Radio disconnected") async def reconnect(self) -> bool: """Attempt to reconnect to the radio. Returns True if reconnection was successful, False otherwise. Uses a lock to prevent concurrent reconnection attempts. """ from app.websocket import broadcast_error, broadcast_health # Lazily initialize lock (can't create in __init__ before event loop exists) if self._reconnect_lock is None: self._reconnect_lock = asyncio.Lock() # Try to acquire lock without blocking to check if reconnect is in progress if self._reconnect_lock.locked(): logger.debug("Reconnection already in progress") return False async with self._reconnect_lock: logger.info("Attempting to reconnect to radio...") try: # Disconnect if we have a stale connection if self._meshcore is not None: try: await self._meshcore.disconnect() except Exception: pass self._meshcore = None # Try to connect (will auto-detect if no port specified) await self.connect() if self.is_connected: logger.info("Radio reconnected successfully at %s", self._port) broadcast_health(True, self._port) return True else: logger.warning("Reconnection failed: not connected after connect()") return False except Exception as e: logger.warning("Reconnection failed: %s", e) broadcast_error("Reconnection failed", str(e)) return False async def start_connection_monitor(self) -> None: """Start background task to monitor connection and auto-reconnect.""" if self._reconnect_task is not None: return async def monitor_loop(): from app.websocket import broadcast_health while True: try: await asyncio.sleep(5) # Check every 5 seconds current_connected = self.is_connected # Detect status change if self._last_connected and not current_connected: # Connection lost logger.warning("Radio connection lost, broadcasting status change") broadcast_health(False, self._port) self._last_connected = False # Attempt reconnection await asyncio.sleep(3) # Wait a bit before trying if await self.reconnect(): await self.post_connect_setup() elif not self._last_connected and current_connected: # Connection restored (might have reconnected automatically) logger.info("Radio connection restored") broadcast_health(True, self._port) self._last_connected = True except asyncio.CancelledError: # Task is being cancelled, exit cleanly break except Exception as e: # Log error but continue monitoring - don't let the monitor die logger.exception("Error in connection monitor, continuing: %s", e) self._reconnect_task = asyncio.create_task(monitor_loop()) logger.info("Radio connection monitor started") async def stop_connection_monitor(self) -> None: """Stop the connection monitor task.""" if self._reconnect_task is not None: self._reconnect_task.cancel() try: await self._reconnect_task except asyncio.CancelledError: pass self._reconnect_task = None logger.info("Radio connection monitor stopped") radio_manager = RadioManager()