1 Commits

Author SHA1 Message Date
Jack Kingsman
d00bc68a83 Initial loopbacl 2026-03-02 07:16:58 -08:00
17 changed files with 1429 additions and 1 deletions

View File

@@ -35,6 +35,11 @@ class Settings(BaseSettings):
raise ValueError("MESHCORE_BLE_PIN is required when MESHCORE_BLE_ADDRESS is set.")
return self
@property
def loopback_eligible(self) -> bool:
"""True when no explicit transport env var is set."""
return not self.serial_port and not self.tcp_host and not self.ble_address
@property
def connection_type(self) -> Literal["serial", "tcp", "ble"]:
if self.tcp_host:

125
app/loopback.py Normal file
View File

@@ -0,0 +1,125 @@
"""Loopback transport: bridges a browser-side serial/BLE connection over WebSocket."""
import asyncio
import logging
from typing import Any, Literal
from starlette.websockets import WebSocket, WebSocketState
logger = logging.getLogger(__name__)
class LoopbackTransport:
"""ConnectionProtocol implementation that tunnels bytes over a WebSocket.
For serial mode, applies the same 0x3c + 2-byte LE size framing that
meshcore's SerialConnection uses. For BLE mode, passes raw bytes through
(matching BLEConnection behaviour).
"""
def __init__(self, websocket: WebSocket, mode: Literal["serial", "ble"]) -> None:
self._ws = websocket
self._mode = mode
self._reader: Any = None
self._disconnect_callback: Any = None
# Serial framing state (mirrors meshcore serial_cx.py handle_rx)
self._header = b""
self._inframe = b""
self._frame_started = False
self._frame_size = 0
# -- ConnectionProtocol methods ------------------------------------------
async def connect(self) -> str:
"""No-op — the WebSocket is already established."""
info = f"Loopback ({self._mode})"
logger.info("Loopback transport connected: %s", info)
return info
async def disconnect(self) -> None:
"""Ask the browser to release the hardware and close the WS."""
try:
if self._ws.client_state == WebSocketState.CONNECTED:
await self._ws.send_json({"type": "disconnect"})
except Exception:
pass # WS may already be closed
async def send(self, data: Any) -> None:
"""Send data to the browser (which writes it to the physical radio).
Serial mode: prepend 0x3c + 2-byte LE size header.
BLE mode: send raw bytes.
"""
try:
if self._ws.client_state != WebSocketState.CONNECTED:
return
if self._mode == "serial":
size = len(data)
pkt = b"\x3c" + size.to_bytes(2, byteorder="little") + bytes(data)
await self._ws.send_bytes(pkt)
else:
await self._ws.send_bytes(bytes(data))
except Exception as e:
logger.debug("Loopback send error: %s", e)
def set_reader(self, reader: Any) -> None:
self._reader = reader
def set_disconnect_callback(self, callback: Any) -> None:
self._disconnect_callback = callback
# -- Incoming data from browser ------------------------------------------
def handle_rx(self, data: bytes) -> None:
"""Process bytes received from the browser.
Serial mode: accumulate bytes, strip framing, deliver payload.
BLE mode: deliver raw bytes directly.
"""
if self._mode == "serial":
self._handle_rx_serial(data)
else:
self._handle_rx_ble(data)
def _handle_rx_ble(self, data: bytes) -> None:
if self._reader is not None:
asyncio.create_task(self._reader.handle_rx(data))
def _handle_rx_serial(self, data: bytes) -> None:
"""Mirror meshcore's SerialConnection.handle_rx state machine."""
raw = bytes(data)
headerlen = len(self._header)
if not self._frame_started:
if len(raw) >= 3 - headerlen:
self._header = self._header + raw[: 3 - headerlen]
self._frame_started = True
self._frame_size = int.from_bytes(self._header[1:], byteorder="little")
remainder = raw[3 - headerlen :]
# Reset header for next frame
self._header = b""
if remainder:
self._handle_rx_serial(remainder)
else:
self._header = self._header + raw
else:
framelen = len(self._inframe)
if framelen + len(raw) < self._frame_size:
self._inframe = self._inframe + raw
else:
self._inframe = self._inframe + raw[: self._frame_size - framelen]
if self._reader is not None:
asyncio.create_task(self._reader.handle_rx(self._inframe))
remainder = raw[self._frame_size - framelen :]
self._frame_started = False
self._inframe = b""
if remainder:
self._handle_rx_serial(remainder)
def reset_framing(self) -> None:
"""Reset the serial framing state machine."""
self._header = b""
self._inframe = b""
self._frame_started = False
self._frame_size = 0

View File

@@ -19,6 +19,7 @@ from app.routers import (
channels,
contacts,
health,
loopback,
messages,
packets,
radio,
@@ -126,6 +127,7 @@ app.include_router(read_state.router, prefix="/api")
app.include_router(settings.router, prefix="/api")
app.include_router(statistics.router, prefix="/api")
app.include_router(ws.router, prefix="/api")
app.include_router(loopback.router, prefix="/api")
# Serve frontend static files in production
FRONTEND_DIR = Path(__file__).parent.parent / "frontend" / "dist"

View File

@@ -128,6 +128,7 @@ class RadioManager:
self._setup_lock: asyncio.Lock | None = None
self._setup_in_progress: bool = False
self._setup_complete: bool = False
self._loopback_active: bool = False
async def _acquire_operation_lock(
self,
@@ -317,6 +318,36 @@ class RadioManager:
def is_setup_complete(self) -> bool:
return self._setup_complete
@property
def loopback_active(self) -> bool:
return self._loopback_active
def connect_loopback(self, mc: MeshCore, connection_info: str) -> None:
"""Adopt a MeshCore instance created by the loopback WebSocket endpoint."""
self._meshcore = mc
self._connection_info = connection_info
self._loopback_active = True
self._last_connected = True
self._setup_complete = False
async def disconnect_loopback(self) -> None:
"""Tear down a loopback session and resume normal auto-detect."""
from app.websocket import broadcast_health
mc = self._meshcore
self._meshcore = None
self._loopback_active = False
self._connection_info = None
self._setup_complete = False
if mc is not None:
try:
await mc.disconnect()
except Exception:
pass
broadcast_health(False, None)
async def connect(self) -> None:
"""Connect to the radio using the configured transport."""
if self._meshcore is not None:
@@ -461,6 +492,10 @@ class RadioManager:
try:
await asyncio.sleep(CHECK_INTERVAL_SECONDS)
# Skip auto-detect/reconnect while loopback is active
if self._loopback_active:
continue
current_connected = self.is_connected
# Detect status change

View File

@@ -17,6 +17,7 @@ class HealthResponse(BaseModel):
database_size_mb: float
oldest_undecrypted_timestamp: int | None
mqtt_status: str | None = None
loopback_eligible: bool = False
async def build_health_data(radio_connected: bool, connection_info: str | None) -> dict:
@@ -53,6 +54,7 @@ async def build_health_data(radio_connected: bool, connection_info: str | None)
"database_size_mb": db_size_mb,
"oldest_undecrypted_timestamp": oldest_ts,
"mqtt_status": mqtt_status,
"loopback_eligible": settings.loopback_eligible,
}

116
app/routers/loopback.py Normal file
View File

@@ -0,0 +1,116 @@
"""WebSocket endpoint for loopback transport (browser-bridged radio connection)."""
import asyncio
import json
import logging
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from starlette.websockets import WebSocketState
from app.config import settings
from app.loopback import LoopbackTransport
from app.radio import radio_manager
logger = logging.getLogger(__name__)
router = APIRouter()
@router.websocket("/ws/transport")
async def loopback_transport(websocket: WebSocket) -> None:
"""Bridge a browser-side serial/BLE connection to the backend MeshCore stack.
Protocol:
1. Client sends init JSON: {"type": "init", "mode": "serial"|"ble"}
2. Binary frames flow bidirectionally (raw bytes for BLE, framed for serial)
3. Either side can send {"type": "disconnect"} to tear down
"""
# Guard: reject if an explicit transport is configured via env vars
if not settings.loopback_eligible:
await websocket.accept()
await websocket.close(code=4003, reason="Explicit transport configured")
return
# Guard: reject if the radio is already connected (direct or another loopback)
if radio_manager.is_connected:
await websocket.accept()
await websocket.close(code=4004, reason="Radio already connected")
return
await websocket.accept()
transport: LoopbackTransport | None = None
setup_task: asyncio.Task | None = None
try:
# Wait for init message
init_raw = await asyncio.wait_for(websocket.receive_text(), timeout=10.0)
init_msg = json.loads(init_raw)
if init_msg.get("type") != "init" or init_msg.get("mode") not in ("serial", "ble"):
await websocket.close(code=4001, reason="Invalid init message")
return
mode = init_msg["mode"]
logger.info("Loopback init: mode=%s", mode)
# Create transport and MeshCore instance
transport = LoopbackTransport(websocket, mode)
from meshcore import MeshCore
mc = MeshCore(transport, auto_reconnect=False, max_reconnect_attempts=0)
await mc.connect()
if not mc.is_connected:
logger.warning("Loopback MeshCore failed to connect")
await websocket.close(code=4005, reason="MeshCore handshake failed")
return
connection_info = f"Loopback ({mode})"
radio_manager.connect_loopback(mc, connection_info)
# Run post-connect setup in background so the receive loop can run
setup_task = asyncio.create_task(radio_manager.post_connect_setup())
# Main receive loop
while True:
message = await websocket.receive()
if message.get("type") == "websocket.disconnect":
break
if "bytes" in message and message["bytes"]:
transport.handle_rx(message["bytes"])
elif "text" in message and message["text"]:
try:
text_msg = json.loads(message["text"])
if text_msg.get("type") == "disconnect":
logger.info("Loopback client requested disconnect")
break
except (json.JSONDecodeError, TypeError):
pass
except WebSocketDisconnect:
logger.info("Loopback WebSocket disconnected")
except asyncio.TimeoutError:
logger.warning("Loopback init timeout")
if websocket.client_state == WebSocketState.CONNECTED:
await websocket.close(code=4002, reason="Init timeout")
except Exception as e:
logger.exception("Loopback error: %s", e)
finally:
if setup_task is not None:
setup_task.cancel()
try:
await setup_task
except (asyncio.CancelledError, Exception):
pass
await radio_manager.disconnect_loopback()
if websocket.client_state == WebSocketState.CONNECTED:
try:
await websocket.close()
except Exception:
pass

View File

@@ -19,6 +19,7 @@ import {
useAppSettings,
useConversationRouter,
useContactsAndChannels,
useLoopback,
} from './hooks';
import * as messageCache from './messageCache';
import { StatusBar } from './components/StatusBar';
@@ -186,6 +187,8 @@ export function App() {
refreshUnreads,
} = useUnreadCounts(channels, contacts, activeConversation);
const loopback = useLoopback(handleHealthRefresh);
// Determine if active contact is a repeater (used for routing to dashboard)
const activeContactIsRepeater = useMemo(() => {
if (!activeConversation || activeConversation.type !== 'contact') return false;
@@ -481,7 +484,9 @@ export function App() {
</button>
</div>
<div className="flex-1 overflow-y-auto py-1">
{SETTINGS_SECTION_ORDER.map((section) => (
{SETTINGS_SECTION_ORDER.filter(
(s) => s !== 'loopback' || (health?.loopback_eligible && !health?.radio_connected)
).map((section) => (
<button
key={section}
type="button"
@@ -671,6 +676,7 @@ export function App() {
config={config}
health={health}
appSettings={appSettings}
loopback={loopback}
onClose={handleCloseSettingsView}
onSave={handleSaveConfig}
onSaveAppSettings={handleSaveAppSettings}

View File

@@ -17,6 +17,8 @@ import { SettingsDatabaseSection } from './settings/SettingsDatabaseSection';
import { SettingsBotSection } from './settings/SettingsBotSection';
import { SettingsStatisticsSection } from './settings/SettingsStatisticsSection';
import { SettingsAboutSection } from './settings/SettingsAboutSection';
import { SettingsLoopbackSection } from './settings/SettingsLoopbackSection';
import type { UseLoopbackReturn } from '../hooks/useLoopback';
interface SettingsModalBaseProps {
open: boolean;
@@ -24,6 +26,7 @@ interface SettingsModalBaseProps {
config: RadioConfig | null;
health: HealthStatus | null;
appSettings: AppSettings | null;
loopback?: UseLoopbackReturn;
onClose: () => void;
onSave: (update: RadioConfigUpdate) => Promise<void>;
onSaveAppSettings: (update: AppSettingsUpdate) => Promise<void>;
@@ -57,6 +60,7 @@ export function SettingsModal(props: SettingsModalProps) {
onHealthRefresh,
onRefreshAppSettings,
onLocalLabelChange,
loopback,
} = props;
const externalSidebarNav = props.externalSidebarNav === true;
const desktopSection = props.externalSidebarNav ? props.desktopSection : undefined;
@@ -74,6 +78,7 @@ export function SettingsModal(props: SettingsModalProps) {
radio: !isMobile,
identity: false,
connectivity: false,
loopback: false,
mqtt: false,
database: false,
bot: false,
@@ -217,6 +222,18 @@ export function SettingsModal(props: SettingsModalProps) {
</div>
)}
{shouldRenderSection('loopback') &&
health?.loopback_eligible &&
!health?.radio_connected &&
loopback && (
<div className={sectionWrapperClass}>
{renderSectionHeader('loopback')}
{isSectionVisible('loopback') && (
<SettingsLoopbackSection loopback={loopback} className={sectionContentClass} />
)}
</div>
)}
{shouldRenderSection('database') && (
<div className={sectionWrapperClass}>
{renderSectionHeader('database')}

View File

@@ -0,0 +1,136 @@
import { useState } from 'react';
import { Button } from '../ui/button';
import { Input } from '../ui/input';
import { Label } from '../ui/label';
import { Separator } from '../ui/separator';
import type { UseLoopbackReturn } from '../../hooks/useLoopback';
export function SettingsLoopbackSection({
loopback,
className,
}: {
loopback: UseLoopbackReturn;
className?: string;
}) {
const {
status,
error,
transportType,
serialAvailable,
bluetoothAvailable,
connectSerial,
connectBluetooth,
disconnect,
} = loopback;
const [baudRate, setBaudRate] = useState('115200');
const [selectedTransport, setSelectedTransport] = useState<'serial' | 'ble'>(
serialAvailable ? 'serial' : 'ble'
);
const isConnecting = status === 'connecting';
const isConnected = status === 'connected';
const busy = isConnecting || isConnected;
const handleConnect = async () => {
if (selectedTransport === 'serial') {
await connectSerial(parseInt(baudRate, 10) || 115200);
} else {
await connectBluetooth();
}
};
const neitherAvailable = !serialAvailable && !bluetoothAvailable;
return (
<div className={className}>
<p className="text-sm text-muted-foreground">
No direct radio connection detected. You can bridge a radio connected to{' '}
<em>this browser's device</em> via Web Serial or Web Bluetooth.
</p>
{neitherAvailable && (
<div className="rounded-md bg-yellow-500/10 border border-yellow-500/30 p-3 text-sm text-yellow-200">
Your browser does not support Web Serial or Web Bluetooth. Use Chrome or Edge on a secure
context (HTTPS or localhost).
</div>
)}
{!neitherAvailable && (
<>
{isConnected ? (
<>
<div className="flex items-center gap-2">
<div className="w-2 h-2 rounded-full bg-green-500" />
<span className="text-sm">
Connected via {transportType === 'serial' ? 'Serial' : 'Bluetooth'}
</span>
</div>
<Button variant="outline" onClick={disconnect} className="w-full">
Disconnect Loopback
</Button>
</>
) : (
<>
{/* Transport selector */}
<div className="space-y-2">
<Label>Transport</Label>
<div className="flex gap-2">
<Button
variant={selectedTransport === 'serial' ? 'default' : 'outline'}
size="sm"
disabled={!serialAvailable || busy}
onClick={() => setSelectedTransport('serial')}
>
Serial
</Button>
<Button
variant={selectedTransport === 'ble' ? 'default' : 'outline'}
size="sm"
disabled={!bluetoothAvailable || busy}
onClick={() => setSelectedTransport('ble')}
>
Bluetooth
</Button>
</div>
{!serialAvailable && (
<p className="text-xs text-muted-foreground">
Web Serial not available in this browser
</p>
)}
{!bluetoothAvailable && (
<p className="text-xs text-muted-foreground">
Web Bluetooth not available in this browser
</p>
)}
</div>
{/* Baud rate (serial only) */}
{selectedTransport === 'serial' && (
<div className="space-y-2">
<Label htmlFor="loopback-baud">Baud Rate</Label>
<Input
id="loopback-baud"
type="number"
value={baudRate}
onChange={(e) => setBaudRate(e.target.value)}
disabled={busy}
/>
</div>
)}
<Separator />
<Button onClick={handleConnect} disabled={busy} className="w-full">
{isConnecting ? 'Connecting...' : 'Connect via Loopback'}
</Button>
</>
)}
</>
)}
{error && <div className="text-sm text-destructive">{error}</div>}
</div>
);
}

View File

@@ -2,6 +2,7 @@ export type SettingsSection =
| 'radio'
| 'identity'
| 'connectivity'
| 'loopback'
| 'mqtt'
| 'database'
| 'bot'
@@ -12,6 +13,7 @@ export const SETTINGS_SECTION_ORDER: SettingsSection[] = [
'radio',
'identity',
'connectivity',
'loopback',
'database',
'bot',
'mqtt',
@@ -23,6 +25,7 @@ export const SETTINGS_SECTION_LABELS: Record<SettingsSection, string> = {
radio: '📻 Radio',
identity: '🪪 Identity',
connectivity: '📡 Connectivity',
loopback: '🔁 Loopback',
database: '🗄️ Database & Interface',
bot: '🤖 Bots',
mqtt: '📤 MQTT',

View File

@@ -5,3 +5,4 @@ export { useRepeaterDashboard } from './useRepeaterDashboard';
export { useAppSettings } from './useAppSettings';
export { useConversationRouter } from './useConversationRouter';
export { useContactsAndChannels } from './useContactsAndChannels';
export { useLoopback } from './useLoopback';

View File

@@ -0,0 +1,332 @@
import { useState, useCallback, useRef, useEffect } from 'react';
export type LoopbackStatus = 'idle' | 'connecting' | 'connected' | 'error';
export type LoopbackTransportType = 'serial' | 'ble';
// Nordic UART Service UUIDs (used by MeshCore BLE)
const UART_SERVICE_UUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e';
const UART_TX_CHAR_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'; // notifications from radio
const UART_RX_CHAR_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'; // write to radio
function getTransportWsUrl(): string {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const isDev = window.location.port === '5173';
return isDev
? `ws://localhost:8000/api/ws/transport`
: `${protocol}//${window.location.host}/api/ws/transport`;
}
export interface UseLoopbackReturn {
status: LoopbackStatus;
error: string | null;
transportType: LoopbackTransportType | null;
serialAvailable: boolean;
bluetoothAvailable: boolean;
connectSerial: (baudRate?: number) => Promise<void>;
connectBluetooth: () => Promise<void>;
disconnect: () => void;
}
export function useLoopback(onConnected?: () => void): UseLoopbackReturn {
const [status, setStatus] = useState<LoopbackStatus>('idle');
const [error, setError] = useState<string | null>(null);
const [transportType, setTransportType] = useState<LoopbackTransportType | null>(null);
const wsRef = useRef<WebSocket | null>(null);
const serialPortRef = useRef<SerialPort | null>(null);
const serialReaderRef = useRef<ReadableStreamDefaultReader<Uint8Array> | null>(null);
const bleDeviceRef = useRef<BluetoothDevice | null>(null);
const cleaningUpRef = useRef(false);
const serialAvailable = typeof navigator !== 'undefined' && 'serial' in navigator;
const bluetoothAvailable = typeof navigator !== 'undefined' && 'bluetooth' in navigator;
const cleanup = useCallback(() => {
if (cleaningUpRef.current) return;
cleaningUpRef.current = true;
// Close WebSocket
const ws = wsRef.current;
if (ws && ws.readyState <= WebSocket.OPEN) {
try {
ws.close();
} catch {
// ignore
}
}
wsRef.current = null;
// Close serial reader and port
const reader = serialReaderRef.current;
if (reader) {
try {
reader.cancel();
} catch {
// ignore
}
}
serialReaderRef.current = null;
const port = serialPortRef.current;
if (port) {
try {
port.close();
} catch {
// ignore
}
}
serialPortRef.current = null;
// Disconnect BLE
const bleDevice = bleDeviceRef.current;
if (bleDevice?.gatt?.connected) {
try {
bleDevice.gatt.disconnect();
} catch {
// ignore
}
}
bleDeviceRef.current = null;
setTransportType(null);
setStatus('idle');
cleaningUpRef.current = false;
}, []);
// Cleanup on unmount
useEffect(() => cleanup, [cleanup]);
const connectSerial = useCallback(
async (baudRate = 115200) => {
setError(null);
setStatus('connecting');
setTransportType('serial');
try {
// Request serial port from user
const port = await navigator.serial!.requestPort();
await port.open({ baudRate, flowControl: 'none' });
// Match meshcore serial behaviour
try {
await port.setSignals({ requestToSend: false });
} catch {
// Not all adapters support setSignals
}
serialPortRef.current = port;
// Open transport WebSocket
const ws = new WebSocket(getTransportWsUrl());
ws.binaryType = 'arraybuffer';
wsRef.current = ws;
await new Promise<void>((resolve, reject) => {
ws.onopen = () => resolve();
ws.onerror = () => reject(new Error('Transport WebSocket failed to connect'));
// Timeout
const timeout = setTimeout(() => reject(new Error('Transport WebSocket timeout')), 10000);
ws.addEventListener('open', () => clearTimeout(timeout), { once: true });
});
// Send init
ws.send(JSON.stringify({ type: 'init', mode: 'serial' }));
// Start serial → WS read loop
const reader = port.readable!.getReader();
serialReaderRef.current = reader;
const readLoop = async () => {
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
if (value && ws.readyState === WebSocket.OPEN) {
ws.send(value);
}
}
} catch (err) {
// Reader cancelled or port closed — expected during disconnect
if (!cleaningUpRef.current) {
console.debug('Serial read loop ended:', err);
}
}
};
readLoop();
// WS → serial write
ws.onmessage = async (event) => {
if (event.data instanceof ArrayBuffer) {
const writer = port.writable!.getWriter();
try {
await writer.write(new Uint8Array(event.data));
} finally {
writer.releaseLock();
}
} else if (typeof event.data === 'string') {
try {
const msg = JSON.parse(event.data);
if (msg.type === 'disconnect') {
cleanup();
}
} catch {
// ignore non-JSON text
}
}
};
ws.onclose = () => {
if (!cleaningUpRef.current) {
cleanup();
}
};
ws.onerror = () => {
if (!cleaningUpRef.current) {
setError('Transport WebSocket error');
cleanup();
setStatus('error');
}
};
setStatus('connected');
onConnected?.();
} catch (err) {
const message = err instanceof Error ? err.message : 'Serial connection failed';
// Don't show error for user-cancelled port picker
if (err instanceof DOMException && err.name === 'NotFoundError') {
setStatus('idle');
setTransportType(null);
return;
}
setError(message);
cleanup();
setStatus('error');
}
},
[cleanup, onConnected]
);
const connectBluetooth = useCallback(async () => {
setError(null);
setStatus('connecting');
setTransportType('ble');
try {
const device = await navigator.bluetooth!.requestDevice({
filters: [{ namePrefix: 'MeshCore' }],
optionalServices: [UART_SERVICE_UUID],
});
if (!device.gatt) {
throw new Error('Bluetooth GATT not available');
}
bleDeviceRef.current = device;
const server = await device.gatt.connect();
const service = await server.getPrimaryService(UART_SERVICE_UUID);
const txChar = await service.getCharacteristic(UART_TX_CHAR_UUID);
const rxChar = await service.getCharacteristic(UART_RX_CHAR_UUID);
// Open transport WebSocket
const ws = new WebSocket(getTransportWsUrl());
ws.binaryType = 'arraybuffer';
wsRef.current = ws;
await new Promise<void>((resolve, reject) => {
ws.onopen = () => resolve();
ws.onerror = () => reject(new Error('Transport WebSocket failed to connect'));
const timeout = setTimeout(() => reject(new Error('Transport WebSocket timeout')), 10000);
ws.addEventListener('open', () => clearTimeout(timeout), { once: true });
});
// Send init
ws.send(JSON.stringify({ type: 'init', mode: 'ble' }));
// BLE RX notifications → WS
await txChar.startNotifications();
txChar.addEventListener('characteristicvaluechanged', (event) => {
const value = (event.target as BluetoothRemoteGATTCharacteristic).value;
if (value && ws.readyState === WebSocket.OPEN) {
ws.send(value.buffer);
}
});
// WS → BLE TX
ws.onmessage = async (event) => {
if (event.data instanceof ArrayBuffer) {
await rxChar.writeValueWithResponse(new Uint8Array(event.data));
} else if (typeof event.data === 'string') {
try {
const msg = JSON.parse(event.data);
if (msg.type === 'disconnect') {
cleanup();
}
} catch {
// ignore non-JSON text
}
}
};
ws.onclose = () => {
if (!cleaningUpRef.current) {
cleanup();
}
};
ws.onerror = () => {
if (!cleaningUpRef.current) {
setError('Transport WebSocket error');
cleanup();
setStatus('error');
}
};
// Handle BLE disconnect
device.addEventListener('gattserverdisconnected', () => {
if (!cleaningUpRef.current) {
cleanup();
}
});
setStatus('connected');
onConnected?.();
} catch (err) {
const message = err instanceof Error ? err.message : 'Bluetooth connection failed';
// Don't show error for user-cancelled device picker
if (err instanceof DOMException && err.name === 'NotFoundError') {
setStatus('idle');
setTransportType(null);
return;
}
setError(message);
cleanup();
setStatus('error');
}
}, [cleanup, onConnected]);
const disconnect = useCallback(() => {
// Send graceful disconnect before cleanup
const ws = wsRef.current;
if (ws && ws.readyState === WebSocket.OPEN) {
try {
ws.send(JSON.stringify({ type: 'disconnect' }));
} catch {
// ignore
}
}
cleanup();
}, [cleanup]);
return {
status,
error,
transportType,
serialAvailable,
bluetoothAvailable,
connectSerial,
connectBluetooth,
disconnect,
};
}

View File

@@ -0,0 +1,139 @@
import { render, screen, fireEvent } from '@testing-library/react';
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { SettingsLoopbackSection } from '../components/settings/SettingsLoopbackSection';
import type { UseLoopbackReturn } from '../hooks/useLoopback';
function makeLoopback(overrides?: Partial<UseLoopbackReturn>): UseLoopbackReturn {
return {
status: 'idle',
error: null,
transportType: null,
serialAvailable: true,
bluetoothAvailable: true,
connectSerial: vi.fn(async () => {}),
connectBluetooth: vi.fn(async () => {}),
disconnect: vi.fn(),
...overrides,
};
}
describe('SettingsLoopbackSection', () => {
beforeEach(() => {
vi.restoreAllMocks();
});
it('renders transport selector and connect button when idle', () => {
render(<SettingsLoopbackSection loopback={makeLoopback()} />);
expect(screen.getByText('Serial')).toBeInTheDocument();
expect(screen.getByText('Bluetooth')).toBeInTheDocument();
expect(screen.getByRole('button', { name: 'Connect via Loopback' })).toBeInTheDocument();
});
it('shows baud rate input when serial is selected', () => {
render(<SettingsLoopbackSection loopback={makeLoopback()} />);
expect(screen.getByLabelText('Baud Rate')).toBeInTheDocument();
});
it('hides baud rate input when BLE is selected', () => {
render(<SettingsLoopbackSection loopback={makeLoopback()} />);
// Click BLE button
fireEvent.click(screen.getByText('Bluetooth'));
expect(screen.queryByLabelText('Baud Rate')).not.toBeInTheDocument();
});
it('calls connectSerial with baud rate on connect', () => {
const connectSerial = vi.fn(async () => {});
render(<SettingsLoopbackSection loopback={makeLoopback({ connectSerial })} />);
fireEvent.click(screen.getByRole('button', { name: 'Connect via Loopback' }));
expect(connectSerial).toHaveBeenCalledWith(115200);
});
it('calls connectBluetooth when BLE selected and connect clicked', () => {
const connectBluetooth = vi.fn(async () => {});
render(<SettingsLoopbackSection loopback={makeLoopback({ connectBluetooth })} />);
fireEvent.click(screen.getByText('Bluetooth'));
fireEvent.click(screen.getByRole('button', { name: 'Connect via Loopback' }));
expect(connectBluetooth).toHaveBeenCalled();
});
it('shows connected state with disconnect button', () => {
render(
<SettingsLoopbackSection
loopback={makeLoopback({ status: 'connected', transportType: 'serial' })}
/>
);
expect(screen.getByText(/Connected via Serial/i)).toBeInTheDocument();
expect(screen.getByRole('button', { name: 'Disconnect Loopback' })).toBeInTheDocument();
expect(screen.queryByRole('button', { name: 'Connect via Loopback' })).not.toBeInTheDocument();
});
it('calls disconnect on disconnect button click', () => {
const disconnect = vi.fn();
render(
<SettingsLoopbackSection
loopback={makeLoopback({ status: 'connected', transportType: 'serial', disconnect })}
/>
);
fireEvent.click(screen.getByRole('button', { name: 'Disconnect Loopback' }));
expect(disconnect).toHaveBeenCalled();
});
it('shows connecting state', () => {
render(<SettingsLoopbackSection loopback={makeLoopback({ status: 'connecting' })} />);
expect(screen.getByRole('button', { name: 'Connecting...' })).toBeDisabled();
});
it('shows error message', () => {
render(
<SettingsLoopbackSection loopback={makeLoopback({ status: 'error', error: 'Port failed' })} />
);
expect(screen.getByText('Port failed')).toBeInTheDocument();
});
it('shows warning when neither serial nor bluetooth available', () => {
render(
<SettingsLoopbackSection
loopback={makeLoopback({ serialAvailable: false, bluetoothAvailable: false })}
/>
);
expect(screen.getByText(/does not support Web Serial or Web Bluetooth/)).toBeInTheDocument();
// Connect button should not appear
expect(screen.queryByRole('button', { name: 'Connect via Loopback' })).not.toBeInTheDocument();
});
it('disables serial button when serial not available', () => {
render(<SettingsLoopbackSection loopback={makeLoopback({ serialAvailable: false })} />);
expect(screen.getByText('Serial')).toBeDisabled();
expect(screen.getByText(/Web Serial not available/)).toBeInTheDocument();
});
it('disables bluetooth button when bluetooth not available', () => {
render(<SettingsLoopbackSection loopback={makeLoopback({ bluetoothAvailable: false })} />);
expect(screen.getByText('Bluetooth')).toBeDisabled();
expect(screen.getByText(/Web Bluetooth not available/)).toBeInTheDocument();
});
it('defaults to BLE when serial is not available', () => {
render(<SettingsLoopbackSection loopback={makeLoopback({ serialAvailable: false })} />);
// BLE should be selected, so baud rate should NOT be visible
expect(screen.queryByLabelText('Baud Rate')).not.toBeInTheDocument();
});
});

View File

@@ -39,6 +39,7 @@ const baseHealth: HealthStatus = {
database_size_mb: 1.2,
oldest_undecrypted_timestamp: null,
mqtt_status: null,
loopback_eligible: false,
};
const baseSettings: AppSettings = {

View File

@@ -30,6 +30,7 @@ export interface HealthStatus {
database_size_mb: number;
oldest_undecrypted_timestamp: number | null;
mqtt_status: string | null;
loopback_eligible: boolean;
}
export interface MaintenanceResult {

View File

@@ -0,0 +1,81 @@
// Type declarations for Web Serial API and Web Bluetooth API
// These APIs are only available in Chrome/Edge and require secure context.
// --- Web Serial API ---
interface SerialPortRequestOptions {
filters?: SerialPortFilter[];
}
interface SerialPortFilter {
usbVendorId?: number;
usbProductId?: number;
}
interface SerialOptions {
baudRate: number;
dataBits?: number;
stopBits?: number;
parity?: 'none' | 'even' | 'odd';
bufferSize?: number;
flowControl?: 'none' | 'hardware';
}
interface SerialPort {
readable: ReadableStream<Uint8Array> | null;
writable: WritableStream<Uint8Array> | null;
open(options: SerialOptions): Promise<void>;
close(): Promise<void>;
setSignals(signals: { requestToSend?: boolean; dataTerminalReady?: boolean }): Promise<void>;
}
interface Serial {
requestPort(options?: SerialPortRequestOptions): Promise<SerialPort>;
}
// --- Web Bluetooth API ---
interface BluetoothRequestDeviceFilter {
namePrefix?: string;
name?: string;
services?: BluetoothServiceUUID[];
}
type BluetoothServiceUUID = string | number;
interface RequestDeviceOptions {
filters?: BluetoothRequestDeviceFilter[];
optionalServices?: BluetoothServiceUUID[];
acceptAllDevices?: boolean;
}
interface BluetoothRemoteGATTCharacteristic extends EventTarget {
value: DataView | null;
startNotifications(): Promise<BluetoothRemoteGATTCharacteristic>;
writeValueWithResponse(value: BufferSource): Promise<void>;
}
interface BluetoothRemoteGATTService {
getCharacteristic(uuid: string): Promise<BluetoothRemoteGATTCharacteristic>;
}
interface BluetoothRemoteGATTServer {
connected: boolean;
connect(): Promise<BluetoothRemoteGATTServer>;
disconnect(): void;
getPrimaryService(uuid: string): Promise<BluetoothRemoteGATTService>;
}
interface BluetoothDevice extends EventTarget {
gatt?: BluetoothRemoteGATTServer;
}
interface Bluetooth {
requestDevice(options: RequestDeviceOptions): Promise<BluetoothDevice>;
}
// Extend Navigator interface
interface Navigator {
serial?: Serial;
bluetooth?: Bluetooth;
}

426
tests/test_loopback.py Normal file
View File

@@ -0,0 +1,426 @@
"""Tests for loopback transport and WebSocket endpoint."""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from starlette.websockets import WebSocketState
# ---------------------------------------------------------------------------
# LoopbackTransport unit tests
# ---------------------------------------------------------------------------
class TestLoopbackTransportFraming:
"""Serial framing round-trip tests (0x3c + 2-byte LE size)."""
def _make_transport(self, mode="serial"):
from app.loopback import LoopbackTransport
ws = MagicMock()
ws.client_state = WebSocketState.CONNECTED
ws.send_bytes = AsyncMock()
ws.send_json = AsyncMock()
return LoopbackTransport(ws, mode), ws
@pytest.mark.asyncio
async def test_send_serial_adds_framing(self):
"""send() in serial mode prepends 0x3c + 2-byte LE size."""
transport, ws = self._make_transport("serial")
payload = b"\x01\x02\x03\x04\x05"
await transport.send(payload)
expected = b"\x3c\x05\x00\x01\x02\x03\x04\x05"
ws.send_bytes.assert_awaited_once_with(expected)
@pytest.mark.asyncio
async def test_send_ble_raw(self):
"""send() in BLE mode sends raw bytes (no framing)."""
transport, ws = self._make_transport("ble")
payload = b"\x01\x02\x03"
await transport.send(payload)
ws.send_bytes.assert_awaited_once_with(payload)
def test_handle_rx_serial_strips_framing(self):
"""handle_rx in serial mode strips 0x3c header and delivers payload."""
transport, _ = self._make_transport("serial")
reader = MagicMock()
reader.handle_rx = AsyncMock()
transport.set_reader(reader)
payload = b"\xaa\xbb\xcc"
# Build framed data: 0x3c + 2-byte LE size + payload
framed = b"\x3c" + len(payload).to_bytes(2, "little") + payload
with patch("app.loopback.asyncio.create_task") as mock_task:
transport.handle_rx(framed)
# reader.handle_rx should be called with the payload only
mock_task.assert_called_once()
assert reader.handle_rx.call_count == 1
assert reader.handle_rx.call_args[0][0] == payload
def test_handle_rx_serial_incremental(self):
"""handle_rx in serial mode handles data arriving byte by byte."""
transport, _ = self._make_transport("serial")
reader = MagicMock()
reader.handle_rx = AsyncMock()
transport.set_reader(reader)
payload = b"\x01\x02"
framed = b"\x3c" + len(payload).to_bytes(2, "little") + payload
with patch("app.loopback.asyncio.create_task") as mock_task:
# Feed one byte at a time
for byte in framed:
transport.handle_rx(bytes([byte]))
mock_task.assert_called_once()
assert reader.handle_rx.call_args[0][0] == payload
def test_handle_rx_serial_multiple_frames(self):
"""handle_rx handles two frames concatenated in one chunk."""
transport, _ = self._make_transport("serial")
reader = MagicMock()
reader.handle_rx = AsyncMock()
transport.set_reader(reader)
p1 = b"\x01\x02"
p2 = b"\x03\x04\x05"
framed = (
b"\x3c"
+ len(p1).to_bytes(2, "little")
+ p1
+ b"\x3c"
+ len(p2).to_bytes(2, "little")
+ p2
)
with patch("app.loopback.asyncio.create_task") as mock_task:
transport.handle_rx(framed)
assert mock_task.call_count == 2
assert reader.handle_rx.call_args_list[0][0][0] == p1
assert reader.handle_rx.call_args_list[1][0][0] == p2
def test_handle_rx_ble_passthrough(self):
"""handle_rx in BLE mode passes raw bytes to reader directly."""
transport, _ = self._make_transport("ble")
reader = MagicMock()
reader.handle_rx = AsyncMock()
transport.set_reader(reader)
data = b"\xde\xad\xbe\xef"
with patch("app.loopback.asyncio.create_task") as mock_task:
transport.handle_rx(data)
mock_task.assert_called_once()
assert reader.handle_rx.call_args[0][0] == data
@pytest.mark.asyncio
async def test_connect_returns_info_string(self):
"""connect() returns a descriptive string."""
transport, _ = self._make_transport("serial")
result = await transport.connect()
assert "Loopback" in result
assert "serial" in result
@pytest.mark.asyncio
async def test_disconnect_sends_json(self):
"""disconnect() sends a disconnect JSON message."""
transport, ws = self._make_transport("serial")
await transport.disconnect()
ws.send_json.assert_awaited_once_with({"type": "disconnect"})
@pytest.mark.asyncio
async def test_disconnect_handles_closed_ws(self):
"""disconnect() does not raise if WS is already closed."""
transport, ws = self._make_transport("serial")
ws.client_state = WebSocketState.DISCONNECTED
# Should not raise
await transport.disconnect()
ws.send_json.assert_not_awaited()
@pytest.mark.asyncio
async def test_send_noop_when_ws_closed(self):
"""send() does nothing when WebSocket is not connected."""
transport, ws = self._make_transport("serial")
ws.client_state = WebSocketState.DISCONNECTED
await transport.send(b"\x01\x02")
ws.send_bytes.assert_not_awaited()
def test_set_reader_and_callback(self):
"""set_reader and set_disconnect_callback store references."""
transport, _ = self._make_transport("serial")
reader = MagicMock()
callback = MagicMock()
transport.set_reader(reader)
transport.set_disconnect_callback(callback)
assert transport._reader is reader
assert transport._disconnect_callback is callback
def test_reset_framing(self):
"""reset_framing clears the state machine."""
transport, _ = self._make_transport("serial")
transport._frame_started = True
transport._header = b"\x3c\x05"
transport._inframe = b"\x01\x02"
transport._frame_size = 5
transport.reset_framing()
assert transport._frame_started is False
assert transport._header == b""
assert transport._inframe == b""
assert transport._frame_size == 0
# ---------------------------------------------------------------------------
# RadioManager loopback methods
# ---------------------------------------------------------------------------
class TestRadioManagerLoopback:
"""Tests for connect_loopback / disconnect_loopback state transitions."""
def test_connect_loopback_sets_state(self):
from app.radio import RadioManager
rm = RadioManager()
mc = MagicMock()
rm.connect_loopback(mc, "Loopback (serial)")
assert rm.meshcore is mc
assert rm.connection_info == "Loopback (serial)"
assert rm.loopback_active is True
assert rm._last_connected is True
assert rm._setup_complete is False
@pytest.mark.asyncio
async def test_disconnect_loopback_clears_state(self):
from app.radio import RadioManager
rm = RadioManager()
mc = MagicMock()
mc.disconnect = AsyncMock()
rm.connect_loopback(mc, "Loopback (serial)")
with patch("app.websocket.broadcast_health"):
await rm.disconnect_loopback()
assert rm.meshcore is None
assert rm.connection_info is None
assert rm.loopback_active is False
mc.disconnect.assert_awaited_once()
@pytest.mark.asyncio
async def test_disconnect_loopback_handles_mc_disconnect_error(self):
"""disconnect_loopback doesn't raise if mc.disconnect() fails."""
from app.radio import RadioManager
rm = RadioManager()
mc = MagicMock()
mc.disconnect = AsyncMock(side_effect=OSError("transport closed"))
rm.connect_loopback(mc, "Loopback (ble)")
with patch("app.websocket.broadcast_health"):
# Should not raise
await rm.disconnect_loopback()
assert rm.loopback_active is False
@pytest.mark.asyncio
async def test_disconnect_loopback_broadcasts_health_false(self):
from app.radio import RadioManager
rm = RadioManager()
mc = MagicMock()
mc.disconnect = AsyncMock()
rm.connect_loopback(mc, "Loopback (serial)")
with patch("app.websocket.broadcast_health") as mock_bh:
await rm.disconnect_loopback()
mock_bh.assert_called_once_with(False, None)
@pytest.mark.asyncio
async def test_monitor_skips_reconnect_during_loopback(self):
"""Connection monitor skips auto-detect when _loopback_active is True."""
from app.radio import RadioManager
rm = RadioManager()
mc = MagicMock()
mc.is_connected = True
rm.connect_loopback(mc, "Loopback (serial)")
rm.reconnect = AsyncMock()
sleep_count = 0
async def _sleep(_seconds: float):
nonlocal sleep_count
sleep_count += 1
if sleep_count >= 3:
raise asyncio.CancelledError()
with patch("app.radio.asyncio.sleep", side_effect=_sleep):
await rm.start_connection_monitor()
try:
await rm._reconnect_task
finally:
await rm.stop_connection_monitor()
# reconnect should never be called while loopback is active
rm.reconnect.assert_not_awaited()
# ---------------------------------------------------------------------------
# WebSocket endpoint tests
# ---------------------------------------------------------------------------
class TestLoopbackEndpointGuards:
"""Tests for the /ws/transport WebSocket endpoint guard conditions."""
def test_rejects_when_explicit_transport_configured(self):
"""Endpoint closes when explicit transport env is set."""
from fastapi.testclient import TestClient
from app.main import app
with (
patch("app.routers.loopback.settings") as mock_settings,
patch("app.routers.loopback.radio_manager"),
):
mock_settings.loopback_eligible = False
client = TestClient(app)
# Endpoint accepts then immediately closes — verify by catching the close
with client.websocket_connect("/api/ws/transport") as ws:
closed = False
try:
ws.receive_text()
except Exception: # noqa: BLE001
closed = True
assert closed
def test_rejects_when_radio_already_connected(self):
"""Endpoint closes when radio is already connected."""
from fastapi.testclient import TestClient
from app.main import app
with (
patch("app.routers.loopback.settings") as mock_settings,
patch("app.routers.loopback.radio_manager") as mock_rm,
):
mock_settings.loopback_eligible = True
mock_rm.is_connected = True
client = TestClient(app)
with client.websocket_connect("/api/ws/transport") as ws:
closed = False
try:
ws.receive_text()
except Exception: # noqa: BLE001
closed = True
assert closed
class TestLoopbackEndpointInit:
"""Tests for the init handshake and basic operation."""
def test_rejects_invalid_init_message(self):
"""Endpoint closes on invalid init JSON."""
from fastapi.testclient import TestClient
from app.main import app
with (
patch("app.routers.loopback.settings") as mock_settings,
patch("app.routers.loopback.radio_manager") as mock_rm,
):
mock_settings.loopback_eligible = True
mock_rm.is_connected = False
mock_rm.disconnect_loopback = AsyncMock()
client = TestClient(app)
with client.websocket_connect("/api/ws/transport") as ws:
ws.send_text(json.dumps({"type": "init", "mode": "invalid"}))
closed = False
try:
ws.receive_text()
except Exception: # noqa: BLE001
closed = True
assert closed
def test_accepts_valid_serial_init(self):
"""Endpoint accepts valid serial init and proceeds to MeshCore creation."""
mock_mc = MagicMock()
mock_mc.is_connected = True
mock_mc.connect = AsyncMock()
with (
patch("app.routers.loopback.settings") as mock_settings,
patch("app.routers.loopback.radio_manager") as mock_rm,
patch("meshcore.MeshCore", return_value=mock_mc) as mock_mc_cls,
):
mock_settings.loopback_eligible = True
mock_rm.is_connected = False
mock_rm.connect_loopback = MagicMock()
mock_rm.post_connect_setup = AsyncMock()
mock_rm.disconnect_loopback = AsyncMock()
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
with client.websocket_connect("/api/ws/transport") as ws:
ws.send_text(json.dumps({"type": "init", "mode": "serial"}))
# Send disconnect to close cleanly
ws.send_text(json.dumps({"type": "disconnect"}))
# Verify MeshCore was created and connect_loopback called
mock_mc_cls.assert_called_once()
mock_mc.connect.assert_awaited_once()
mock_rm.connect_loopback.assert_called_once()
mock_rm.disconnect_loopback.assert_awaited_once()
# ---------------------------------------------------------------------------
# Config loopback_eligible property
# ---------------------------------------------------------------------------
class TestConfigLoopbackEligible:
"""Tests for the loopback_eligible property."""
def test_eligible_when_no_transport_set(self):
from app.config import Settings
s = Settings(serial_port="", tcp_host="", ble_address="")
assert s.loopback_eligible is True
def test_not_eligible_with_serial_port(self):
from app.config import Settings
s = Settings(serial_port="/dev/ttyUSB0")
assert s.loopback_eligible is False
def test_not_eligible_with_tcp_host(self):
from app.config import Settings
s = Settings(tcp_host="192.168.1.1")
assert s.loopback_eligible is False
def test_not_eligible_with_ble(self):
from app.config import Settings
s = Settings(ble_address="AA:BB:CC:DD:EE:FF", ble_pin="1234")
assert s.loopback_eligible is False