Initial mqtt implementation

This commit is contained in:
Jack Kingsman
2026-02-28 21:24:46 -08:00
parent 727ac913de
commit f993110ec4
23 changed files with 1462 additions and 27 deletions

View File

@@ -91,7 +91,7 @@ A web interface for MeshCore mesh radio networks. The backend connects to a Mesh
1. **Store-and-serve**: Backend stores all packets even when no client is connected
2. **Parallel storage**: Messages stored both decrypted (when possible) and as raw packets
3. **Extended capacity**: Server stores contacts/channels beyond radio limits (~350 contacts, ~40 channels)
4. **Real-time updates**: WebSocket pushes events; REST for actions
4. **Real-time updates**: WebSocket pushes events; REST for actions; optional MQTT forwarding
5. **Offline-capable**: Radio operates independently; server syncs when connected
6. **Auto-reconnect**: Background monitor detects disconnection and attempts reconnection
@@ -126,7 +126,7 @@ To improve repeater disambiguation in the network visualizer, the backend stores
1. Radio receives raw bytes → `packet_processor.py` parses, decrypts, deduplicates, and stores in database (primary path via `RX_LOG_DATA` event)
2. `event_handlers.py` handles higher-level events (`CONTACT_MSG_RECV`, `ACK`) as a fallback/supplement
3. `ws_manager` broadcasts to connected clients
3. `broadcast_event()` in `websocket.py` fans out to both WebSocket clients and MQTT
4. Frontend `useWebSocket` receives → updates React state
### Outgoing Messages
@@ -158,7 +158,8 @@ This message-layer echo/path handling is independent of raw-packet storage dedup
│ ├── repository.py # Database CRUD
│ ├── event_handlers.py # Radio events
│ ├── decoder.py # Packet decryption
── websocket.py # Real-time broadcasts
── websocket.py # Real-time broadcasts
│ └── mqtt.py # Optional MQTT publisher
├── frontend/ # React frontend
│ ├── AGENTS.md # Frontend documentation
│ ├── src/
@@ -354,6 +355,23 @@ Read state (`last_read_at`) is tracked **server-side** for consistency across de
**Note:** These are NOT the same as `Message.conversation_key` (the database field).
### MQTT Publishing
Optional MQTT integration forwards mesh events to an external broker for home automation, logging, or alerting. All MQTT config is stored in the database (`app_settings`), not env vars — configured from the Settings pane, no server restart needed.
**Two independent toggles**: publish decrypted messages, publish raw packets.
**Topic structure** (default prefix `meshcore`):
- `meshcore/dm:<contact_public_key>` — decrypted DM
- `meshcore/gm:<channel_key>` — decrypted channel message
- `meshcore/raw/dm:<contact_key>` — raw packet attributed to a DM contact
- `meshcore/raw/gm:<channel_key>` — raw packet attributed to a channel
- `meshcore/raw/unrouted` — raw packets that couldn't be attributed
**Architecture**: `broadcast_event()` in `websocket.py` calls `mqtt_broadcast()` — a single hook covering all message and raw_packet broadcasts. The `MqttPublisher` in `app/mqtt.py` manages a background connection loop with auto-reconnect and backoff. Publishes are fire-and-forget (silent drop if disconnected). Connection state changes trigger toasts via `broadcast_error`/`broadcast_success`. The health endpoint includes `mqtt_status`.
**Security**: MQTT password stored in plaintext in SQLite, consistent with the project's trusted-network design.
### Server-Side Decryption
The server can decrypt packets using stored keys, both in real-time and for historical packets.
@@ -395,7 +413,7 @@ mc.subscribe(EventType.ACK, handler)
| `MESHCORE_LOG_LEVEL` | `INFO` | Logging level (`DEBUG`/`INFO`/`WARNING`/`ERROR`) |
| `MESHCORE_DATABASE_PATH` | `data/meshcore.db` | SQLite database location |
**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, and `bots`. They are configured via `GET/PATCH /api/settings` (and related settings endpoints).
**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `bots`, and all MQTT configuration (`mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password`, `mqtt_use_tls`, `mqtt_tls_insecure`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets`). They are configured via `GET/PATCH /api/settings` (and related settings endpoints).
Byte-perfect channel retries are user-triggered via `POST /api/messages/channel/{message_id}/resend` and are allowed for 30 seconds after the original send.

View File

@@ -8,6 +8,7 @@ Backend server + browser interface for MeshCore mesh radio networks. Connect you
* Monitor unlimited contacts and channels (radio limits don't apply -- packets are decrypted server-side)
* Access your radio remotely over your network or VPN
* Search for hashtag room names for channels you don't have keys for yet
* Forward decrypted packets to MQTT brokers
* Visualize the mesh as a map or node set, view repeater stats, and more!
**Warning:** This app has no auth, and is for trusted environments only. _Do not put this on an untrusted network, or open it to the public._ The bots can execute arbitrary Python code which means anyone on your network can, too. If you need access control, consider using a reverse proxy like Nginx, or extending FastAPI; access control and user management are outside the scope of this app.

View File

@@ -27,6 +27,7 @@ app/
├── packet_processor.py # Raw packet pipeline, dedup, path handling
├── event_handlers.py # MeshCore event subscriptions and ACK tracking
├── websocket.py # WS manager + broadcast helpers
├── mqtt.py # Optional MQTT publisher (fire-and-forget forwarding)
├── bot.py # Bot execution and outbound bot sends
├── dependencies.py # Shared FastAPI dependency providers
├── keystore.py # Ephemeral private/public key storage for DM decryption
@@ -99,6 +100,19 @@ app/
- `0` means disabled.
- Last send time tracked in `app_settings.last_advert_time`.
### MQTT publishing
- Optional forwarding of mesh events to an external MQTT broker.
- All config in `app_settings` (not env vars): `mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password`, `mqtt_use_tls`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets`.
- Disabled when `mqtt_broker_host` is empty.
- `broadcast_event()` in `websocket.py` calls `mqtt_broadcast()` — single hook covers all message and raw_packet events.
- `MqttPublisher` (`app/mqtt.py`) runs a background connection loop with auto-reconnect and exponential backoff (5s → 30s).
- Publishes are fire-and-forget; individual publish failures logged but not surfaced to users.
- Connection state changes surface via `broadcast_error`/`broadcast_success` toasts.
- Health endpoint includes `mqtt_status` field (`connected`, `disconnected`, `disabled`).
- Settings changes trigger `mqtt_publisher.restart()` — no server restart needed.
- Topics: `{prefix}/dm:{key}`, `{prefix}/gm:{key}`, `{prefix}/raw/dm:{key}`, `{prefix}/raw/gm:{key}`, `{prefix}/raw/unrouted`.
## API Surface (all under `/api`)
### Health
@@ -204,6 +218,8 @@ Main tables:
- `advert_interval`
- `last_advert_time`
- `bots`
- `mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password`
- `mqtt_use_tls`, `mqtt_tls_insecure`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets`
## Security Posture (intentional)
@@ -239,6 +255,7 @@ tests/
├── test_message_pagination.py # Cursor-based message pagination
├── test_message_prefix_claim.py # Message prefix claim logic
├── test_migrations.py # Schema migration system
├── test_mqtt.py # MQTT publisher topic routing and lifecycle
├── test_packet_pipeline.py # End-to-end packet processing
├── test_radio.py # RadioManager, serial detection
├── test_radio_operation.py # radio_operation() context manager

View File

@@ -56,9 +56,20 @@ async def lifespan(app: FastAPI):
# Always start connection monitor (even if initial connection failed)
await radio_manager.start_connection_monitor()
# Start MQTT publisher if configured
from app.mqtt import mqtt_publisher
from app.repository import AppSettingsRepository
try:
mqtt_settings = await AppSettingsRepository.get()
await mqtt_publisher.start(mqtt_settings)
except Exception as e:
logger.warning("Failed to start MQTT publisher: %s", e)
yield
logger.info("Shutting down")
await mqtt_publisher.stop()
await radio_manager.stop_connection_monitor()
await stop_message_polling()
await stop_periodic_advert()

View File

@@ -247,6 +247,13 @@ async def run_migrations(conn: aiosqlite.Connection) -> int:
await set_version(conn, 30)
applied += 1
# Migration 31: Add MQTT configuration columns to app_settings
if version < 31:
logger.info("Applying migration 31: add MQTT columns to app_settings")
await _migrate_031_add_mqtt_columns(conn)
await set_version(conn, 31)
applied += 1
if applied > 0:
logger.info(
"Applied %d migration(s), schema now at version %d", applied, await get_version(conn)
@@ -1852,3 +1859,35 @@ async def _migrate_030_add_pagination_index(conn: aiosqlite.Connection) -> None:
)
await conn.execute("DROP INDEX IF EXISTS idx_messages_conversation")
await conn.commit()
async def _migrate_031_add_mqtt_columns(conn: aiosqlite.Connection) -> None:
"""Add MQTT configuration columns to app_settings."""
# Guard: app_settings may not exist in partial-schema test setups
cursor = await conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='app_settings'"
)
if not await cursor.fetchone():
await conn.commit()
return
cursor = await conn.execute("PRAGMA table_info(app_settings)")
columns = {row[1] for row in await cursor.fetchall()}
new_columns = [
("mqtt_broker_host", "TEXT DEFAULT ''"),
("mqtt_broker_port", "INTEGER DEFAULT 1883"),
("mqtt_username", "TEXT DEFAULT ''"),
("mqtt_password", "TEXT DEFAULT ''"),
("mqtt_use_tls", "INTEGER DEFAULT 0"),
("mqtt_tls_insecure", "INTEGER DEFAULT 0"),
("mqtt_topic_prefix", "TEXT DEFAULT 'meshcore'"),
("mqtt_publish_messages", "INTEGER DEFAULT 0"),
("mqtt_publish_raw_packets", "INTEGER DEFAULT 0"),
]
for col_name, col_def in new_columns:
if col_name not in columns:
await conn.execute(f"ALTER TABLE app_settings ADD COLUMN {col_name} {col_def}")
await conn.commit()

View File

@@ -173,6 +173,8 @@ class RawPacketDecryptedInfo(BaseModel):
channel_name: str | None = None
sender: str | None = None
channel_key: str | None = None
contact_key: str | None = None
class RawPacketBroadcast(BaseModel):
@@ -425,6 +427,42 @@ class AppSettings(BaseModel):
default_factory=list,
description="List of bot configurations",
)
mqtt_broker_host: str = Field(
default="",
description="MQTT broker hostname (empty = disabled)",
)
mqtt_broker_port: int = Field(
default=1883,
description="MQTT broker port",
)
mqtt_username: str = Field(
default="",
description="MQTT username (optional)",
)
mqtt_password: str = Field(
default="",
description="MQTT password (optional)",
)
mqtt_use_tls: bool = Field(
default=False,
description="Whether to use TLS for MQTT connection",
)
mqtt_tls_insecure: bool = Field(
default=False,
description="Skip TLS certificate verification (for self-signed certs)",
)
mqtt_topic_prefix: str = Field(
default="meshcore",
description="MQTT topic prefix",
)
mqtt_publish_messages: bool = Field(
default=False,
description="Whether to publish decrypted messages to MQTT",
)
mqtt_publish_raw_packets: bool = Field(
default=False,
description="Whether to publish raw packets to MQTT",
)
class BusyChannel(BaseModel):

223
app/mqtt.py Normal file
View File

@@ -0,0 +1,223 @@
"""MQTT publisher for forwarding mesh network events to an MQTT broker."""
from __future__ import annotations
import asyncio
import json
import logging
import ssl
from typing import Any
import aiomqtt
from app.models import AppSettings
logger = logging.getLogger(__name__)
# Reconnect backoff: start at 5s, cap at 30s
_BACKOFF_MIN = 5
_BACKOFF_MAX = 30
class MqttPublisher:
"""Manages an MQTT connection and publishes mesh network events."""
def __init__(self) -> None:
self._client: aiomqtt.Client | None = None
self._task: asyncio.Task[None] | None = None
self._settings: AppSettings | None = None
self._settings_version: int = 0
self._version_event: asyncio.Event = asyncio.Event()
self.connected: bool = False
async def start(self, settings: AppSettings) -> None:
"""Start the background connection loop."""
self._settings = settings
self._settings_version += 1
self._version_event.set()
if self._task is None or self._task.done():
self._task = asyncio.create_task(self._connection_loop())
async def stop(self) -> None:
"""Cancel the background task and disconnect."""
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
self._client = None
self.connected = False
async def restart(self, settings: AppSettings) -> None:
"""Called when MQTT settings change — stop + start."""
await self.stop()
await self.start(settings)
async def publish(self, topic: str, payload: dict[str, Any]) -> None:
"""Publish a JSON payload. Drops silently if not connected."""
if self._client is None or not self.connected:
return
try:
await self._client.publish(topic, json.dumps(payload))
except Exception as e:
logger.warning("MQTT publish failed on %s: %s", topic, e)
self.connected = False
# Wake the connection loop so it exits the wait and reconnects
self._settings_version += 1
self._version_event.set()
def _mqtt_configured(self) -> bool:
"""Check if MQTT is configured (broker host is set)."""
return bool(self._settings and self._settings.mqtt_broker_host)
async def _connection_loop(self) -> None:
"""Background loop: connect, wait, reconnect on failure."""
from app.websocket import broadcast_error, broadcast_success
backoff = _BACKOFF_MIN
while True:
if not self._mqtt_configured():
self.connected = False
self._client = None
# Wait until settings change (which might configure MQTT)
self._version_event.clear()
try:
await self._version_event.wait()
except asyncio.CancelledError:
return
continue
settings = self._settings
assert settings is not None # guaranteed by _mqtt_configured()
version_at_connect = self._settings_version
try:
tls_context = self._build_tls_context(settings)
async with aiomqtt.Client(
hostname=settings.mqtt_broker_host,
port=settings.mqtt_broker_port,
username=settings.mqtt_username or None,
password=settings.mqtt_password or None,
tls_context=tls_context,
) as client:
self._client = client
self.connected = True
backoff = _BACKOFF_MIN
broadcast_success(
"MQTT connected",
f"{settings.mqtt_broker_host}:{settings.mqtt_broker_port}",
)
_broadcast_mqtt_health()
# Wait until cancelled or settings version changes.
# The 60s timeout is a housekeeping wake-up; actual connection
# liveness is handled by paho-mqtt's keepalive mechanism.
while self._settings_version == version_at_connect:
self._version_event.clear()
try:
await asyncio.wait_for(self._version_event.wait(), timeout=60)
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
self.connected = False
self._client = None
return
except Exception as e:
self.connected = False
self._client = None
broadcast_error(
"MQTT connection failure",
"Please correct the settings or disable.",
)
_broadcast_mqtt_health()
logger.warning("MQTT connection error: %s (reconnecting in %ds)", e, backoff)
try:
await asyncio.sleep(backoff)
except asyncio.CancelledError:
return
backoff = min(backoff * 2, _BACKOFF_MAX)
@staticmethod
def _build_tls_context(settings: AppSettings) -> ssl.SSLContext | None:
"""Build TLS context from settings, or None if TLS is disabled."""
if not settings.mqtt_use_tls:
return None
ctx = ssl.create_default_context()
if settings.mqtt_tls_insecure:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
# Module-level singleton
mqtt_publisher = MqttPublisher()
def _broadcast_mqtt_health() -> None:
"""Push updated health (including mqtt_status) to all WS clients."""
from app.radio import radio_manager
from app.websocket import broadcast_health
broadcast_health(radio_manager.is_connected, radio_manager.connection_info)
def mqtt_broadcast(event_type: str, data: dict[str, Any]) -> None:
"""Fire-and-forget MQTT publish, matching broadcast_event's pattern."""
if event_type not in ("message", "raw_packet"):
return
if not mqtt_publisher.connected or mqtt_publisher._settings is None:
return
asyncio.create_task(_mqtt_maybe_publish(event_type, data))
async def _mqtt_maybe_publish(event_type: str, data: dict[str, Any]) -> None:
"""Check settings and build topic, then publish."""
settings = mqtt_publisher._settings
if settings is None:
return
try:
if event_type == "message" and settings.mqtt_publish_messages:
topic = _build_message_topic(settings.mqtt_topic_prefix, data)
await mqtt_publisher.publish(topic, data)
elif event_type == "raw_packet" and settings.mqtt_publish_raw_packets:
topic = _build_raw_packet_topic(settings.mqtt_topic_prefix, data)
await mqtt_publisher.publish(topic, data)
except Exception as e:
logger.warning("MQTT broadcast error: %s", e)
def _build_message_topic(prefix: str, data: dict[str, Any]) -> str:
"""Build MQTT topic for a decrypted message."""
msg_type = data.get("type", "")
conversation_key = data.get("conversation_key", "unknown")
if msg_type == "PRIV":
return f"{prefix}/dm:{conversation_key}"
elif msg_type == "CHAN":
return f"{prefix}/gm:{conversation_key}"
return f"{prefix}/message:{conversation_key}"
def _build_raw_packet_topic(prefix: str, data: dict[str, Any]) -> str:
"""Build MQTT topic for a raw packet."""
info = data.get("decrypted_info")
if info and isinstance(info, dict):
contact_key = info.get("contact_key")
channel_key = info.get("channel_key")
if contact_key:
return f"{prefix}/raw/dm:{contact_key}"
if channel_key:
return f"{prefix}/raw/gm:{channel_key}"
return f"{prefix}/raw/unrouted"

View File

@@ -576,6 +576,8 @@ async def process_raw_packet(
decrypted_info=RawPacketDecryptedInfo(
channel_name=result["channel_name"],
sender=result["sender"],
channel_key=result.get("channel_key"),
contact_key=result.get("contact_key"),
)
if result["decrypted"]
else None,
@@ -632,6 +634,7 @@ async def _process_group_text(
"channel_name": channel.name,
"sender": decrypted.sender,
"message_id": msg_id, # None if duplicate, msg_id if new
"channel_key": channel.key,
}
# Couldn't decrypt with any known key
@@ -888,6 +891,7 @@ async def _process_direct_message(
"contact_name": contact.name,
"sender": contact.name or contact.public_key[:12],
"message_id": msg_id,
"contact_key": contact.public_key,
}
# Couldn't decrypt with any known contact

View File

@@ -26,7 +26,10 @@ class AppSettingsRepository:
"""
SELECT max_radio_contacts, favorites, auto_decrypt_dm_on_advert,
sidebar_sort_order, last_message_times, preferences_migrated,
advert_interval, last_advert_time, bots
advert_interval, last_advert_time, bots,
mqtt_broker_host, mqtt_broker_port, mqtt_username, mqtt_password,
mqtt_use_tls, mqtt_tls_insecure, mqtt_topic_prefix,
mqtt_publish_messages, mqtt_publish_raw_packets
FROM app_settings WHERE id = 1
"""
)
@@ -91,6 +94,15 @@ class AppSettingsRepository:
advert_interval=row["advert_interval"] or 0,
last_advert_time=row["last_advert_time"] or 0,
bots=bots,
mqtt_broker_host=row["mqtt_broker_host"] or "",
mqtt_broker_port=row["mqtt_broker_port"] or 1883,
mqtt_username=row["mqtt_username"] or "",
mqtt_password=row["mqtt_password"] or "",
mqtt_use_tls=bool(row["mqtt_use_tls"]),
mqtt_tls_insecure=bool(row["mqtt_tls_insecure"]),
mqtt_topic_prefix=row["mqtt_topic_prefix"] or "meshcore",
mqtt_publish_messages=bool(row["mqtt_publish_messages"]),
mqtt_publish_raw_packets=bool(row["mqtt_publish_raw_packets"]),
)
@staticmethod
@@ -104,6 +116,15 @@ class AppSettingsRepository:
advert_interval: int | None = None,
last_advert_time: int | None = None,
bots: list[BotConfig] | None = None,
mqtt_broker_host: str | None = None,
mqtt_broker_port: int | None = None,
mqtt_username: str | None = None,
mqtt_password: str | None = None,
mqtt_use_tls: bool | None = None,
mqtt_tls_insecure: bool | None = None,
mqtt_topic_prefix: str | None = None,
mqtt_publish_messages: bool | None = None,
mqtt_publish_raw_packets: bool | None = None,
) -> AppSettings:
"""Update app settings. Only provided fields are updated."""
updates = []
@@ -147,6 +168,42 @@ class AppSettingsRepository:
bots_json = json.dumps([b.model_dump() for b in bots])
params.append(bots_json)
if mqtt_broker_host is not None:
updates.append("mqtt_broker_host = ?")
params.append(mqtt_broker_host)
if mqtt_broker_port is not None:
updates.append("mqtt_broker_port = ?")
params.append(mqtt_broker_port)
if mqtt_username is not None:
updates.append("mqtt_username = ?")
params.append(mqtt_username)
if mqtt_password is not None:
updates.append("mqtt_password = ?")
params.append(mqtt_password)
if mqtt_use_tls is not None:
updates.append("mqtt_use_tls = ?")
params.append(1 if mqtt_use_tls else 0)
if mqtt_tls_insecure is not None:
updates.append("mqtt_tls_insecure = ?")
params.append(1 if mqtt_tls_insecure else 0)
if mqtt_topic_prefix is not None:
updates.append("mqtt_topic_prefix = ?")
params.append(mqtt_topic_prefix)
if mqtt_publish_messages is not None:
updates.append("mqtt_publish_messages = ?")
params.append(1 if mqtt_publish_messages else 0)
if mqtt_publish_raw_packets is not None:
updates.append("mqtt_publish_raw_packets = ?")
params.append(1 if mqtt_publish_raw_packets else 0)
if updates:
query = f"UPDATE app_settings SET {', '.join(updates)} WHERE id = 1"
await db.conn.execute(query, params)

View File

@@ -16,6 +16,7 @@ class HealthResponse(BaseModel):
connection_info: str | None
database_size_mb: float
oldest_undecrypted_timestamp: int | None
mqtt_status: str | None = None
async def build_health_data(radio_connected: bool, connection_info: str | None) -> dict:
@@ -33,12 +34,25 @@ async def build_health_data(radio_connected: bool, connection_info: str | None)
except RuntimeError:
pass # Database not connected
# MQTT status
mqtt_status: str | None = None
try:
from app.mqtt import mqtt_publisher
if mqtt_publisher._mqtt_configured():
mqtt_status = "connected" if mqtt_publisher.connected else "disconnected"
else:
mqtt_status = "disabled"
except Exception:
pass
return {
"status": "ok" if radio_connected else "degraded",
"radio_connected": radio_connected,
"connection_info": connection_info,
"database_size_mb": db_size_mb,
"oldest_undecrypted_timestamp": oldest_ts,
"mqtt_status": mqtt_status,
}

View File

@@ -59,6 +59,44 @@ class AppSettingsUpdate(BaseModel):
default=None,
description="List of bot configurations",
)
mqtt_broker_host: str | None = Field(
default=None,
description="MQTT broker hostname (empty = disabled)",
)
mqtt_broker_port: int | None = Field(
default=None,
ge=1,
le=65535,
description="MQTT broker port",
)
mqtt_username: str | None = Field(
default=None,
description="MQTT username (optional)",
)
mqtt_password: str | None = Field(
default=None,
description="MQTT password (optional)",
)
mqtt_use_tls: bool | None = Field(
default=None,
description="Whether to use TLS for MQTT connection",
)
mqtt_tls_insecure: bool | None = Field(
default=None,
description="Skip TLS certificate verification (for self-signed certs)",
)
mqtt_topic_prefix: str | None = Field(
default=None,
description="MQTT topic prefix",
)
mqtt_publish_messages: bool | None = Field(
default=None,
description="Whether to publish decrypted messages to MQTT",
)
mqtt_publish_raw_packets: bool | None = Field(
default=None,
description="Whether to publish raw packets to MQTT",
)
class FavoriteRequest(BaseModel):
@@ -124,8 +162,35 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings:
logger.info("Updating bots (count=%d)", len(update.bots))
kwargs["bots"] = update.bots
# MQTT fields
mqtt_fields = [
"mqtt_broker_host",
"mqtt_broker_port",
"mqtt_username",
"mqtt_password",
"mqtt_use_tls",
"mqtt_tls_insecure",
"mqtt_topic_prefix",
"mqtt_publish_messages",
"mqtt_publish_raw_packets",
]
mqtt_changed = False
for field in mqtt_fields:
value = getattr(update, field)
if value is not None:
kwargs[field] = value
mqtt_changed = True
if kwargs:
return await AppSettingsRepository.update(**kwargs)
result = await AppSettingsRepository.update(**kwargs)
# Restart MQTT publisher if any MQTT settings changed
if mqtt_changed:
from app.mqtt import mqtt_publisher
await mqtt_publisher.restart(result)
return result
return await AppSettingsRepository.get()

View File

@@ -96,10 +96,14 @@ def broadcast_event(event_type: str, data: dict) -> None:
"""Schedule a broadcast without blocking.
Convenience function that creates an asyncio task to broadcast
an event to all connected WebSocket clients.
an event to all connected WebSocket clients and forward to MQTT.
"""
asyncio.create_task(ws_manager.broadcast(event_type, data))
from app.mqtt import mqtt_broadcast
mqtt_broadcast(event_type, data)
def broadcast_error(message: str, details: str | None = None) -> None:
"""Broadcast an error notification to all connected clients.

View File

@@ -206,6 +206,12 @@ LocalStorage migration helpers for favorites; canonical favorites are server-sid
- `advert_interval`
- `last_advert_time`
- `bots`
- `mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password`
- `mqtt_use_tls`, `mqtt_tls_insecure`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets`
`HealthStatus` includes `mqtt_status` (`"connected"`, `"disconnected"`, `"disabled"`, or `null`).
`RawPacket.decrypted_info` includes `channel_key` and `contact_key` for MQTT topic routing.
## Contact Info Pane

View File

@@ -85,6 +85,7 @@ export function SettingsModal(props: SettingsModalProps) {
radio: !isMobile,
identity: false,
connectivity: false,
mqtt: false,
database: false,
bot: false,
statistics: false,
@@ -127,6 +128,17 @@ export function SettingsModal(props: SettingsModalProps) {
// Advertisement interval state (displayed in hours, stored as seconds in DB)
const [advertIntervalHours, setAdvertIntervalHours] = useState('0');
// MQTT state
const [mqttBrokerHost, setMqttBrokerHost] = useState('');
const [mqttBrokerPort, setMqttBrokerPort] = useState('1883');
const [mqttUsername, setMqttUsername] = useState('');
const [mqttPassword, setMqttPassword] = useState('');
const [mqttUseTls, setMqttUseTls] = useState(false);
const [mqttTlsInsecure, setMqttTlsInsecure] = useState(false);
const [mqttTopicPrefix, setMqttTopicPrefix] = useState('meshcore');
const [mqttPublishMessages, setMqttPublishMessages] = useState(false);
const [mqttPublishRawPackets, setMqttPublishRawPackets] = useState(false);
// Bot state
const DEFAULT_BOT_CODE = `def bot(
sender_name: str | None,
@@ -193,6 +205,15 @@ export function SettingsModal(props: SettingsModalProps) {
setAutoDecryptOnAdvert(appSettings.auto_decrypt_dm_on_advert);
setAdvertIntervalHours(String(Math.round(appSettings.advert_interval / 3600)));
setBots(appSettings.bots || []);
setMqttBrokerHost(appSettings.mqtt_broker_host ?? '');
setMqttBrokerPort(String(appSettings.mqtt_broker_port ?? 1883));
setMqttUsername(appSettings.mqtt_username ?? '');
setMqttPassword(appSettings.mqtt_password ?? '');
setMqttUseTls(appSettings.mqtt_use_tls ?? false);
setMqttTlsInsecure(appSettings.mqtt_tls_insecure ?? false);
setMqttTopicPrefix(appSettings.mqtt_topic_prefix ?? 'meshcore');
setMqttPublishMessages(appSettings.mqtt_publish_messages ?? false);
setMqttPublishRawPackets(appSettings.mqtt_publish_raw_packets ?? false);
}
}, [appSettings]);
@@ -409,6 +430,34 @@ export function SettingsModal(props: SettingsModalProps) {
}
};
const handleSaveMqtt = async () => {
setSectionError(null);
setBusySection('mqtt');
try {
const update: AppSettingsUpdate = {
mqtt_broker_host: mqttBrokerHost,
mqtt_broker_port: parseInt(mqttBrokerPort, 10) || 1883,
mqtt_username: mqttUsername,
mqtt_password: mqttPassword,
mqtt_use_tls: mqttUseTls,
mqtt_tls_insecure: mqttTlsInsecure,
mqtt_topic_prefix: mqttTopicPrefix || 'meshcore',
mqtt_publish_messages: mqttPublishMessages,
mqtt_publish_raw_packets: mqttPublishRawPackets,
};
await onSaveAppSettings(update);
toast.success('MQTT settings saved');
} catch (err) {
setSectionError({
section: 'mqtt',
message: err instanceof Error ? err.message : 'Failed to save',
});
} finally {
setBusySection(null);
}
};
const handleSetPrivateKey = async () => {
if (!privateKey.trim()) {
setSectionError({ section: 'identity', message: 'Private key is required' });
@@ -976,6 +1025,160 @@ export function SettingsModal(props: SettingsModalProps) {
</div>
)}
{shouldRenderSection('mqtt') && (
<div className={sectionWrapperClass}>
{renderSectionHeader('mqtt')}
{isSectionVisible('mqtt') && (
<div className={sectionContentClass}>
<div className="space-y-2">
<Label>Status</Label>
{health?.mqtt_status === 'connected' ? (
<div className="flex items-center gap-2">
<div className="w-2 h-2 rounded-full bg-green-500" />
<span className="text-sm text-green-400">Connected</span>
</div>
) : health?.mqtt_status === 'disconnected' ? (
<div className="flex items-center gap-2">
<div className="w-2 h-2 rounded-full bg-red-500" />
<span className="text-sm text-red-400">Disconnected</span>
</div>
) : (
<div className="flex items-center gap-2">
<div className="w-2 h-2 rounded-full bg-gray-500" />
<span className="text-sm text-muted-foreground">Disabled</span>
</div>
)}
</div>
<Separator />
<div className="space-y-2">
<Label htmlFor="mqtt-host">Broker Host</Label>
<Input
id="mqtt-host"
type="text"
placeholder="e.g. 192.168.1.100"
value={mqttBrokerHost}
onChange={(e) => setMqttBrokerHost(e.target.value)}
/>
</div>
<div className="space-y-2">
<Label htmlFor="mqtt-port">Broker Port</Label>
<Input
id="mqtt-port"
type="number"
min="1"
max="65535"
value={mqttBrokerPort}
onChange={(e) => setMqttBrokerPort(e.target.value)}
/>
</div>
<div className="space-y-2">
<Label htmlFor="mqtt-username">Username</Label>
<Input
id="mqtt-username"
type="text"
placeholder="Optional"
value={mqttUsername}
onChange={(e) => setMqttUsername(e.target.value)}
/>
</div>
<div className="space-y-2">
<Label htmlFor="mqtt-password">Password</Label>
<Input
id="mqtt-password"
type="password"
placeholder="Optional"
value={mqttPassword}
onChange={(e) => setMqttPassword(e.target.value)}
/>
</div>
<label className="flex items-center gap-3 cursor-pointer">
<input
type="checkbox"
checked={mqttUseTls}
onChange={(e) => setMqttUseTls(e.target.checked)}
className="h-4 w-4 rounded border-border"
/>
<span className="text-sm">Use TLS</span>
</label>
{mqttUseTls && (
<>
<label className="flex items-center gap-3 cursor-pointer ml-7">
<input
type="checkbox"
checked={mqttTlsInsecure}
onChange={(e) => setMqttTlsInsecure(e.target.checked)}
className="h-4 w-4 rounded border-border"
/>
<span className="text-sm">Skip certificate verification</span>
</label>
<p className="text-xs text-muted-foreground ml-7">
Allow self-signed or untrusted broker certificates
</p>
</>
)}
<Separator />
<div className="space-y-2">
<Label htmlFor="mqtt-prefix">Topic Prefix</Label>
<Input
id="mqtt-prefix"
type="text"
value={mqttTopicPrefix}
onChange={(e) => setMqttTopicPrefix(e.target.value)}
/>
<p className="text-xs text-muted-foreground">
Topics: {mqttTopicPrefix || 'meshcore'}/dm:&lt;key&gt;,{' '}
{mqttTopicPrefix || 'meshcore'}/gm:&lt;key&gt;, {mqttTopicPrefix || 'meshcore'}
/raw/...
</p>
</div>
<Separator />
<label className="flex items-center gap-3 cursor-pointer">
<input
type="checkbox"
checked={mqttPublishMessages}
onChange={(e) => setMqttPublishMessages(e.target.checked)}
className="h-4 w-4 rounded border-border"
/>
<span className="text-sm">Publish Messages</span>
</label>
<p className="text-xs text-muted-foreground ml-7">
Forward decrypted DM and channel messages
</p>
<label className="flex items-center gap-3 cursor-pointer">
<input
type="checkbox"
checked={mqttPublishRawPackets}
onChange={(e) => setMqttPublishRawPackets(e.target.checked)}
className="h-4 w-4 rounded border-border"
/>
<span className="text-sm">Publish Raw Packets</span>
</label>
<p className="text-xs text-muted-foreground ml-7">Forward all RF packets</p>
<Button onClick={handleSaveMqtt} disabled={isSectionBusy('mqtt')} className="w-full">
{isSectionBusy('mqtt') ? 'Saving...' : 'Save MQTT Settings'}
</Button>
{getSectionError('mqtt') && (
<div className="text-sm text-destructive">{getSectionError('mqtt')}</div>
)}
</div>
)}
</div>
)}
{shouldRenderSection('database') && (
<div className={sectionWrapperClass}>
{renderSectionHeader('database')}

View File

@@ -2,6 +2,7 @@ export type SettingsSection =
| 'radio'
| 'identity'
| 'connectivity'
| 'mqtt'
| 'database'
| 'bot'
| 'statistics';
@@ -10,6 +11,7 @@ export const SETTINGS_SECTION_ORDER: SettingsSection[] = [
'radio',
'identity',
'connectivity',
'mqtt',
'database',
'bot',
'statistics',
@@ -19,6 +21,7 @@ export const SETTINGS_SECTION_LABELS: Record<SettingsSection, string> = {
radio: '📻 Radio',
identity: '🪪 Identity',
connectivity: '📡 Connectivity',
mqtt: '📤 MQTT',
database: '🗄️ Database & Interface',
bot: '🤖 Bot',
statistics: '📊 Statistics',

View File

@@ -38,6 +38,7 @@ const baseHealth: HealthStatus = {
connection_info: 'Serial: /dev/ttyUSB0',
database_size_mb: 1.2,
oldest_undecrypted_timestamp: null,
mqtt_status: null,
};
const baseSettings: AppSettings = {
@@ -50,10 +51,20 @@ const baseSettings: AppSettings = {
advert_interval: 0,
last_advert_time: 0,
bots: [],
mqtt_broker_host: '',
mqtt_broker_port: 1883,
mqtt_username: '',
mqtt_password: '',
mqtt_use_tls: false,
mqtt_tls_insecure: false,
mqtt_topic_prefix: 'meshcore',
mqtt_publish_messages: false,
mqtt_publish_raw_packets: false,
};
function renderModal(overrides?: {
appSettings?: AppSettings;
health?: HealthStatus;
onSaveAppSettings?: (update: AppSettingsUpdate) => Promise<void>;
onRefreshAppSettings?: () => Promise<void>;
onSave?: (update: RadioConfigUpdate) => Promise<void>;
@@ -79,7 +90,7 @@ function renderModal(overrides?: {
open: overrides?.open ?? true,
pageMode: overrides?.pageMode,
config: baseConfig,
health: baseHealth,
health: overrides?.health ?? baseHealth,
appSettings: overrides?.appSettings ?? baseSettings,
onClose,
onSave,
@@ -133,6 +144,11 @@ function openConnectivitySection() {
fireEvent.click(connectivityToggle);
}
function openMqttSection() {
const mqttToggle = screen.getByRole('button', { name: /MQTT/i });
fireEvent.click(mqttToggle);
}
function openDatabaseSection() {
const databaseToggle = screen.getByRole('button', { name: /Database/i });
fireEvent.click(databaseToggle);
@@ -389,6 +405,66 @@ describe('SettingsModal', () => {
expect(screen.getByText('42 msgs')).toBeInTheDocument();
});
it('renders MQTT section with form inputs', () => {
renderModal();
openMqttSection();
expect(screen.getByLabelText('Broker Host')).toBeInTheDocument();
expect(screen.getByLabelText('Broker Port')).toBeInTheDocument();
expect(screen.getByLabelText('Username')).toBeInTheDocument();
expect(screen.getByLabelText('Password')).toBeInTheDocument();
expect(screen.getByLabelText('Topic Prefix')).toBeInTheDocument();
expect(screen.getByText('Publish Messages')).toBeInTheDocument();
expect(screen.getByText('Publish Raw Packets')).toBeInTheDocument();
});
it('saves MQTT settings through onSaveAppSettings', async () => {
const { onSaveAppSettings } = renderModal();
openMqttSection();
const hostInput = screen.getByLabelText('Broker Host');
fireEvent.change(hostInput, { target: { value: 'mqtt.example.com' } });
fireEvent.click(screen.getByRole('button', { name: 'Save MQTT Settings' }));
await waitFor(() => {
expect(onSaveAppSettings).toHaveBeenCalledWith(
expect.objectContaining({
mqtt_broker_host: 'mqtt.example.com',
mqtt_broker_port: 1883,
})
);
});
});
it('shows MQTT disabled status when mqtt_status is null', () => {
renderModal({
appSettings: {
...baseSettings,
mqtt_broker_host: 'broker.local',
},
});
openMqttSection();
expect(screen.getByText('Disabled')).toBeInTheDocument();
});
it('shows MQTT connected status badge', () => {
renderModal({
appSettings: {
...baseSettings,
mqtt_broker_host: 'broker.local',
},
health: {
...baseHealth,
mqtt_status: 'connected',
},
});
openMqttSection();
expect(screen.getByText('Connected')).toBeInTheDocument();
});
it('fetches statistics when expanded in mobile external-nav mode', async () => {
const mockStats: StatisticsResponse = {
busiest_channels_24h: [],

View File

@@ -29,6 +29,7 @@ export interface HealthStatus {
connection_info: string | null;
database_size_mb: number;
oldest_undecrypted_timestamp: number | null;
mqtt_status: string | null;
}
export interface MaintenanceResult {
@@ -155,6 +156,8 @@ export interface RawPacket {
decrypted_info: {
channel_name: string | null;
sender: string | null;
channel_key: string | null;
contact_key: string | null;
} | null;
}
@@ -180,6 +183,15 @@ export interface AppSettings {
advert_interval: number;
last_advert_time: number;
bots: BotConfig[];
mqtt_broker_host: string;
mqtt_broker_port: number;
mqtt_username: string;
mqtt_password: string;
mqtt_use_tls: boolean;
mqtt_tls_insecure: boolean;
mqtt_topic_prefix: string;
mqtt_publish_messages: boolean;
mqtt_publish_raw_packets: boolean;
}
export interface AppSettingsUpdate {
@@ -188,6 +200,15 @@ export interface AppSettingsUpdate {
sidebar_sort_order?: 'recent' | 'alpha';
advert_interval?: number;
bots?: BotConfig[];
mqtt_broker_host?: string;
mqtt_broker_port?: number;
mqtt_username?: string;
mqtt_password?: string;
mqtt_use_tls?: boolean;
mqtt_tls_insecure?: boolean;
mqtt_topic_prefix?: string;
mqtt_publish_messages?: boolean;
mqtt_publish_raw_packets?: boolean;
}
export interface MigratePreferencesRequest {

View File

@@ -12,6 +12,7 @@ dependencies = [
"pycryptodome>=3.20.0",
"pynacl>=1.5.0",
"meshcore",
"aiomqtt>=2.0",
]
[project.optional-dependencies]

View File

@@ -100,8 +100,8 @@ class TestMigration001:
# Run migrations
applied = await run_migrations(conn)
assert applied == 30 # All migrations run
assert await get_version(conn) == 30
assert applied == 31 # All migrations run
assert await get_version(conn) == 31
# Verify columns exist by inserting and selecting
await conn.execute(
@@ -183,9 +183,9 @@ class TestMigration001:
applied1 = await run_migrations(conn)
applied2 = await run_migrations(conn)
assert applied1 == 30 # All migrations run
assert applied1 == 31 # All migrations run
assert applied2 == 0 # No migrations on second run
assert await get_version(conn) == 30
assert await get_version(conn) == 31
finally:
await conn.close()
@@ -246,8 +246,8 @@ class TestMigration001:
applied = await run_migrations(conn)
# All migrations applied (version incremented) but no error
assert applied == 30
assert await get_version(conn) == 30
assert applied == 31
assert await get_version(conn) == 31
finally:
await conn.close()
@@ -376,8 +376,8 @@ class TestMigration013:
# Run migration 13 (plus 14-27 which also run)
applied = await run_migrations(conn)
assert applied == 18
assert await get_version(conn) == 30
assert applied == 19
assert await get_version(conn) == 31
# Verify bots array was created with migrated data
cursor = await conn.execute("SELECT bots FROM app_settings WHERE id = 1")
@@ -497,7 +497,7 @@ class TestMigration018:
assert await cursor.fetchone() is not None
await run_migrations(conn)
assert await get_version(conn) == 30
assert await get_version(conn) == 31
# Verify autoindex is gone
cursor = await conn.execute(
@@ -575,8 +575,8 @@ class TestMigration018:
await conn.commit()
applied = await run_migrations(conn)
assert applied == 13 # Migrations 18-30 run (18+19 skip internally)
assert await get_version(conn) == 30
assert applied == 14 # Migrations 18-31 run (18+19 skip internally)
assert await get_version(conn) == 31
finally:
await conn.close()
@@ -648,7 +648,7 @@ class TestMigration019:
assert await cursor.fetchone() is not None
await run_migrations(conn)
assert await get_version(conn) == 30
assert await get_version(conn) == 31
# Verify autoindex is gone
cursor = await conn.execute(
@@ -714,8 +714,8 @@ class TestMigration020:
assert (await cursor.fetchone())[0] == "delete"
applied = await run_migrations(conn)
assert applied == 11 # Migrations 20-30
assert await get_version(conn) == 30
assert applied == 12 # Migrations 20-31
assert await get_version(conn) == 31
# Verify WAL mode
cursor = await conn.execute("PRAGMA journal_mode")
@@ -745,7 +745,7 @@ class TestMigration020:
await set_version(conn, 20)
applied = await run_migrations(conn)
assert applied == 10 # Migrations 21-30 still run
assert applied == 11 # Migrations 21-31 still run
# Still WAL + INCREMENTAL
cursor = await conn.execute("PRAGMA journal_mode")
@@ -803,8 +803,8 @@ class TestMigration028:
await conn.commit()
applied = await run_migrations(conn)
assert applied == 3
assert await get_version(conn) == 30
assert applied == 4
assert await get_version(conn) == 31
# Verify payload_hash column is now BLOB
cursor = await conn.execute("PRAGMA table_info(raw_packets)")
@@ -873,8 +873,8 @@ class TestMigration028:
await conn.commit()
applied = await run_migrations(conn)
assert applied == 3 # Version still bumped
assert await get_version(conn) == 30
assert applied == 4 # Version still bumped
assert await get_version(conn) == 31
# Verify data unchanged
cursor = await conn.execute("SELECT payload_hash FROM raw_packets")

550
tests/test_mqtt.py Normal file
View File

@@ -0,0 +1,550 @@
"""Tests for MQTT publisher module."""
import ssl
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.models import AppSettings
from app.mqtt import (
MqttPublisher,
_build_message_topic,
_build_raw_packet_topic,
)
def _make_settings(**overrides) -> AppSettings:
"""Create an AppSettings with MQTT fields."""
defaults = {
"mqtt_broker_host": "broker.local",
"mqtt_broker_port": 1883,
"mqtt_username": "",
"mqtt_password": "",
"mqtt_use_tls": False,
"mqtt_topic_prefix": "meshcore",
"mqtt_publish_messages": True,
"mqtt_publish_raw_packets": True,
}
defaults.update(overrides)
return AppSettings(**defaults)
class TestTopicBuilders:
def test_dm_message_topic(self):
topic = _build_message_topic("meshcore", {"type": "PRIV", "conversation_key": "abc123"})
assert topic == "meshcore/dm:abc123"
def test_channel_message_topic(self):
topic = _build_message_topic("meshcore", {"type": "CHAN", "conversation_key": "def456"})
assert topic == "meshcore/gm:def456"
def test_unknown_message_type_fallback(self):
topic = _build_message_topic("meshcore", {"type": "OTHER", "conversation_key": "xyz"})
assert topic == "meshcore/message:xyz"
def test_custom_prefix(self):
topic = _build_message_topic("myprefix", {"type": "PRIV", "conversation_key": "abc"})
assert topic == "myprefix/dm:abc"
def test_raw_packet_dm_topic(self):
data = {"decrypted_info": {"contact_key": "contact123", "channel_key": None}}
topic = _build_raw_packet_topic("meshcore", data)
assert topic == "meshcore/raw/dm:contact123"
def test_raw_packet_gm_topic(self):
data = {"decrypted_info": {"contact_key": None, "channel_key": "chan456"}}
topic = _build_raw_packet_topic("meshcore", data)
assert topic == "meshcore/raw/gm:chan456"
def test_raw_packet_unrouted_no_info(self):
data = {"decrypted_info": None}
topic = _build_raw_packet_topic("meshcore", data)
assert topic == "meshcore/raw/unrouted"
def test_raw_packet_unrouted_empty_keys(self):
data = {"decrypted_info": {"contact_key": None, "channel_key": None}}
topic = _build_raw_packet_topic("meshcore", data)
assert topic == "meshcore/raw/unrouted"
def test_raw_packet_contact_takes_precedence_over_channel(self):
data = {"decrypted_info": {"contact_key": "c1", "channel_key": "ch1"}}
topic = _build_raw_packet_topic("meshcore", data)
assert topic == "meshcore/raw/dm:c1"
class TestMqttPublisher:
def test_initial_state(self):
pub = MqttPublisher()
assert pub.connected is False
assert pub._client is None
def test_not_configured_when_host_empty(self):
pub = MqttPublisher()
pub._settings = _make_settings(mqtt_broker_host="")
assert pub._mqtt_configured() is False
def test_configured_when_host_set(self):
pub = MqttPublisher()
pub._settings = _make_settings(mqtt_broker_host="broker.local")
assert pub._mqtt_configured() is True
@pytest.mark.asyncio
async def test_publish_drops_silently_when_disconnected(self):
pub = MqttPublisher()
pub.connected = False
# Should not raise
await pub.publish("topic", {"key": "value"})
@pytest.mark.asyncio
async def test_publish_calls_client_when_connected(self):
pub = MqttPublisher()
pub.connected = True
mock_client = AsyncMock()
pub._client = mock_client
await pub.publish("test/topic", {"msg": "hello"})
mock_client.publish.assert_called_once()
call_args = mock_client.publish.call_args
assert call_args[0][0] == "test/topic"
@pytest.mark.asyncio
async def test_publish_handles_exception_gracefully(self):
pub = MqttPublisher()
pub.connected = True
mock_client = AsyncMock()
mock_client.publish.side_effect = Exception("Network error")
pub._client = mock_client
# Should not raise
await pub.publish("test/topic", {"msg": "hello"})
# After a publish failure, connected should be cleared to stop
# further attempts and reflect accurate status
assert pub.connected is False
@pytest.mark.asyncio
async def test_stop_resets_state(self):
pub = MqttPublisher()
pub.connected = True
pub._client = MagicMock()
pub._task = None # No task to cancel
await pub.stop()
assert pub.connected is False
assert pub._client is None
class TestMqttBroadcast:
@pytest.mark.asyncio
async def test_mqtt_broadcast_skips_when_disconnected(self):
"""mqtt_broadcast should return immediately if publisher is disconnected."""
from app.mqtt import mqtt_publisher
original_settings = mqtt_publisher._settings
original_connected = mqtt_publisher.connected
try:
mqtt_publisher.connected = False
mqtt_publisher._settings = _make_settings()
# This should not create any tasks or fail
from app.mqtt import mqtt_broadcast
mqtt_broadcast("message", {"type": "PRIV", "conversation_key": "abc"})
finally:
mqtt_publisher._settings = original_settings
mqtt_publisher.connected = original_connected
@pytest.mark.asyncio
async def test_mqtt_maybe_publish_message(self):
"""_mqtt_maybe_publish should call publish for message events."""
from app.mqtt import _mqtt_maybe_publish, mqtt_publisher
original_settings = mqtt_publisher._settings
original_connected = mqtt_publisher.connected
try:
mqtt_publisher._settings = _make_settings(mqtt_publish_messages=True)
mqtt_publisher.connected = True
with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub:
await _mqtt_maybe_publish("message", {"type": "PRIV", "conversation_key": "abc123"})
mock_pub.assert_called_once()
topic = mock_pub.call_args[0][0]
assert topic == "meshcore/dm:abc123"
finally:
mqtt_publisher._settings = original_settings
mqtt_publisher.connected = original_connected
@pytest.mark.asyncio
async def test_mqtt_maybe_publish_raw_packet(self):
"""_mqtt_maybe_publish should call publish for raw_packet events."""
from app.mqtt import _mqtt_maybe_publish, mqtt_publisher
original_settings = mqtt_publisher._settings
original_connected = mqtt_publisher.connected
try:
mqtt_publisher._settings = _make_settings(mqtt_publish_raw_packets=True)
mqtt_publisher.connected = True
with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub:
await _mqtt_maybe_publish(
"raw_packet",
{"decrypted_info": {"channel_key": "ch1", "contact_key": None}},
)
mock_pub.assert_called_once()
topic = mock_pub.call_args[0][0]
assert topic == "meshcore/raw/gm:ch1"
finally:
mqtt_publisher._settings = original_settings
mqtt_publisher.connected = original_connected
@pytest.mark.asyncio
async def test_mqtt_maybe_publish_skips_disabled_messages(self):
"""_mqtt_maybe_publish should skip messages when publish_messages is False."""
from app.mqtt import _mqtt_maybe_publish, mqtt_publisher
original_settings = mqtt_publisher._settings
original_connected = mqtt_publisher.connected
try:
mqtt_publisher._settings = _make_settings(mqtt_publish_messages=False)
mqtt_publisher.connected = True
with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub:
await _mqtt_maybe_publish("message", {"type": "PRIV", "conversation_key": "abc"})
mock_pub.assert_not_called()
finally:
mqtt_publisher._settings = original_settings
mqtt_publisher.connected = original_connected
@pytest.mark.asyncio
async def test_mqtt_maybe_publish_skips_disabled_raw_packets(self):
"""_mqtt_maybe_publish should skip raw_packets when publish_raw_packets is False."""
from app.mqtt import _mqtt_maybe_publish, mqtt_publisher
original_settings = mqtt_publisher._settings
original_connected = mqtt_publisher.connected
try:
mqtt_publisher._settings = _make_settings(mqtt_publish_raw_packets=False)
mqtt_publisher.connected = True
with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub:
await _mqtt_maybe_publish(
"raw_packet",
{"decrypted_info": None},
)
mock_pub.assert_not_called()
finally:
mqtt_publisher._settings = original_settings
mqtt_publisher.connected = original_connected
class TestBuildTlsContext:
def test_returns_none_when_tls_disabled(self):
settings = _make_settings(mqtt_use_tls=False)
assert MqttPublisher._build_tls_context(settings) is None
def test_returns_context_when_tls_enabled(self):
settings = _make_settings(mqtt_use_tls=True)
ctx = MqttPublisher._build_tls_context(settings)
assert isinstance(ctx, ssl.SSLContext)
assert ctx.check_hostname is True
assert ctx.verify_mode == ssl.CERT_REQUIRED
def test_insecure_skips_verification(self):
settings = _make_settings(mqtt_use_tls=True, mqtt_tls_insecure=True)
ctx = MqttPublisher._build_tls_context(settings)
assert isinstance(ctx, ssl.SSLContext)
assert ctx.check_hostname is False
assert ctx.verify_mode == ssl.CERT_NONE
def _mock_aiomqtt_client():
"""Create a mock aiomqtt.Client that works as an async context manager."""
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
return mock_client
class TestConnectionLoop:
"""Integration tests for MqttPublisher._connection_loop."""
@pytest.mark.asyncio
async def test_connects_and_sets_state(self):
"""Connection loop should connect and set connected=True."""
import asyncio
pub = MqttPublisher()
settings = _make_settings()
mock_client = _mock_aiomqtt_client()
# The connection loop will block forever in the inner wait loop.
# We let it connect, verify state, then cancel.
connected_event = asyncio.Event()
original_aenter = mock_client.__aenter__
async def side_effect_aenter(*a, **kw):
result = await original_aenter(*a, **kw)
# Signal that connection happened
connected_event.set()
return result
mock_client.__aenter__ = AsyncMock(side_effect=side_effect_aenter)
with (
patch("app.mqtt.aiomqtt.Client", return_value=mock_client),
patch("app.mqtt._broadcast_mqtt_health"),
patch("app.websocket.broadcast_success"),
patch("app.websocket.broadcast_health"),
):
await pub.start(settings)
# Wait for connection to be established
await asyncio.wait_for(connected_event.wait(), timeout=2)
assert pub.connected is True
assert pub._client is mock_client
await pub.stop()
assert pub.connected is False
@pytest.mark.asyncio
async def test_reconnects_after_connection_failure(self):
"""Connection loop should retry after a connection error with backoff."""
import asyncio
from app.mqtt import _BACKOFF_MIN
pub = MqttPublisher()
settings = _make_settings()
attempt_count = 0
connected_event = asyncio.Event()
def make_client_factory():
"""Factory that fails first, succeeds second."""
def factory(**kwargs):
nonlocal attempt_count
attempt_count += 1
mock = _mock_aiomqtt_client()
if attempt_count == 1:
# First attempt: fail on __aenter__
mock.__aenter__ = AsyncMock(side_effect=ConnectionRefusedError("refused"))
else:
# Second attempt: succeed and signal
original_aenter = mock.__aenter__
async def signal_aenter(*a, **kw):
result = await original_aenter(*a, **kw)
connected_event.set()
return result
mock.__aenter__ = AsyncMock(side_effect=signal_aenter)
return mock
return factory
with (
patch("app.mqtt.aiomqtt.Client", side_effect=make_client_factory()),
patch("app.mqtt._broadcast_mqtt_health"),
patch("app.websocket.broadcast_success"),
patch("app.websocket.broadcast_error"),
patch("app.websocket.broadcast_health"),
patch("app.mqtt.asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
):
await pub.start(settings)
# Wait for second (successful) connection
await asyncio.wait_for(connected_event.wait(), timeout=5)
assert pub.connected is True
assert attempt_count == 2
# Should have slept with initial backoff after first failure
mock_sleep.assert_called_once_with(_BACKOFF_MIN)
await pub.stop()
@pytest.mark.asyncio
async def test_backoff_increases_on_repeated_failures(self):
"""Backoff should double after each failure, capped at _BACKOFF_MAX."""
import asyncio
from app.mqtt import _BACKOFF_MAX, _BACKOFF_MIN
pub = MqttPublisher()
settings = _make_settings()
max_failures = 4 # enough to observe doubling and capping
def make_failing_factory():
call_count = 0
def factory(**kwargs):
nonlocal call_count
call_count += 1
mock = _mock_aiomqtt_client()
mock.__aenter__ = AsyncMock(side_effect=OSError("network down"))
return mock
return factory, lambda: call_count
factory, get_count = make_failing_factory()
sleep_args: list[int] = []
async def capture_sleep(duration):
sleep_args.append(duration)
if len(sleep_args) >= max_failures:
# Cancel the loop after enough failures
pub._task.cancel()
raise asyncio.CancelledError
with (
patch("app.mqtt.aiomqtt.Client", side_effect=factory),
patch("app.mqtt._broadcast_mqtt_health"),
patch("app.websocket.broadcast_error"),
patch("app.websocket.broadcast_health"),
patch("app.mqtt.asyncio.sleep", side_effect=capture_sleep),
):
await pub.start(settings)
try:
await pub._task
except asyncio.CancelledError:
pass
assert sleep_args[0] == _BACKOFF_MIN
assert sleep_args[1] == _BACKOFF_MIN * 2
assert sleep_args[2] == _BACKOFF_MIN * 4
# Fourth should be capped at _BACKOFF_MAX (5*8=40 > 30)
assert sleep_args[3] == _BACKOFF_MAX
@pytest.mark.asyncio
async def test_waits_for_settings_when_unconfigured(self):
"""When host is empty, loop should block until settings change."""
import asyncio
pub = MqttPublisher()
unconfigured = _make_settings(mqtt_broker_host="")
connected_event = asyncio.Event()
def make_success_client(**kwargs):
mock = _mock_aiomqtt_client()
original_aenter = mock.__aenter__
async def signal_aenter(*a, **kw):
result = await original_aenter(*a, **kw)
connected_event.set()
return result
mock.__aenter__ = AsyncMock(side_effect=signal_aenter)
return mock
with (
patch("app.mqtt.aiomqtt.Client", side_effect=make_success_client),
patch("app.mqtt._broadcast_mqtt_health"),
patch("app.websocket.broadcast_success"),
patch("app.websocket.broadcast_health"),
):
# Start with unconfigured settings — loop should wait
await pub.start(unconfigured)
await asyncio.sleep(0.05)
assert pub.connected is False
# Now provide configured settings — loop should connect
configured = _make_settings(mqtt_broker_host="broker.local")
pub._settings = configured
pub._settings_version += 1
pub._version_event.set()
await asyncio.wait_for(connected_event.wait(), timeout=2)
assert pub.connected is True
await pub.stop()
@pytest.mark.asyncio
async def test_health_broadcast_on_connect_and_failure(self):
"""_broadcast_mqtt_health should be called on connect and on failure."""
import asyncio
pub = MqttPublisher()
settings = _make_settings()
health_calls: list[str] = []
connect_event = asyncio.Event()
def track_health():
health_calls.append("health")
def make_client(**kwargs):
mock = _mock_aiomqtt_client()
original_aenter = mock.__aenter__
async def signal_aenter(*a, **kw):
result = await original_aenter(*a, **kw)
connect_event.set()
return result
mock.__aenter__ = AsyncMock(side_effect=signal_aenter)
return mock
with (
patch("app.mqtt.aiomqtt.Client", side_effect=make_client),
patch("app.mqtt._broadcast_mqtt_health", side_effect=track_health),
patch("app.websocket.broadcast_success"),
patch("app.websocket.broadcast_health"),
):
await pub.start(settings)
await asyncio.wait_for(connect_event.wait(), timeout=2)
# Should have been called once on successful connect
assert len(health_calls) == 1
await pub.stop()
@pytest.mark.asyncio
async def test_health_broadcast_on_connection_error(self):
"""_broadcast_mqtt_health should be called when connection fails."""
import asyncio
pub = MqttPublisher()
settings = _make_settings()
health_calls: list[str] = []
def track_health():
health_calls.append("health")
async def cancel_on_sleep(duration):
# Cancel after the first backoff sleep to stop the loop
pub._task.cancel()
raise asyncio.CancelledError
def make_failing_client(**kwargs):
mock = _mock_aiomqtt_client()
mock.__aenter__ = AsyncMock(side_effect=OSError("refused"))
return mock
with (
patch("app.mqtt.aiomqtt.Client", side_effect=make_failing_client),
patch("app.mqtt._broadcast_mqtt_health", side_effect=track_health),
patch("app.websocket.broadcast_error"),
patch("app.websocket.broadcast_health"),
patch("app.mqtt.asyncio.sleep", side_effect=cancel_on_sleep),
):
await pub.start(settings)
try:
await pub._task
except asyncio.CancelledError:
pass
# Should have been called once on connection failure
assert len(health_calls) == 1

View File

@@ -493,6 +493,15 @@ class TestAppSettingsRepository:
"advert_interval": None,
"last_advert_time": None,
"bots": "{bad-bots-json",
"mqtt_broker_host": "",
"mqtt_broker_port": 1883,
"mqtt_username": "",
"mqtt_password": "",
"mqtt_use_tls": 0,
"mqtt_tls_insecure": 0,
"mqtt_topic_prefix": "meshcore",
"mqtt_publish_messages": 0,
"mqtt_publish_raw_packets": 0,
}
)
mock_conn.execute = AsyncMock(return_value=mock_cursor)

View File

@@ -1,5 +1,7 @@
"""Tests for settings router endpoints and validation behavior."""
from unittest.mock import AsyncMock, patch
import pytest
from fastapi import HTTPException
@@ -66,6 +68,55 @@ class TestUpdateSettings:
assert exc.value.status_code == 400
assert "syntax error" in exc.value.detail.lower()
@pytest.mark.asyncio
async def test_mqtt_fields_round_trip(self, test_db):
"""MQTT settings should be saved and retrieved correctly."""
mock_publisher = type("MockPublisher", (), {"restart": AsyncMock()})()
with patch("app.mqtt.mqtt_publisher", mock_publisher):
result = await update_settings(
AppSettingsUpdate(
mqtt_broker_host="broker.test",
mqtt_broker_port=8883,
mqtt_username="user",
mqtt_password="pass",
mqtt_use_tls=True,
mqtt_tls_insecure=True,
mqtt_topic_prefix="custom",
mqtt_publish_messages=True,
mqtt_publish_raw_packets=True,
)
)
assert result.mqtt_broker_host == "broker.test"
assert result.mqtt_broker_port == 8883
assert result.mqtt_username == "user"
assert result.mqtt_password == "pass"
assert result.mqtt_use_tls is True
assert result.mqtt_tls_insecure is True
assert result.mqtt_topic_prefix == "custom"
assert result.mqtt_publish_messages is True
assert result.mqtt_publish_raw_packets is True
# Verify persistence
fresh = await AppSettingsRepository.get()
assert fresh.mqtt_broker_host == "broker.test"
assert fresh.mqtt_use_tls is True
@pytest.mark.asyncio
async def test_mqtt_defaults_on_fresh_db(self, test_db):
"""MQTT fields should have correct defaults on a fresh database."""
settings = await AppSettingsRepository.get()
assert settings.mqtt_broker_host == ""
assert settings.mqtt_broker_port == 1883
assert settings.mqtt_username == ""
assert settings.mqtt_password == ""
assert settings.mqtt_use_tls is False
assert settings.mqtt_tls_insecure is False
assert settings.mqtt_topic_prefix == "meshcore"
assert settings.mqtt_publish_messages is False
assert settings.mqtt_publish_raw_packets is False
class TestToggleFavorite:
@pytest.mark.asyncio

24
uv.lock generated
View File

@@ -2,6 +2,19 @@ version = 1
revision = 1
requires-python = ">=3.10"
[[package]]
name = "aiomqtt"
version = "2.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "paho-mqtt" },
{ name = "typing-extensions", marker = "python_full_version < '3.11'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/db/b5/798e4855d17f0f3a2e2ed21c07473fcb4bb45993116693d0f68553927e2c/aiomqtt-2.5.0.tar.gz", hash = "sha256:70e181c140a54ae736394efe2b9e865f665551a5417f6957456cc46010487b21", size = 86453 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/84/24/1d5d7d89906db1a94b5029942c2fd909cf8e8551b288f56c03053d5615f8/aiomqtt-2.5.0-py3-none-any.whl", hash = "sha256:65dabeafeeee7b88864361ae9a118d81bd27082093f32f670a5c5fab17de8cf2", size = 15983 },
]
[[package]]
name = "aiosqlite"
version = "0.22.1"
@@ -398,6 +411,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469 },
]
[[package]]
name = "paho-mqtt"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/39/15/0a6214e76d4d32e7f663b109cf71fb22561c2be0f701d67f93950cd40542/paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834", size = 148848 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c4/cb/00451c3cf31790287768bb12c6bec834f5d292eaf3022afc88e14b8afc94/paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee", size = 67219 },
]
[[package]]
name = "pluggy"
version = "1.6.0"
@@ -879,6 +901,7 @@ name = "remoteterm-meshcore"
version = "2.2.0"
source = { virtual = "." }
dependencies = [
{ name = "aiomqtt" },
{ name = "aiosqlite" },
{ name = "fastapi" },
{ name = "meshcore" },
@@ -908,6 +931,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "aiomqtt", specifier = ">=2.0" },
{ name = "aiosqlite", specifier = ">=0.19.0" },
{ name = "fastapi", specifier = ">=0.115.0" },
{ name = "httpx", marker = "extra == 'test'", specifier = ">=0.27.0" },