diff --git a/app/config.py b/app/config.py
index 1ef9197..39cbd9a 100644
--- a/app/config.py
+++ b/app/config.py
@@ -35,6 +35,11 @@ class Settings(BaseSettings):
raise ValueError("MESHCORE_BLE_PIN is required when MESHCORE_BLE_ADDRESS is set.")
return self
+ @property
+ def loopback_eligible(self) -> bool:
+ """True when no explicit transport env var is set."""
+ return not self.serial_port and not self.tcp_host and not self.ble_address
+
@property
def connection_type(self) -> Literal["serial", "tcp", "ble"]:
if self.tcp_host:
diff --git a/app/loopback.py b/app/loopback.py
new file mode 100644
index 0000000..57085d1
--- /dev/null
+++ b/app/loopback.py
@@ -0,0 +1,125 @@
+"""Loopback transport: bridges a browser-side serial/BLE connection over WebSocket."""
+
+import asyncio
+import logging
+from typing import Any, Literal
+
+from starlette.websockets import WebSocket, WebSocketState
+
+logger = logging.getLogger(__name__)
+
+
+class LoopbackTransport:
+ """ConnectionProtocol implementation that tunnels bytes over a WebSocket.
+
+ For serial mode, applies the same 0x3c + 2-byte LE size framing that
+ meshcore's SerialConnection uses. For BLE mode, passes raw bytes through
+ (matching BLEConnection behaviour).
+ """
+
+ def __init__(self, websocket: WebSocket, mode: Literal["serial", "ble"]) -> None:
+ self._ws = websocket
+ self._mode = mode
+ self._reader: Any = None
+ self._disconnect_callback: Any = None
+
+ # Serial framing state (mirrors meshcore serial_cx.py handle_rx)
+ self._header = b""
+ self._inframe = b""
+ self._frame_started = False
+ self._frame_size = 0
+
+ # -- ConnectionProtocol methods ------------------------------------------
+
+ async def connect(self) -> str:
+ """No-op — the WebSocket is already established."""
+ info = f"Loopback ({self._mode})"
+ logger.info("Loopback transport connected: %s", info)
+ return info
+
+ async def disconnect(self) -> None:
+ """Ask the browser to release the hardware and close the WS."""
+ try:
+ if self._ws.client_state == WebSocketState.CONNECTED:
+ await self._ws.send_json({"type": "disconnect"})
+ except Exception:
+ pass # WS may already be closed
+
+ async def send(self, data: Any) -> None:
+ """Send data to the browser (which writes it to the physical radio).
+
+ Serial mode: prepend 0x3c + 2-byte LE size header.
+ BLE mode: send raw bytes.
+ """
+ try:
+ if self._ws.client_state != WebSocketState.CONNECTED:
+ return
+ if self._mode == "serial":
+ size = len(data)
+ pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + bytes(data)
+ await self._ws.send_bytes(pkt)
+ else:
+ await self._ws.send_bytes(bytes(data))
+ except Exception as e:
+ logger.debug("Loopback send error: %s", e)
+
+ def set_reader(self, reader: Any) -> None:
+ self._reader = reader
+
+ def set_disconnect_callback(self, callback: Any) -> None:
+ self._disconnect_callback = callback
+
+ # -- Incoming data from browser ------------------------------------------
+
+ def handle_rx(self, data: bytes) -> None:
+ """Process bytes received from the browser.
+
+ Serial mode: accumulate bytes, strip framing, deliver payload.
+ BLE mode: deliver raw bytes directly.
+ """
+ if self._mode == "serial":
+ self._handle_rx_serial(data)
+ else:
+ self._handle_rx_ble(data)
+
+ def _handle_rx_ble(self, data: bytes) -> None:
+ if self._reader is not None:
+ asyncio.create_task(self._reader.handle_rx(data))
+
+ def _handle_rx_serial(self, data: bytes) -> None:
+ """Mirror meshcore's SerialConnection.handle_rx state machine."""
+ raw = bytes(data)
+ headerlen = len(self._header)
+
+ if not self._frame_started:
+ if len(raw) >= 3 - headerlen:
+ self._header = self._header + raw[: 3 - headerlen]
+ self._frame_started = True
+ self._frame_size = int.from_bytes(self._header[1:], byteorder="little")
+ remainder = raw[3 - headerlen :]
+ # Reset header for next frame
+ self._header = b""
+ if remainder:
+ self._handle_rx_serial(remainder)
+ else:
+ self._header = self._header + raw
+ else:
+ framelen = len(self._inframe)
+ if framelen + len(raw) < self._frame_size:
+ self._inframe = self._inframe + raw
+ else:
+ self._inframe = self._inframe + raw[: self._frame_size - framelen]
+ if self._reader is not None:
+ asyncio.create_task(self._reader.handle_rx(self._inframe))
+ remainder = raw[self._frame_size - framelen :]
+ self._frame_started = False
+ self._inframe = b""
+ if remainder:
+ self._handle_rx_serial(remainder)
+
+ def reset_framing(self) -> None:
+ """Reset the serial framing state machine."""
+ self._header = b""
+ self._inframe = b""
+ self._frame_started = False
+ self._frame_size = 0
diff --git a/app/main.py b/app/main.py
index ee7e4c2..60b903c 100644
--- a/app/main.py
+++ b/app/main.py
@@ -19,6 +19,7 @@ from app.routers import (
channels,
contacts,
health,
+ loopback,
messages,
packets,
radio,
@@ -126,6 +127,7 @@ app.include_router(read_state.router, prefix="/api")
app.include_router(settings.router, prefix="/api")
app.include_router(statistics.router, prefix="/api")
app.include_router(ws.router, prefix="/api")
+app.include_router(loopback.router, prefix="/api")
# Serve frontend static files in production
FRONTEND_DIR = Path(__file__).parent.parent / "frontend" / "dist"
diff --git a/app/radio.py b/app/radio.py
index bb5fbe3..368351a 100644
--- a/app/radio.py
+++ b/app/radio.py
@@ -128,6 +128,7 @@ class RadioManager:
self._setup_lock: asyncio.Lock | None = None
self._setup_in_progress: bool = False
self._setup_complete: bool = False
+ self._loopback_active: bool = False
async def _acquire_operation_lock(
self,
@@ -317,6 +318,36 @@ class RadioManager:
def is_setup_complete(self) -> bool:
return self._setup_complete
+ @property
+ def loopback_active(self) -> bool:
+ return self._loopback_active
+
+ def connect_loopback(self, mc: MeshCore, connection_info: str) -> None:
+ """Adopt a MeshCore instance created by the loopback WebSocket endpoint."""
+ self._meshcore = mc
+ self._connection_info = connection_info
+ self._loopback_active = True
+ self._last_connected = True
+ self._setup_complete = False
+
+ async def disconnect_loopback(self) -> None:
+ """Tear down a loopback session and resume normal auto-detect."""
+ from app.websocket import broadcast_health
+
+ mc = self._meshcore
+ self._meshcore = None
+ self._loopback_active = False
+ self._connection_info = None
+ self._setup_complete = False
+
+ if mc is not None:
+ try:
+ await mc.disconnect()
+ except Exception:
+ pass
+
+ broadcast_health(False, None)
+
async def connect(self) -> None:
"""Connect to the radio using the configured transport."""
if self._meshcore is not None:
@@ -461,6 +492,10 @@ class RadioManager:
try:
await asyncio.sleep(CHECK_INTERVAL_SECONDS)
+ # Skip auto-detect/reconnect while loopback is active
+ if self._loopback_active:
+ continue
+
current_connected = self.is_connected
# Detect status change
diff --git a/app/routers/health.py b/app/routers/health.py
index ab43c20..78e0fb1 100644
--- a/app/routers/health.py
+++ b/app/routers/health.py
@@ -17,6 +17,7 @@ class HealthResponse(BaseModel):
database_size_mb: float
oldest_undecrypted_timestamp: int | None
mqtt_status: str | None = None
+ loopback_eligible: bool = False
async def build_health_data(radio_connected: bool, connection_info: str | None) -> dict:
@@ -53,6 +54,7 @@ async def build_health_data(radio_connected: bool, connection_info: str | None)
"database_size_mb": db_size_mb,
"oldest_undecrypted_timestamp": oldest_ts,
"mqtt_status": mqtt_status,
+ "loopback_eligible": settings.loopback_eligible,
}
diff --git a/app/routers/loopback.py b/app/routers/loopback.py
new file mode 100644
index 0000000..771c048
--- /dev/null
+++ b/app/routers/loopback.py
@@ -0,0 +1,116 @@
+"""WebSocket endpoint for loopback transport (browser-bridged radio connection)."""
+
+import asyncio
+import json
+import logging
+
+from fastapi import APIRouter, WebSocket, WebSocketDisconnect
+from starlette.websockets import WebSocketState
+
+from app.config import settings
+from app.loopback import LoopbackTransport
+from app.radio import radio_manager
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter()
+
+
+@router.websocket("/ws/transport")
+async def loopback_transport(websocket: WebSocket) -> None:
+ """Bridge a browser-side serial/BLE connection to the backend MeshCore stack.
+
+ Protocol:
+ 1. Client sends init JSON: {"type": "init", "mode": "serial"|"ble"}
+ 2. Binary frames flow bidirectionally (raw bytes for BLE, framed for serial)
+ 3. Either side can send {"type": "disconnect"} to tear down
+ """
+ # Guard: reject if an explicit transport is configured via env vars
+ if not settings.loopback_eligible:
+ await websocket.accept()
+ await websocket.close(code=4003, reason="Explicit transport configured")
+ return
+
+ # Guard: reject if the radio is already connected (direct or another loopback)
+ if radio_manager.is_connected:
+ await websocket.accept()
+ await websocket.close(code=4004, reason="Radio already connected")
+ return
+
+ await websocket.accept()
+
+ transport: LoopbackTransport | None = None
+ setup_task: asyncio.Task | None = None
+
+ try:
+ # Wait for init message
+ init_raw = await asyncio.wait_for(websocket.receive_text(), timeout=10.0)
+ init_msg = json.loads(init_raw)
+
+ if init_msg.get("type") != "init" or init_msg.get("mode") not in ("serial", "ble"):
+ await websocket.close(code=4001, reason="Invalid init message")
+ return
+
+ mode = init_msg["mode"]
+ logger.info("Loopback init: mode=%s", mode)
+
+ # Create transport and MeshCore instance
+ transport = LoopbackTransport(websocket, mode)
+
+ from meshcore import MeshCore
+
+ mc = MeshCore(transport, auto_reconnect=False, max_reconnect_attempts=0)
+ await mc.connect()
+
+ if not mc.is_connected:
+ logger.warning("Loopback MeshCore failed to connect")
+ await websocket.close(code=4005, reason="MeshCore handshake failed")
+ return
+
+ connection_info = f"Loopback ({mode})"
+ radio_manager.connect_loopback(mc, connection_info)
+
+ # Run post-connect setup in background so the receive loop can run
+ setup_task = asyncio.create_task(radio_manager.post_connect_setup())
+
+ # Main receive loop
+ while True:
+ message = await websocket.receive()
+
+ if message.get("type") == "websocket.disconnect":
+ break
+
+ if "bytes" in message and message["bytes"]:
+ transport.handle_rx(message["bytes"])
+ elif "text" in message and message["text"]:
+ try:
+ text_msg = json.loads(message["text"])
+ if text_msg.get("type") == "disconnect":
+ logger.info("Loopback client requested disconnect")
+ break
+ except (json.JSONDecodeError, TypeError):
+ pass
+
+ except WebSocketDisconnect:
+ logger.info("Loopback WebSocket disconnected")
+ except asyncio.TimeoutError:
+ logger.warning("Loopback init timeout")
+ if websocket.client_state == WebSocketState.CONNECTED:
+ await websocket.close(code=4002, reason="Init timeout")
+ except Exception as e:
+ logger.exception("Loopback error: %s", e)
+ finally:
+ if setup_task is not None:
+ setup_task.cancel()
+ try:
+ await setup_task
+ except (asyncio.CancelledError, Exception):
+ pass
+
+ await radio_manager.disconnect_loopback()
+
+ if websocket.client_state == WebSocketState.CONNECTED:
+ try:
+ await websocket.close()
+ except Exception:
+ pass
diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx
index 0258c1d..be0eb49 100644
--- a/frontend/src/App.tsx
+++ b/frontend/src/App.tsx
@@ -19,6 +19,7 @@ import {
useAppSettings,
useConversationRouter,
useContactsAndChannels,
+ useLoopback,
} from './hooks';
import * as messageCache from './messageCache';
import { StatusBar } from './components/StatusBar';
@@ -186,6 +187,8 @@ export function App() {
refreshUnreads,
} = useUnreadCounts(channels, contacts, activeConversation);
+ const loopback = useLoopback(handleHealthRefresh);
+
// Determine if active contact is a repeater (used for routing to dashboard)
const activeContactIsRepeater = useMemo(() => {
if (!activeConversation || activeConversation.type !== 'contact') return false;
@@ -481,7 +484,9 @@ export function App() {
- {SETTINGS_SECTION_ORDER.map((section) => (
+ {SETTINGS_SECTION_ORDER.filter(
+ (s) => s !== 'loopback' || (health?.loopback_eligible && !health?.radio_connected)
+ ).map((section) => (
)}
+ {shouldRenderSection('loopback') &&
+ health?.loopback_eligible &&
+ !health?.radio_connected &&
+ loopback && (
+
+ {renderSectionHeader('loopback')}
+ {isSectionVisible('loopback') && (
+
+ )}
+
+ )}
+
{shouldRenderSection('database') && (
{renderSectionHeader('database')}
diff --git a/frontend/src/components/settings/SettingsLoopbackSection.tsx b/frontend/src/components/settings/SettingsLoopbackSection.tsx
new file mode 100644
index 0000000..7fff6aa
--- /dev/null
+++ b/frontend/src/components/settings/SettingsLoopbackSection.tsx
@@ -0,0 +1,136 @@
+import { useState } from 'react';
+import { Button } from '../ui/button';
+import { Input } from '../ui/input';
+import { Label } from '../ui/label';
+import { Separator } from '../ui/separator';
+import type { UseLoopbackReturn } from '../../hooks/useLoopback';
+
+export function SettingsLoopbackSection({
+ loopback,
+ className,
+}: {
+ loopback: UseLoopbackReturn;
+ className?: string;
+}) {
+ const {
+ status,
+ error,
+ transportType,
+ serialAvailable,
+ bluetoothAvailable,
+ connectSerial,
+ connectBluetooth,
+ disconnect,
+ } = loopback;
+
+ const [baudRate, setBaudRate] = useState('115200');
+ const [selectedTransport, setSelectedTransport] = useState<'serial' | 'ble'>(
+ serialAvailable ? 'serial' : 'ble'
+ );
+
+ const isConnecting = status === 'connecting';
+ const isConnected = status === 'connected';
+ const busy = isConnecting || isConnected;
+
+ const handleConnect = async () => {
+ if (selectedTransport === 'serial') {
+ await connectSerial(parseInt(baudRate, 10) || 115200);
+ } else {
+ await connectBluetooth();
+ }
+ };
+
+ const neitherAvailable = !serialAvailable && !bluetoothAvailable;
+
+ return (
+
+
+ No direct radio connection detected. You can bridge a radio connected to{' '}
+ this browser's device via Web Serial or Web Bluetooth.
+
+
+ {neitherAvailable && (
+
+ Your browser does not support Web Serial or Web Bluetooth. Use Chrome or Edge on a secure
+ context (HTTPS or localhost).
+
+ )}
+
+ {!neitherAvailable && (
+ <>
+ {isConnected ? (
+ <>
+
+
+
+ Connected via {transportType === 'serial' ? 'Serial' : 'Bluetooth'}
+
+
+
+
+ >
+ ) : (
+ <>
+ {/* Transport selector */}
+
+
+
+
+
+
+ {!serialAvailable && (
+
+ Web Serial not available in this browser
+
+ )}
+ {!bluetoothAvailable && (
+
+ Web Bluetooth not available in this browser
+
+ )}
+
+
+ {/* Baud rate (serial only) */}
+ {selectedTransport === 'serial' && (
+
+
+ setBaudRate(e.target.value)}
+ disabled={busy}
+ />
+
+ )}
+
+
+
+
+ >
+ )}
+ >
+ )}
+
+ {error &&
{error}
}
+
+ );
+}
diff --git a/frontend/src/components/settings/settingsConstants.ts b/frontend/src/components/settings/settingsConstants.ts
index 6cbcaef..5ec1a1a 100644
--- a/frontend/src/components/settings/settingsConstants.ts
+++ b/frontend/src/components/settings/settingsConstants.ts
@@ -2,6 +2,7 @@ export type SettingsSection =
| 'radio'
| 'identity'
| 'connectivity'
+ | 'loopback'
| 'mqtt'
| 'database'
| 'bot'
@@ -12,6 +13,7 @@ export const SETTINGS_SECTION_ORDER: SettingsSection[] = [
'radio',
'identity',
'connectivity',
+ 'loopback',
'database',
'bot',
'mqtt',
@@ -23,6 +25,7 @@ export const SETTINGS_SECTION_LABELS: Record
= {
radio: '📻 Radio',
identity: '🪪 Identity',
connectivity: '📡 Connectivity',
+ loopback: '🔁 Loopback',
database: '🗄️ Database & Interface',
bot: '🤖 Bots',
mqtt: '📤 MQTT',
diff --git a/frontend/src/hooks/index.ts b/frontend/src/hooks/index.ts
index 62acf24..8d8ec55 100644
--- a/frontend/src/hooks/index.ts
+++ b/frontend/src/hooks/index.ts
@@ -5,3 +5,4 @@ export { useRepeaterDashboard } from './useRepeaterDashboard';
export { useAppSettings } from './useAppSettings';
export { useConversationRouter } from './useConversationRouter';
export { useContactsAndChannels } from './useContactsAndChannels';
+export { useLoopback } from './useLoopback';
diff --git a/frontend/src/hooks/useLoopback.ts b/frontend/src/hooks/useLoopback.ts
new file mode 100644
index 0000000..2d9b20d
--- /dev/null
+++ b/frontend/src/hooks/useLoopback.ts
@@ -0,0 +1,332 @@
+import { useState, useCallback, useRef, useEffect } from 'react';
+
+export type LoopbackStatus = 'idle' | 'connecting' | 'connected' | 'error';
+export type LoopbackTransportType = 'serial' | 'ble';
+
+// Nordic UART Service UUIDs (used by MeshCore BLE)
+const UART_SERVICE_UUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e';
+const UART_TX_CHAR_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'; // notifications from radio
+const UART_RX_CHAR_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'; // write to radio
+
+function getTransportWsUrl(): string {
+ const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
+ const isDev = window.location.port === '5173';
+ return isDev
+ ? `ws://localhost:8000/api/ws/transport`
+ : `${protocol}//${window.location.host}/api/ws/transport`;
+}
+
+export interface UseLoopbackReturn {
+ status: LoopbackStatus;
+ error: string | null;
+ transportType: LoopbackTransportType | null;
+ serialAvailable: boolean;
+ bluetoothAvailable: boolean;
+ connectSerial: (baudRate?: number) => Promise;
+ connectBluetooth: () => Promise;
+ disconnect: () => void;
+}
+
+export function useLoopback(onConnected?: () => void): UseLoopbackReturn {
+ const [status, setStatus] = useState('idle');
+ const [error, setError] = useState(null);
+ const [transportType, setTransportType] = useState(null);
+
+ const wsRef = useRef(null);
+ const serialPortRef = useRef(null);
+ const serialReaderRef = useRef | null>(null);
+ const bleDeviceRef = useRef(null);
+ const cleaningUpRef = useRef(false);
+
+ const serialAvailable = typeof navigator !== 'undefined' && 'serial' in navigator;
+ const bluetoothAvailable = typeof navigator !== 'undefined' && 'bluetooth' in navigator;
+
+ const cleanup = useCallback(() => {
+ if (cleaningUpRef.current) return;
+ cleaningUpRef.current = true;
+
+ // Close WebSocket
+ const ws = wsRef.current;
+ if (ws && ws.readyState <= WebSocket.OPEN) {
+ try {
+ ws.close();
+ } catch {
+ // ignore
+ }
+ }
+ wsRef.current = null;
+
+ // Close serial reader and port
+ const reader = serialReaderRef.current;
+ if (reader) {
+ try {
+ reader.cancel();
+ } catch {
+ // ignore
+ }
+ }
+ serialReaderRef.current = null;
+
+ const port = serialPortRef.current;
+ if (port) {
+ try {
+ port.close();
+ } catch {
+ // ignore
+ }
+ }
+ serialPortRef.current = null;
+
+ // Disconnect BLE
+ const bleDevice = bleDeviceRef.current;
+ if (bleDevice?.gatt?.connected) {
+ try {
+ bleDevice.gatt.disconnect();
+ } catch {
+ // ignore
+ }
+ }
+ bleDeviceRef.current = null;
+
+ setTransportType(null);
+ setStatus('idle');
+ cleaningUpRef.current = false;
+ }, []);
+
+ // Cleanup on unmount
+ useEffect(() => cleanup, [cleanup]);
+
+ const connectSerial = useCallback(
+ async (baudRate = 115200) => {
+ setError(null);
+ setStatus('connecting');
+ setTransportType('serial');
+
+ try {
+ // Request serial port from user
+ const port = await navigator.serial!.requestPort();
+ await port.open({ baudRate, flowControl: 'none' });
+
+ // Match meshcore serial behaviour
+ try {
+ await port.setSignals({ requestToSend: false });
+ } catch {
+ // Not all adapters support setSignals
+ }
+
+ serialPortRef.current = port;
+
+ // Open transport WebSocket
+ const ws = new WebSocket(getTransportWsUrl());
+ ws.binaryType = 'arraybuffer';
+ wsRef.current = ws;
+
+ await new Promise((resolve, reject) => {
+ ws.onopen = () => resolve();
+ ws.onerror = () => reject(new Error('Transport WebSocket failed to connect'));
+ // Timeout
+ const timeout = setTimeout(() => reject(new Error('Transport WebSocket timeout')), 10000);
+ ws.addEventListener('open', () => clearTimeout(timeout), { once: true });
+ });
+
+ // Send init
+ ws.send(JSON.stringify({ type: 'init', mode: 'serial' }));
+
+ // Start serial → WS read loop
+ const reader = port.readable!.getReader();
+ serialReaderRef.current = reader;
+
+ const readLoop = async () => {
+ try {
+ while (true) {
+ const { value, done } = await reader.read();
+ if (done) break;
+ if (value && ws.readyState === WebSocket.OPEN) {
+ ws.send(value);
+ }
+ }
+ } catch (err) {
+ // Reader cancelled or port closed — expected during disconnect
+ if (!cleaningUpRef.current) {
+ console.debug('Serial read loop ended:', err);
+ }
+ }
+ };
+ readLoop();
+
+ // WS → serial write
+ ws.onmessage = async (event) => {
+ if (event.data instanceof ArrayBuffer) {
+ const writer = port.writable!.getWriter();
+ try {
+ await writer.write(new Uint8Array(event.data));
+ } finally {
+ writer.releaseLock();
+ }
+ } else if (typeof event.data === 'string') {
+ try {
+ const msg = JSON.parse(event.data);
+ if (msg.type === 'disconnect') {
+ cleanup();
+ }
+ } catch {
+ // ignore non-JSON text
+ }
+ }
+ };
+
+ ws.onclose = () => {
+ if (!cleaningUpRef.current) {
+ cleanup();
+ }
+ };
+
+ ws.onerror = () => {
+ if (!cleaningUpRef.current) {
+ setError('Transport WebSocket error');
+ cleanup();
+ setStatus('error');
+ }
+ };
+
+ setStatus('connected');
+ onConnected?.();
+ } catch (err) {
+ const message = err instanceof Error ? err.message : 'Serial connection failed';
+ // Don't show error for user-cancelled port picker
+ if (err instanceof DOMException && err.name === 'NotFoundError') {
+ setStatus('idle');
+ setTransportType(null);
+ return;
+ }
+ setError(message);
+ cleanup();
+ setStatus('error');
+ }
+ },
+ [cleanup, onConnected]
+ );
+
+ const connectBluetooth = useCallback(async () => {
+ setError(null);
+ setStatus('connecting');
+ setTransportType('ble');
+
+ try {
+ const device = await navigator.bluetooth!.requestDevice({
+ filters: [{ namePrefix: 'MeshCore' }],
+ optionalServices: [UART_SERVICE_UUID],
+ });
+
+ if (!device.gatt) {
+ throw new Error('Bluetooth GATT not available');
+ }
+
+ bleDeviceRef.current = device;
+
+ const server = await device.gatt.connect();
+ const service = await server.getPrimaryService(UART_SERVICE_UUID);
+ const txChar = await service.getCharacteristic(UART_TX_CHAR_UUID);
+ const rxChar = await service.getCharacteristic(UART_RX_CHAR_UUID);
+
+ // Open transport WebSocket
+ const ws = new WebSocket(getTransportWsUrl());
+ ws.binaryType = 'arraybuffer';
+ wsRef.current = ws;
+
+ await new Promise((resolve, reject) => {
+ ws.onopen = () => resolve();
+ ws.onerror = () => reject(new Error('Transport WebSocket failed to connect'));
+ const timeout = setTimeout(() => reject(new Error('Transport WebSocket timeout')), 10000);
+ ws.addEventListener('open', () => clearTimeout(timeout), { once: true });
+ });
+
+ // Send init
+ ws.send(JSON.stringify({ type: 'init', mode: 'ble' }));
+
+ // BLE RX notifications → WS
+ await txChar.startNotifications();
+ txChar.addEventListener('characteristicvaluechanged', (event) => {
+ const value = (event.target as BluetoothRemoteGATTCharacteristic).value;
+ if (value && ws.readyState === WebSocket.OPEN) {
+ ws.send(value.buffer);
+ }
+ });
+
+ // WS → BLE TX
+ ws.onmessage = async (event) => {
+ if (event.data instanceof ArrayBuffer) {
+ await rxChar.writeValueWithResponse(new Uint8Array(event.data));
+ } else if (typeof event.data === 'string') {
+ try {
+ const msg = JSON.parse(event.data);
+ if (msg.type === 'disconnect') {
+ cleanup();
+ }
+ } catch {
+ // ignore non-JSON text
+ }
+ }
+ };
+
+ ws.onclose = () => {
+ if (!cleaningUpRef.current) {
+ cleanup();
+ }
+ };
+
+ ws.onerror = () => {
+ if (!cleaningUpRef.current) {
+ setError('Transport WebSocket error');
+ cleanup();
+ setStatus('error');
+ }
+ };
+
+ // Handle BLE disconnect
+ device.addEventListener('gattserverdisconnected', () => {
+ if (!cleaningUpRef.current) {
+ cleanup();
+ }
+ });
+
+ setStatus('connected');
+ onConnected?.();
+ } catch (err) {
+ const message = err instanceof Error ? err.message : 'Bluetooth connection failed';
+ // Don't show error for user-cancelled device picker
+ if (err instanceof DOMException && err.name === 'NotFoundError') {
+ setStatus('idle');
+ setTransportType(null);
+ return;
+ }
+ setError(message);
+ cleanup();
+ setStatus('error');
+ }
+ }, [cleanup, onConnected]);
+
+ const disconnect = useCallback(() => {
+ // Send graceful disconnect before cleanup
+ const ws = wsRef.current;
+ if (ws && ws.readyState === WebSocket.OPEN) {
+ try {
+ ws.send(JSON.stringify({ type: 'disconnect' }));
+ } catch {
+ // ignore
+ }
+ }
+ cleanup();
+ }, [cleanup]);
+
+ return {
+ status,
+ error,
+ transportType,
+ serialAvailable,
+ bluetoothAvailable,
+ connectSerial,
+ connectBluetooth,
+ disconnect,
+ };
+}
diff --git a/frontend/src/test/loopback.test.tsx b/frontend/src/test/loopback.test.tsx
new file mode 100644
index 0000000..c32baf3
--- /dev/null
+++ b/frontend/src/test/loopback.test.tsx
@@ -0,0 +1,139 @@
+import { render, screen, fireEvent } from '@testing-library/react';
+import { describe, expect, it, vi, beforeEach } from 'vitest';
+
+import { SettingsLoopbackSection } from '../components/settings/SettingsLoopbackSection';
+import type { UseLoopbackReturn } from '../hooks/useLoopback';
+
+function makeLoopback(overrides?: Partial): UseLoopbackReturn {
+ return {
+ status: 'idle',
+ error: null,
+ transportType: null,
+ serialAvailable: true,
+ bluetoothAvailable: true,
+ connectSerial: vi.fn(async () => {}),
+ connectBluetooth: vi.fn(async () => {}),
+ disconnect: vi.fn(),
+ ...overrides,
+ };
+}
+
+describe('SettingsLoopbackSection', () => {
+ beforeEach(() => {
+ vi.restoreAllMocks();
+ });
+
+ it('renders transport selector and connect button when idle', () => {
+ render();
+
+ expect(screen.getByText('Serial')).toBeInTheDocument();
+ expect(screen.getByText('Bluetooth')).toBeInTheDocument();
+ expect(screen.getByRole('button', { name: 'Connect via Loopback' })).toBeInTheDocument();
+ });
+
+ it('shows baud rate input when serial is selected', () => {
+ render();
+
+ expect(screen.getByLabelText('Baud Rate')).toBeInTheDocument();
+ });
+
+ it('hides baud rate input when BLE is selected', () => {
+ render();
+
+ // Click BLE button
+ fireEvent.click(screen.getByText('Bluetooth'));
+
+ expect(screen.queryByLabelText('Baud Rate')).not.toBeInTheDocument();
+ });
+
+ it('calls connectSerial with baud rate on connect', () => {
+ const connectSerial = vi.fn(async () => {});
+ render();
+
+ fireEvent.click(screen.getByRole('button', { name: 'Connect via Loopback' }));
+
+ expect(connectSerial).toHaveBeenCalledWith(115200);
+ });
+
+ it('calls connectBluetooth when BLE selected and connect clicked', () => {
+ const connectBluetooth = vi.fn(async () => {});
+ render();
+
+ fireEvent.click(screen.getByText('Bluetooth'));
+ fireEvent.click(screen.getByRole('button', { name: 'Connect via Loopback' }));
+
+ expect(connectBluetooth).toHaveBeenCalled();
+ });
+
+ it('shows connected state with disconnect button', () => {
+ render(
+
+ );
+
+ expect(screen.getByText(/Connected via Serial/i)).toBeInTheDocument();
+ expect(screen.getByRole('button', { name: 'Disconnect Loopback' })).toBeInTheDocument();
+ expect(screen.queryByRole('button', { name: 'Connect via Loopback' })).not.toBeInTheDocument();
+ });
+
+ it('calls disconnect on disconnect button click', () => {
+ const disconnect = vi.fn();
+ render(
+
+ );
+
+ fireEvent.click(screen.getByRole('button', { name: 'Disconnect Loopback' }));
+
+ expect(disconnect).toHaveBeenCalled();
+ });
+
+ it('shows connecting state', () => {
+ render();
+
+ expect(screen.getByRole('button', { name: 'Connecting...' })).toBeDisabled();
+ });
+
+ it('shows error message', () => {
+ render(
+
+ );
+
+ expect(screen.getByText('Port failed')).toBeInTheDocument();
+ });
+
+ it('shows warning when neither serial nor bluetooth available', () => {
+ render(
+
+ );
+
+ expect(screen.getByText(/does not support Web Serial or Web Bluetooth/)).toBeInTheDocument();
+ // Connect button should not appear
+ expect(screen.queryByRole('button', { name: 'Connect via Loopback' })).not.toBeInTheDocument();
+ });
+
+ it('disables serial button when serial not available', () => {
+ render();
+
+ expect(screen.getByText('Serial')).toBeDisabled();
+ expect(screen.getByText(/Web Serial not available/)).toBeInTheDocument();
+ });
+
+ it('disables bluetooth button when bluetooth not available', () => {
+ render();
+
+ expect(screen.getByText('Bluetooth')).toBeDisabled();
+ expect(screen.getByText(/Web Bluetooth not available/)).toBeInTheDocument();
+ });
+
+ it('defaults to BLE when serial is not available', () => {
+ render();
+
+ // BLE should be selected, so baud rate should NOT be visible
+ expect(screen.queryByLabelText('Baud Rate')).not.toBeInTheDocument();
+ });
+});
diff --git a/frontend/src/test/settingsModal.test.tsx b/frontend/src/test/settingsModal.test.tsx
index 9c16ebd..8d26b85 100644
--- a/frontend/src/test/settingsModal.test.tsx
+++ b/frontend/src/test/settingsModal.test.tsx
@@ -39,6 +39,7 @@ const baseHealth: HealthStatus = {
database_size_mb: 1.2,
oldest_undecrypted_timestamp: null,
mqtt_status: null,
+ loopback_eligible: false,
};
const baseSettings: AppSettings = {
diff --git a/frontend/src/types.ts b/frontend/src/types.ts
index 7609161..acc70a8 100644
--- a/frontend/src/types.ts
+++ b/frontend/src/types.ts
@@ -30,6 +30,7 @@ export interface HealthStatus {
database_size_mb: number;
oldest_undecrypted_timestamp: number | null;
mqtt_status: string | null;
+ loopback_eligible: boolean;
}
export interface MaintenanceResult {
diff --git a/frontend/src/types/web-serial-bluetooth.d.ts b/frontend/src/types/web-serial-bluetooth.d.ts
new file mode 100644
index 0000000..0a7fb10
--- /dev/null
+++ b/frontend/src/types/web-serial-bluetooth.d.ts
@@ -0,0 +1,81 @@
+// Type declarations for Web Serial API and Web Bluetooth API
+// These APIs are only available in Chrome/Edge and require secure context.
+
+// --- Web Serial API ---
+
+interface SerialPortRequestOptions {
+ filters?: SerialPortFilter[];
+}
+
+interface SerialPortFilter {
+ usbVendorId?: number;
+ usbProductId?: number;
+}
+
+interface SerialOptions {
+ baudRate: number;
+ dataBits?: number;
+ stopBits?: number;
+ parity?: 'none' | 'even' | 'odd';
+ bufferSize?: number;
+ flowControl?: 'none' | 'hardware';
+}
+
+interface SerialPort {
+ readable: ReadableStream | null;
+ writable: WritableStream | null;
+ open(options: SerialOptions): Promise;
+ close(): Promise;
+ setSignals(signals: { requestToSend?: boolean; dataTerminalReady?: boolean }): Promise;
+}
+
+interface Serial {
+ requestPort(options?: SerialPortRequestOptions): Promise;
+}
+
+// --- Web Bluetooth API ---
+
+interface BluetoothRequestDeviceFilter {
+ namePrefix?: string;
+ name?: string;
+ services?: BluetoothServiceUUID[];
+}
+
+type BluetoothServiceUUID = string | number;
+
+interface RequestDeviceOptions {
+ filters?: BluetoothRequestDeviceFilter[];
+ optionalServices?: BluetoothServiceUUID[];
+ acceptAllDevices?: boolean;
+}
+
+interface BluetoothRemoteGATTCharacteristic extends EventTarget {
+ value: DataView | null;
+ startNotifications(): Promise;
+ writeValueWithResponse(value: BufferSource): Promise;
+}
+
+interface BluetoothRemoteGATTService {
+ getCharacteristic(uuid: string): Promise;
+}
+
+interface BluetoothRemoteGATTServer {
+ connected: boolean;
+ connect(): Promise;
+ disconnect(): void;
+ getPrimaryService(uuid: string): Promise;
+}
+
+interface BluetoothDevice extends EventTarget {
+ gatt?: BluetoothRemoteGATTServer;
+}
+
+interface Bluetooth {
+ requestDevice(options: RequestDeviceOptions): Promise;
+}
+
+// Extend Navigator interface
+interface Navigator {
+ serial?: Serial;
+ bluetooth?: Bluetooth;
+}
diff --git a/tests/test_loopback.py b/tests/test_loopback.py
new file mode 100644
index 0000000..de894b2
--- /dev/null
+++ b/tests/test_loopback.py
@@ -0,0 +1,426 @@
+"""Tests for loopback transport and WebSocket endpoint."""
+
+import asyncio
+import json
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from starlette.websockets import WebSocketState
+
+# ---------------------------------------------------------------------------
+# LoopbackTransport unit tests
+# ---------------------------------------------------------------------------
+
+
+class TestLoopbackTransportFraming:
+ """Serial framing round-trip tests (0x3c + 2-byte LE size)."""
+
+ def _make_transport(self, mode="serial"):
+ from app.loopback import LoopbackTransport
+
+ ws = MagicMock()
+ ws.client_state = WebSocketState.CONNECTED
+ ws.send_bytes = AsyncMock()
+ ws.send_json = AsyncMock()
+ return LoopbackTransport(ws, mode), ws
+
+ @pytest.mark.asyncio
+ async def test_send_serial_adds_framing(self):
+ """send() in serial mode prepends 0x3c + 2-byte LE size."""
+ transport, ws = self._make_transport("serial")
+ payload = b"\x01\x02\x03\x04\x05"
+
+ await transport.send(payload)
+
+ expected = b"\x3c\x05\x00\x01\x02\x03\x04\x05"
+ ws.send_bytes.assert_awaited_once_with(expected)
+
+ @pytest.mark.asyncio
+ async def test_send_ble_raw(self):
+ """send() in BLE mode sends raw bytes (no framing)."""
+ transport, ws = self._make_transport("ble")
+ payload = b"\x01\x02\x03"
+
+ await transport.send(payload)
+
+ ws.send_bytes.assert_awaited_once_with(payload)
+
+ def test_handle_rx_serial_strips_framing(self):
+ """handle_rx in serial mode strips 0x3c header and delivers payload."""
+ transport, _ = self._make_transport("serial")
+ reader = MagicMock()
+ reader.handle_rx = AsyncMock()
+ transport.set_reader(reader)
+
+ payload = b"\xaa\xbb\xcc"
+ # Build framed data: 0x3c + 2-byte LE size + payload
+ framed = b"\x3c" + len(payload).to_bytes(2, "little") + payload
+
+ with patch("app.loopback.asyncio.create_task") as mock_task:
+ transport.handle_rx(framed)
+
+ # reader.handle_rx should be called with the payload only
+ mock_task.assert_called_once()
+ assert reader.handle_rx.call_count == 1
+ assert reader.handle_rx.call_args[0][0] == payload
+
+ def test_handle_rx_serial_incremental(self):
+ """handle_rx in serial mode handles data arriving byte by byte."""
+ transport, _ = self._make_transport("serial")
+ reader = MagicMock()
+ reader.handle_rx = AsyncMock()
+ transport.set_reader(reader)
+
+ payload = b"\x01\x02"
+ framed = b"\x3c" + len(payload).to_bytes(2, "little") + payload
+
+ with patch("app.loopback.asyncio.create_task") as mock_task:
+ # Feed one byte at a time
+ for byte in framed:
+ transport.handle_rx(bytes([byte]))
+
+ mock_task.assert_called_once()
+ assert reader.handle_rx.call_args[0][0] == payload
+
+ def test_handle_rx_serial_multiple_frames(self):
+ """handle_rx handles two frames concatenated in one chunk."""
+ transport, _ = self._make_transport("serial")
+ reader = MagicMock()
+ reader.handle_rx = AsyncMock()
+ transport.set_reader(reader)
+
+ p1 = b"\x01\x02"
+ p2 = b"\x03\x04\x05"
+ framed = (
+ b"\x3c"
+ + len(p1).to_bytes(2, "little")
+ + p1
+ + b"\x3c"
+ + len(p2).to_bytes(2, "little")
+ + p2
+ )
+
+ with patch("app.loopback.asyncio.create_task") as mock_task:
+ transport.handle_rx(framed)
+
+ assert mock_task.call_count == 2
+ assert reader.handle_rx.call_args_list[0][0][0] == p1
+ assert reader.handle_rx.call_args_list[1][0][0] == p2
+
+ def test_handle_rx_ble_passthrough(self):
+ """handle_rx in BLE mode passes raw bytes to reader directly."""
+ transport, _ = self._make_transport("ble")
+ reader = MagicMock()
+ reader.handle_rx = AsyncMock()
+ transport.set_reader(reader)
+
+ data = b"\xde\xad\xbe\xef"
+
+ with patch("app.loopback.asyncio.create_task") as mock_task:
+ transport.handle_rx(data)
+
+ mock_task.assert_called_once()
+ assert reader.handle_rx.call_args[0][0] == data
+
+ @pytest.mark.asyncio
+ async def test_connect_returns_info_string(self):
+ """connect() returns a descriptive string."""
+ transport, _ = self._make_transport("serial")
+ result = await transport.connect()
+ assert "Loopback" in result
+ assert "serial" in result
+
+ @pytest.mark.asyncio
+ async def test_disconnect_sends_json(self):
+ """disconnect() sends a disconnect JSON message."""
+ transport, ws = self._make_transport("serial")
+ await transport.disconnect()
+ ws.send_json.assert_awaited_once_with({"type": "disconnect"})
+
+ @pytest.mark.asyncio
+ async def test_disconnect_handles_closed_ws(self):
+ """disconnect() does not raise if WS is already closed."""
+ transport, ws = self._make_transport("serial")
+ ws.client_state = WebSocketState.DISCONNECTED
+ # Should not raise
+ await transport.disconnect()
+ ws.send_json.assert_not_awaited()
+
+ @pytest.mark.asyncio
+ async def test_send_noop_when_ws_closed(self):
+ """send() does nothing when WebSocket is not connected."""
+ transport, ws = self._make_transport("serial")
+ ws.client_state = WebSocketState.DISCONNECTED
+ await transport.send(b"\x01\x02")
+ ws.send_bytes.assert_not_awaited()
+
+ def test_set_reader_and_callback(self):
+ """set_reader and set_disconnect_callback store references."""
+ transport, _ = self._make_transport("serial")
+ reader = MagicMock()
+ callback = MagicMock()
+ transport.set_reader(reader)
+ transport.set_disconnect_callback(callback)
+ assert transport._reader is reader
+ assert transport._disconnect_callback is callback
+
+ def test_reset_framing(self):
+ """reset_framing clears the state machine."""
+ transport, _ = self._make_transport("serial")
+ transport._frame_started = True
+ transport._header = b"\x3c\x05"
+ transport._inframe = b"\x01\x02"
+ transport._frame_size = 5
+
+ transport.reset_framing()
+
+ assert transport._frame_started is False
+ assert transport._header == b""
+ assert transport._inframe == b""
+ assert transport._frame_size == 0
+
+
+# ---------------------------------------------------------------------------
+# RadioManager loopback methods
+# ---------------------------------------------------------------------------
+
+
+class TestRadioManagerLoopback:
+ """Tests for connect_loopback / disconnect_loopback state transitions."""
+
+ def test_connect_loopback_sets_state(self):
+ from app.radio import RadioManager
+
+ rm = RadioManager()
+ mc = MagicMock()
+ rm.connect_loopback(mc, "Loopback (serial)")
+
+ assert rm.meshcore is mc
+ assert rm.connection_info == "Loopback (serial)"
+ assert rm.loopback_active is True
+ assert rm._last_connected is True
+ assert rm._setup_complete is False
+
+ @pytest.mark.asyncio
+ async def test_disconnect_loopback_clears_state(self):
+ from app.radio import RadioManager
+
+ rm = RadioManager()
+ mc = MagicMock()
+ mc.disconnect = AsyncMock()
+ rm.connect_loopback(mc, "Loopback (serial)")
+
+ with patch("app.websocket.broadcast_health"):
+ await rm.disconnect_loopback()
+
+ assert rm.meshcore is None
+ assert rm.connection_info is None
+ assert rm.loopback_active is False
+ mc.disconnect.assert_awaited_once()
+
+ @pytest.mark.asyncio
+ async def test_disconnect_loopback_handles_mc_disconnect_error(self):
+ """disconnect_loopback doesn't raise if mc.disconnect() fails."""
+ from app.radio import RadioManager
+
+ rm = RadioManager()
+ mc = MagicMock()
+ mc.disconnect = AsyncMock(side_effect=OSError("transport closed"))
+ rm.connect_loopback(mc, "Loopback (ble)")
+
+ with patch("app.websocket.broadcast_health"):
+ # Should not raise
+ await rm.disconnect_loopback()
+
+ assert rm.loopback_active is False
+
+ @pytest.mark.asyncio
+ async def test_disconnect_loopback_broadcasts_health_false(self):
+ from app.radio import RadioManager
+
+ rm = RadioManager()
+ mc = MagicMock()
+ mc.disconnect = AsyncMock()
+ rm.connect_loopback(mc, "Loopback (serial)")
+
+ with patch("app.websocket.broadcast_health") as mock_bh:
+ await rm.disconnect_loopback()
+
+ mock_bh.assert_called_once_with(False, None)
+
+ @pytest.mark.asyncio
+ async def test_monitor_skips_reconnect_during_loopback(self):
+ """Connection monitor skips auto-detect when _loopback_active is True."""
+ from app.radio import RadioManager
+
+ rm = RadioManager()
+ mc = MagicMock()
+ mc.is_connected = True
+ rm.connect_loopback(mc, "Loopback (serial)")
+
+ rm.reconnect = AsyncMock()
+
+ sleep_count = 0
+
+ async def _sleep(_seconds: float):
+ nonlocal sleep_count
+ sleep_count += 1
+ if sleep_count >= 3:
+ raise asyncio.CancelledError()
+
+ with patch("app.radio.asyncio.sleep", side_effect=_sleep):
+ await rm.start_connection_monitor()
+ try:
+ await rm._reconnect_task
+ finally:
+ await rm.stop_connection_monitor()
+
+ # reconnect should never be called while loopback is active
+ rm.reconnect.assert_not_awaited()
+
+
+# ---------------------------------------------------------------------------
+# WebSocket endpoint tests
+# ---------------------------------------------------------------------------
+
+
+class TestLoopbackEndpointGuards:
+ """Tests for the /ws/transport WebSocket endpoint guard conditions."""
+
+ def test_rejects_when_explicit_transport_configured(self):
+ """Endpoint closes when explicit transport env is set."""
+ from fastapi.testclient import TestClient
+
+ from app.main import app
+
+ with (
+ patch("app.routers.loopback.settings") as mock_settings,
+ patch("app.routers.loopback.radio_manager"),
+ ):
+ mock_settings.loopback_eligible = False
+
+ client = TestClient(app)
+ # Endpoint accepts then immediately closes — verify by catching the close
+ with client.websocket_connect("/api/ws/transport") as ws:
+ closed = False
+ try:
+ ws.receive_text()
+ except Exception: # noqa: BLE001
+ closed = True
+ assert closed
+
+ def test_rejects_when_radio_already_connected(self):
+ """Endpoint closes when radio is already connected."""
+ from fastapi.testclient import TestClient
+
+ from app.main import app
+
+ with (
+ patch("app.routers.loopback.settings") as mock_settings,
+ patch("app.routers.loopback.radio_manager") as mock_rm,
+ ):
+ mock_settings.loopback_eligible = True
+ mock_rm.is_connected = True
+
+ client = TestClient(app)
+ with client.websocket_connect("/api/ws/transport") as ws:
+ closed = False
+ try:
+ ws.receive_text()
+ except Exception: # noqa: BLE001
+ closed = True
+ assert closed
+
+
+class TestLoopbackEndpointInit:
+ """Tests for the init handshake and basic operation."""
+
+ def test_rejects_invalid_init_message(self):
+ """Endpoint closes on invalid init JSON."""
+ from fastapi.testclient import TestClient
+
+ from app.main import app
+
+ with (
+ patch("app.routers.loopback.settings") as mock_settings,
+ patch("app.routers.loopback.radio_manager") as mock_rm,
+ ):
+ mock_settings.loopback_eligible = True
+ mock_rm.is_connected = False
+ mock_rm.disconnect_loopback = AsyncMock()
+
+ client = TestClient(app)
+ with client.websocket_connect("/api/ws/transport") as ws:
+ ws.send_text(json.dumps({"type": "init", "mode": "invalid"}))
+ closed = False
+ try:
+ ws.receive_text()
+ except Exception: # noqa: BLE001
+ closed = True
+ assert closed
+
+ def test_accepts_valid_serial_init(self):
+ """Endpoint accepts valid serial init and proceeds to MeshCore creation."""
+ mock_mc = MagicMock()
+ mock_mc.is_connected = True
+ mock_mc.connect = AsyncMock()
+
+ with (
+ patch("app.routers.loopback.settings") as mock_settings,
+ patch("app.routers.loopback.radio_manager") as mock_rm,
+ patch("meshcore.MeshCore", return_value=mock_mc) as mock_mc_cls,
+ ):
+ mock_settings.loopback_eligible = True
+ mock_rm.is_connected = False
+ mock_rm.connect_loopback = MagicMock()
+ mock_rm.post_connect_setup = AsyncMock()
+ mock_rm.disconnect_loopback = AsyncMock()
+
+ from fastapi.testclient import TestClient
+
+ from app.main import app
+
+ client = TestClient(app)
+ with client.websocket_connect("/api/ws/transport") as ws:
+ ws.send_text(json.dumps({"type": "init", "mode": "serial"}))
+ # Send disconnect to close cleanly
+ ws.send_text(json.dumps({"type": "disconnect"}))
+
+ # Verify MeshCore was created and connect_loopback called
+ mock_mc_cls.assert_called_once()
+ mock_mc.connect.assert_awaited_once()
+ mock_rm.connect_loopback.assert_called_once()
+ mock_rm.disconnect_loopback.assert_awaited_once()
+
+
+# ---------------------------------------------------------------------------
+# Config loopback_eligible property
+# ---------------------------------------------------------------------------
+
+
+class TestConfigLoopbackEligible:
+ """Tests for the loopback_eligible property."""
+
+ def test_eligible_when_no_transport_set(self):
+ from app.config import Settings
+
+ s = Settings(serial_port="", tcp_host="", ble_address="")
+ assert s.loopback_eligible is True
+
+ def test_not_eligible_with_serial_port(self):
+ from app.config import Settings
+
+ s = Settings(serial_port="/dev/ttyUSB0")
+ assert s.loopback_eligible is False
+
+ def test_not_eligible_with_tcp_host(self):
+ from app.config import Settings
+
+ s = Settings(tcp_host="192.168.1.1")
+ assert s.loopback_eligible is False
+
+ def test_not_eligible_with_ble(self):
+ from app.config import Settings
+
+ s = Settings(ble_address="AA:BB:CC:DD:EE:FF", ble_pin="1234")
+ assert s.loopback_eligible is False