From f993110ec4cc57bc1678c2cfb4ddfd6352fa3414 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Sat, 28 Feb 2026 21:24:46 -0800 Subject: [PATCH] Initial mqtt implementation --- AGENTS.md | 26 +- README.md | 1 + app/AGENTS.md | 17 + app/main.py | 11 + app/migrations.py | 39 ++ app/models.py | 38 ++ app/mqtt.py | 223 ++++++++ app/packet_processor.py | 4 + app/repository/settings.py | 59 +- app/routers/health.py | 14 + app/routers/settings.py | 67 ++- app/websocket.py | 6 +- frontend/AGENTS.md | 6 + frontend/src/components/SettingsModal.tsx | 203 +++++++ frontend/src/components/settingsConstants.ts | 3 + frontend/src/test/settingsModal.test.tsx | 78 ++- frontend/src/types.ts | 21 + pyproject.toml | 1 + tests/test_migrations.py | 38 +- tests/test_mqtt.py | 550 +++++++++++++++++++ tests/test_repository.py | 9 + tests/test_settings_router.py | 51 ++ uv.lock | 24 + 23 files changed, 1462 insertions(+), 27 deletions(-) create mode 100644 app/mqtt.py create mode 100644 tests/test_mqtt.py diff --git a/AGENTS.md b/AGENTS.md index a54ab15..ae25e9e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -91,7 +91,7 @@ A web interface for MeshCore mesh radio networks. The backend connects to a Mesh 1. **Store-and-serve**: Backend stores all packets even when no client is connected 2. **Parallel storage**: Messages stored both decrypted (when possible) and as raw packets 3. **Extended capacity**: Server stores contacts/channels beyond radio limits (~350 contacts, ~40 channels) -4. **Real-time updates**: WebSocket pushes events; REST for actions +4. **Real-time updates**: WebSocket pushes events; REST for actions; optional MQTT forwarding 5. **Offline-capable**: Radio operates independently; server syncs when connected 6. **Auto-reconnect**: Background monitor detects disconnection and attempts reconnection @@ -126,7 +126,7 @@ To improve repeater disambiguation in the network visualizer, the backend stores 1. Radio receives raw bytes → `packet_processor.py` parses, decrypts, deduplicates, and stores in database (primary path via `RX_LOG_DATA` event) 2. `event_handlers.py` handles higher-level events (`CONTACT_MSG_RECV`, `ACK`) as a fallback/supplement -3. `ws_manager` broadcasts to connected clients +3. `broadcast_event()` in `websocket.py` fans out to both WebSocket clients and MQTT 4. Frontend `useWebSocket` receives → updates React state ### Outgoing Messages @@ -158,7 +158,8 @@ This message-layer echo/path handling is independent of raw-packet storage dedup │ ├── repository.py # Database CRUD │ ├── event_handlers.py # Radio events │ ├── decoder.py # Packet decryption -│ └── websocket.py # Real-time broadcasts +│ ├── websocket.py # Real-time broadcasts +│ └── mqtt.py # Optional MQTT publisher ├── frontend/ # React frontend │ ├── AGENTS.md # Frontend documentation │ ├── src/ @@ -354,6 +355,23 @@ Read state (`last_read_at`) is tracked **server-side** for consistency across de **Note:** These are NOT the same as `Message.conversation_key` (the database field). +### MQTT Publishing + +Optional MQTT integration forwards mesh events to an external broker for home automation, logging, or alerting. All MQTT config is stored in the database (`app_settings`), not env vars — configured from the Settings pane, no server restart needed. + +**Two independent toggles**: publish decrypted messages, publish raw packets. + +**Topic structure** (default prefix `meshcore`): +- `meshcore/dm:` — decrypted DM +- `meshcore/gm:` — decrypted channel message +- `meshcore/raw/dm:` — raw packet attributed to a DM contact +- `meshcore/raw/gm:` — raw packet attributed to a channel +- `meshcore/raw/unrouted` — raw packets that couldn't be attributed + +**Architecture**: `broadcast_event()` in `websocket.py` calls `mqtt_broadcast()` — a single hook covering all message and raw_packet broadcasts. The `MqttPublisher` in `app/mqtt.py` manages a background connection loop with auto-reconnect and backoff. Publishes are fire-and-forget (silent drop if disconnected). Connection state changes trigger toasts via `broadcast_error`/`broadcast_success`. The health endpoint includes `mqtt_status`. + +**Security**: MQTT password stored in plaintext in SQLite, consistent with the project's trusted-network design. + ### Server-Side Decryption The server can decrypt packets using stored keys, both in real-time and for historical packets. @@ -395,7 +413,7 @@ mc.subscribe(EventType.ACK, handler) | `MESHCORE_LOG_LEVEL` | `INFO` | Logging level (`DEBUG`/`INFO`/`WARNING`/`ERROR`) | | `MESHCORE_DATABASE_PATH` | `data/meshcore.db` | SQLite database location | -**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, and `bots`. They are configured via `GET/PATCH /api/settings` (and related settings endpoints). +**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `bots`, and all MQTT configuration (`mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password`, `mqtt_use_tls`, `mqtt_tls_insecure`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets`). They are configured via `GET/PATCH /api/settings` (and related settings endpoints). Byte-perfect channel retries are user-triggered via `POST /api/messages/channel/{message_id}/resend` and are allowed for 30 seconds after the original send. diff --git a/README.md b/README.md index c4b2de7..4ddbead 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ Backend server + browser interface for MeshCore mesh radio networks. Connect you * Monitor unlimited contacts and channels (radio limits don't apply -- packets are decrypted server-side) * Access your radio remotely over your network or VPN * Search for hashtag room names for channels you don't have keys for yet +* Forward decrypted packets to MQTT brokers * Visualize the mesh as a map or node set, view repeater stats, and more! **Warning:** This app has no auth, and is for trusted environments only. _Do not put this on an untrusted network, or open it to the public._ The bots can execute arbitrary Python code which means anyone on your network can, too. If you need access control, consider using a reverse proxy like Nginx, or extending FastAPI; access control and user management are outside the scope of this app. diff --git a/app/AGENTS.md b/app/AGENTS.md index e79bb2b..f018c1d 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -27,6 +27,7 @@ app/ ├── packet_processor.py # Raw packet pipeline, dedup, path handling ├── event_handlers.py # MeshCore event subscriptions and ACK tracking ├── websocket.py # WS manager + broadcast helpers +├── mqtt.py # Optional MQTT publisher (fire-and-forget forwarding) ├── bot.py # Bot execution and outbound bot sends ├── dependencies.py # Shared FastAPI dependency providers ├── keystore.py # Ephemeral private/public key storage for DM decryption @@ -99,6 +100,19 @@ app/ - `0` means disabled. - Last send time tracked in `app_settings.last_advert_time`. +### MQTT publishing + +- Optional forwarding of mesh events to an external MQTT broker. +- All config in `app_settings` (not env vars): `mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password`, `mqtt_use_tls`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets`. +- Disabled when `mqtt_broker_host` is empty. +- `broadcast_event()` in `websocket.py` calls `mqtt_broadcast()` — single hook covers all message and raw_packet events. +- `MqttPublisher` (`app/mqtt.py`) runs a background connection loop with auto-reconnect and exponential backoff (5s → 30s). +- Publishes are fire-and-forget; individual publish failures logged but not surfaced to users. +- Connection state changes surface via `broadcast_error`/`broadcast_success` toasts. +- Health endpoint includes `mqtt_status` field (`connected`, `disconnected`, `disabled`). +- Settings changes trigger `mqtt_publisher.restart()` — no server restart needed. +- Topics: `{prefix}/dm:{key}`, `{prefix}/gm:{key}`, `{prefix}/raw/dm:{key}`, `{prefix}/raw/gm:{key}`, `{prefix}/raw/unrouted`. + ## API Surface (all under `/api`) ### Health @@ -204,6 +218,8 @@ Main tables: - `advert_interval` - `last_advert_time` - `bots` +- `mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password` +- `mqtt_use_tls`, `mqtt_tls_insecure`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets` ## Security Posture (intentional) @@ -239,6 +255,7 @@ tests/ ├── test_message_pagination.py # Cursor-based message pagination ├── test_message_prefix_claim.py # Message prefix claim logic ├── test_migrations.py # Schema migration system +├── test_mqtt.py # MQTT publisher topic routing and lifecycle ├── test_packet_pipeline.py # End-to-end packet processing ├── test_radio.py # RadioManager, serial detection ├── test_radio_operation.py # radio_operation() context manager diff --git a/app/main.py b/app/main.py index c20ce1a..ee7e4c2 100644 --- a/app/main.py +++ b/app/main.py @@ -56,9 +56,20 @@ async def lifespan(app: FastAPI): # Always start connection monitor (even if initial connection failed) await radio_manager.start_connection_monitor() + # Start MQTT publisher if configured + from app.mqtt import mqtt_publisher + from app.repository import AppSettingsRepository + + try: + mqtt_settings = await AppSettingsRepository.get() + await mqtt_publisher.start(mqtt_settings) + except Exception as e: + logger.warning("Failed to start MQTT publisher: %s", e) + yield logger.info("Shutting down") + await mqtt_publisher.stop() await radio_manager.stop_connection_monitor() await stop_message_polling() await stop_periodic_advert() diff --git a/app/migrations.py b/app/migrations.py index 5194a9c..c842033 100644 --- a/app/migrations.py +++ b/app/migrations.py @@ -247,6 +247,13 @@ async def run_migrations(conn: aiosqlite.Connection) -> int: await set_version(conn, 30) applied += 1 + # Migration 31: Add MQTT configuration columns to app_settings + if version < 31: + logger.info("Applying migration 31: add MQTT columns to app_settings") + await _migrate_031_add_mqtt_columns(conn) + await set_version(conn, 31) + applied += 1 + if applied > 0: logger.info( "Applied %d migration(s), schema now at version %d", applied, await get_version(conn) @@ -1852,3 +1859,35 @@ async def _migrate_030_add_pagination_index(conn: aiosqlite.Connection) -> None: ) await conn.execute("DROP INDEX IF EXISTS idx_messages_conversation") await conn.commit() + + +async def _migrate_031_add_mqtt_columns(conn: aiosqlite.Connection) -> None: + """Add MQTT configuration columns to app_settings.""" + # Guard: app_settings may not exist in partial-schema test setups + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='app_settings'" + ) + if not await cursor.fetchone(): + await conn.commit() + return + + cursor = await conn.execute("PRAGMA table_info(app_settings)") + columns = {row[1] for row in await cursor.fetchall()} + + new_columns = [ + ("mqtt_broker_host", "TEXT DEFAULT ''"), + ("mqtt_broker_port", "INTEGER DEFAULT 1883"), + ("mqtt_username", "TEXT DEFAULT ''"), + ("mqtt_password", "TEXT DEFAULT ''"), + ("mqtt_use_tls", "INTEGER DEFAULT 0"), + ("mqtt_tls_insecure", "INTEGER DEFAULT 0"), + ("mqtt_topic_prefix", "TEXT DEFAULT 'meshcore'"), + ("mqtt_publish_messages", "INTEGER DEFAULT 0"), + ("mqtt_publish_raw_packets", "INTEGER DEFAULT 0"), + ] + + for col_name, col_def in new_columns: + if col_name not in columns: + await conn.execute(f"ALTER TABLE app_settings ADD COLUMN {col_name} {col_def}") + + await conn.commit() diff --git a/app/models.py b/app/models.py index 8044d85..a636060 100644 --- a/app/models.py +++ b/app/models.py @@ -173,6 +173,8 @@ class RawPacketDecryptedInfo(BaseModel): channel_name: str | None = None sender: str | None = None + channel_key: str | None = None + contact_key: str | None = None class RawPacketBroadcast(BaseModel): @@ -425,6 +427,42 @@ class AppSettings(BaseModel): default_factory=list, description="List of bot configurations", ) + mqtt_broker_host: str = Field( + default="", + description="MQTT broker hostname (empty = disabled)", + ) + mqtt_broker_port: int = Field( + default=1883, + description="MQTT broker port", + ) + mqtt_username: str = Field( + default="", + description="MQTT username (optional)", + ) + mqtt_password: str = Field( + default="", + description="MQTT password (optional)", + ) + mqtt_use_tls: bool = Field( + default=False, + description="Whether to use TLS for MQTT connection", + ) + mqtt_tls_insecure: bool = Field( + default=False, + description="Skip TLS certificate verification (for self-signed certs)", + ) + mqtt_topic_prefix: str = Field( + default="meshcore", + description="MQTT topic prefix", + ) + mqtt_publish_messages: bool = Field( + default=False, + description="Whether to publish decrypted messages to MQTT", + ) + mqtt_publish_raw_packets: bool = Field( + default=False, + description="Whether to publish raw packets to MQTT", + ) class BusyChannel(BaseModel): diff --git a/app/mqtt.py b/app/mqtt.py new file mode 100644 index 0000000..9c2933c --- /dev/null +++ b/app/mqtt.py @@ -0,0 +1,223 @@ +"""MQTT publisher for forwarding mesh network events to an MQTT broker.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import ssl +from typing import Any + +import aiomqtt + +from app.models import AppSettings + +logger = logging.getLogger(__name__) + +# Reconnect backoff: start at 5s, cap at 30s +_BACKOFF_MIN = 5 +_BACKOFF_MAX = 30 + + +class MqttPublisher: + """Manages an MQTT connection and publishes mesh network events.""" + + def __init__(self) -> None: + self._client: aiomqtt.Client | None = None + self._task: asyncio.Task[None] | None = None + self._settings: AppSettings | None = None + self._settings_version: int = 0 + self._version_event: asyncio.Event = asyncio.Event() + self.connected: bool = False + + async def start(self, settings: AppSettings) -> None: + """Start the background connection loop.""" + self._settings = settings + self._settings_version += 1 + self._version_event.set() + if self._task is None or self._task.done(): + self._task = asyncio.create_task(self._connection_loop()) + + async def stop(self) -> None: + """Cancel the background task and disconnect.""" + if self._task and not self._task.done(): + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + self._client = None + self.connected = False + + async def restart(self, settings: AppSettings) -> None: + """Called when MQTT settings change — stop + start.""" + await self.stop() + await self.start(settings) + + async def publish(self, topic: str, payload: dict[str, Any]) -> None: + """Publish a JSON payload. Drops silently if not connected.""" + if self._client is None or not self.connected: + return + try: + await self._client.publish(topic, json.dumps(payload)) + except Exception as e: + logger.warning("MQTT publish failed on %s: %s", topic, e) + self.connected = False + # Wake the connection loop so it exits the wait and reconnects + self._settings_version += 1 + self._version_event.set() + + def _mqtt_configured(self) -> bool: + """Check if MQTT is configured (broker host is set).""" + return bool(self._settings and self._settings.mqtt_broker_host) + + async def _connection_loop(self) -> None: + """Background loop: connect, wait, reconnect on failure.""" + from app.websocket import broadcast_error, broadcast_success + + backoff = _BACKOFF_MIN + + while True: + if not self._mqtt_configured(): + self.connected = False + self._client = None + # Wait until settings change (which might configure MQTT) + self._version_event.clear() + try: + await self._version_event.wait() + except asyncio.CancelledError: + return + continue + + settings = self._settings + assert settings is not None # guaranteed by _mqtt_configured() + version_at_connect = self._settings_version + + try: + tls_context = self._build_tls_context(settings) + + async with aiomqtt.Client( + hostname=settings.mqtt_broker_host, + port=settings.mqtt_broker_port, + username=settings.mqtt_username or None, + password=settings.mqtt_password or None, + tls_context=tls_context, + ) as client: + self._client = client + self.connected = True + backoff = _BACKOFF_MIN + + broadcast_success( + "MQTT connected", + f"{settings.mqtt_broker_host}:{settings.mqtt_broker_port}", + ) + _broadcast_mqtt_health() + + # Wait until cancelled or settings version changes. + # The 60s timeout is a housekeeping wake-up; actual connection + # liveness is handled by paho-mqtt's keepalive mechanism. + while self._settings_version == version_at_connect: + self._version_event.clear() + try: + await asyncio.wait_for(self._version_event.wait(), timeout=60) + except asyncio.TimeoutError: + continue + + except asyncio.CancelledError: + self.connected = False + self._client = None + return + + except Exception as e: + self.connected = False + self._client = None + + broadcast_error( + "MQTT connection failure", + "Please correct the settings or disable.", + ) + _broadcast_mqtt_health() + logger.warning("MQTT connection error: %s (reconnecting in %ds)", e, backoff) + + try: + await asyncio.sleep(backoff) + except asyncio.CancelledError: + return + backoff = min(backoff * 2, _BACKOFF_MAX) + + @staticmethod + def _build_tls_context(settings: AppSettings) -> ssl.SSLContext | None: + """Build TLS context from settings, or None if TLS is disabled.""" + if not settings.mqtt_use_tls: + return None + ctx = ssl.create_default_context() + if settings.mqtt_tls_insecure: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + + +# Module-level singleton +mqtt_publisher = MqttPublisher() + + +def _broadcast_mqtt_health() -> None: + """Push updated health (including mqtt_status) to all WS clients.""" + from app.radio import radio_manager + from app.websocket import broadcast_health + + broadcast_health(radio_manager.is_connected, radio_manager.connection_info) + + +def mqtt_broadcast(event_type: str, data: dict[str, Any]) -> None: + """Fire-and-forget MQTT publish, matching broadcast_event's pattern.""" + if event_type not in ("message", "raw_packet"): + return + if not mqtt_publisher.connected or mqtt_publisher._settings is None: + return + asyncio.create_task(_mqtt_maybe_publish(event_type, data)) + + +async def _mqtt_maybe_publish(event_type: str, data: dict[str, Any]) -> None: + """Check settings and build topic, then publish.""" + settings = mqtt_publisher._settings + if settings is None: + return + + try: + if event_type == "message" and settings.mqtt_publish_messages: + topic = _build_message_topic(settings.mqtt_topic_prefix, data) + await mqtt_publisher.publish(topic, data) + + elif event_type == "raw_packet" and settings.mqtt_publish_raw_packets: + topic = _build_raw_packet_topic(settings.mqtt_topic_prefix, data) + await mqtt_publisher.publish(topic, data) + + except Exception as e: + logger.warning("MQTT broadcast error: %s", e) + + +def _build_message_topic(prefix: str, data: dict[str, Any]) -> str: + """Build MQTT topic for a decrypted message.""" + msg_type = data.get("type", "") + conversation_key = data.get("conversation_key", "unknown") + + if msg_type == "PRIV": + return f"{prefix}/dm:{conversation_key}" + elif msg_type == "CHAN": + return f"{prefix}/gm:{conversation_key}" + return f"{prefix}/message:{conversation_key}" + + +def _build_raw_packet_topic(prefix: str, data: dict[str, Any]) -> str: + """Build MQTT topic for a raw packet.""" + info = data.get("decrypted_info") + if info and isinstance(info, dict): + contact_key = info.get("contact_key") + channel_key = info.get("channel_key") + if contact_key: + return f"{prefix}/raw/dm:{contact_key}" + if channel_key: + return f"{prefix}/raw/gm:{channel_key}" + return f"{prefix}/raw/unrouted" diff --git a/app/packet_processor.py b/app/packet_processor.py index 04a7a82..010d49d 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -576,6 +576,8 @@ async def process_raw_packet( decrypted_info=RawPacketDecryptedInfo( channel_name=result["channel_name"], sender=result["sender"], + channel_key=result.get("channel_key"), + contact_key=result.get("contact_key"), ) if result["decrypted"] else None, @@ -632,6 +634,7 @@ async def _process_group_text( "channel_name": channel.name, "sender": decrypted.sender, "message_id": msg_id, # None if duplicate, msg_id if new + "channel_key": channel.key, } # Couldn't decrypt with any known key @@ -888,6 +891,7 @@ async def _process_direct_message( "contact_name": contact.name, "sender": contact.name or contact.public_key[:12], "message_id": msg_id, + "contact_key": contact.public_key, } # Couldn't decrypt with any known contact diff --git a/app/repository/settings.py b/app/repository/settings.py index b888047..49bf450 100644 --- a/app/repository/settings.py +++ b/app/repository/settings.py @@ -26,7 +26,10 @@ class AppSettingsRepository: """ SELECT max_radio_contacts, favorites, auto_decrypt_dm_on_advert, sidebar_sort_order, last_message_times, preferences_migrated, - advert_interval, last_advert_time, bots + advert_interval, last_advert_time, bots, + mqtt_broker_host, mqtt_broker_port, mqtt_username, mqtt_password, + mqtt_use_tls, mqtt_tls_insecure, mqtt_topic_prefix, + mqtt_publish_messages, mqtt_publish_raw_packets FROM app_settings WHERE id = 1 """ ) @@ -91,6 +94,15 @@ class AppSettingsRepository: advert_interval=row["advert_interval"] or 0, last_advert_time=row["last_advert_time"] or 0, bots=bots, + mqtt_broker_host=row["mqtt_broker_host"] or "", + mqtt_broker_port=row["mqtt_broker_port"] or 1883, + mqtt_username=row["mqtt_username"] or "", + mqtt_password=row["mqtt_password"] or "", + mqtt_use_tls=bool(row["mqtt_use_tls"]), + mqtt_tls_insecure=bool(row["mqtt_tls_insecure"]), + mqtt_topic_prefix=row["mqtt_topic_prefix"] or "meshcore", + mqtt_publish_messages=bool(row["mqtt_publish_messages"]), + mqtt_publish_raw_packets=bool(row["mqtt_publish_raw_packets"]), ) @staticmethod @@ -104,6 +116,15 @@ class AppSettingsRepository: advert_interval: int | None = None, last_advert_time: int | None = None, bots: list[BotConfig] | None = None, + mqtt_broker_host: str | None = None, + mqtt_broker_port: int | None = None, + mqtt_username: str | None = None, + mqtt_password: str | None = None, + mqtt_use_tls: bool | None = None, + mqtt_tls_insecure: bool | None = None, + mqtt_topic_prefix: str | None = None, + mqtt_publish_messages: bool | None = None, + mqtt_publish_raw_packets: bool | None = None, ) -> AppSettings: """Update app settings. Only provided fields are updated.""" updates = [] @@ -147,6 +168,42 @@ class AppSettingsRepository: bots_json = json.dumps([b.model_dump() for b in bots]) params.append(bots_json) + if mqtt_broker_host is not None: + updates.append("mqtt_broker_host = ?") + params.append(mqtt_broker_host) + + if mqtt_broker_port is not None: + updates.append("mqtt_broker_port = ?") + params.append(mqtt_broker_port) + + if mqtt_username is not None: + updates.append("mqtt_username = ?") + params.append(mqtt_username) + + if mqtt_password is not None: + updates.append("mqtt_password = ?") + params.append(mqtt_password) + + if mqtt_use_tls is not None: + updates.append("mqtt_use_tls = ?") + params.append(1 if mqtt_use_tls else 0) + + if mqtt_tls_insecure is not None: + updates.append("mqtt_tls_insecure = ?") + params.append(1 if mqtt_tls_insecure else 0) + + if mqtt_topic_prefix is not None: + updates.append("mqtt_topic_prefix = ?") + params.append(mqtt_topic_prefix) + + if mqtt_publish_messages is not None: + updates.append("mqtt_publish_messages = ?") + params.append(1 if mqtt_publish_messages else 0) + + if mqtt_publish_raw_packets is not None: + updates.append("mqtt_publish_raw_packets = ?") + params.append(1 if mqtt_publish_raw_packets else 0) + if updates: query = f"UPDATE app_settings SET {', '.join(updates)} WHERE id = 1" await db.conn.execute(query, params) diff --git a/app/routers/health.py b/app/routers/health.py index 20aadcb..ab43c20 100644 --- a/app/routers/health.py +++ b/app/routers/health.py @@ -16,6 +16,7 @@ class HealthResponse(BaseModel): connection_info: str | None database_size_mb: float oldest_undecrypted_timestamp: int | None + mqtt_status: str | None = None async def build_health_data(radio_connected: bool, connection_info: str | None) -> dict: @@ -33,12 +34,25 @@ async def build_health_data(radio_connected: bool, connection_info: str | None) except RuntimeError: pass # Database not connected + # MQTT status + mqtt_status: str | None = None + try: + from app.mqtt import mqtt_publisher + + if mqtt_publisher._mqtt_configured(): + mqtt_status = "connected" if mqtt_publisher.connected else "disconnected" + else: + mqtt_status = "disabled" + except Exception: + pass + return { "status": "ok" if radio_connected else "degraded", "radio_connected": radio_connected, "connection_info": connection_info, "database_size_mb": db_size_mb, "oldest_undecrypted_timestamp": oldest_ts, + "mqtt_status": mqtt_status, } diff --git a/app/routers/settings.py b/app/routers/settings.py index 89583b4..4bfc7f1 100644 --- a/app/routers/settings.py +++ b/app/routers/settings.py @@ -59,6 +59,44 @@ class AppSettingsUpdate(BaseModel): default=None, description="List of bot configurations", ) + mqtt_broker_host: str | None = Field( + default=None, + description="MQTT broker hostname (empty = disabled)", + ) + mqtt_broker_port: int | None = Field( + default=None, + ge=1, + le=65535, + description="MQTT broker port", + ) + mqtt_username: str | None = Field( + default=None, + description="MQTT username (optional)", + ) + mqtt_password: str | None = Field( + default=None, + description="MQTT password (optional)", + ) + mqtt_use_tls: bool | None = Field( + default=None, + description="Whether to use TLS for MQTT connection", + ) + mqtt_tls_insecure: bool | None = Field( + default=None, + description="Skip TLS certificate verification (for self-signed certs)", + ) + mqtt_topic_prefix: str | None = Field( + default=None, + description="MQTT topic prefix", + ) + mqtt_publish_messages: bool | None = Field( + default=None, + description="Whether to publish decrypted messages to MQTT", + ) + mqtt_publish_raw_packets: bool | None = Field( + default=None, + description="Whether to publish raw packets to MQTT", + ) class FavoriteRequest(BaseModel): @@ -124,8 +162,35 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings: logger.info("Updating bots (count=%d)", len(update.bots)) kwargs["bots"] = update.bots + # MQTT fields + mqtt_fields = [ + "mqtt_broker_host", + "mqtt_broker_port", + "mqtt_username", + "mqtt_password", + "mqtt_use_tls", + "mqtt_tls_insecure", + "mqtt_topic_prefix", + "mqtt_publish_messages", + "mqtt_publish_raw_packets", + ] + mqtt_changed = False + for field in mqtt_fields: + value = getattr(update, field) + if value is not None: + kwargs[field] = value + mqtt_changed = True + if kwargs: - return await AppSettingsRepository.update(**kwargs) + result = await AppSettingsRepository.update(**kwargs) + + # Restart MQTT publisher if any MQTT settings changed + if mqtt_changed: + from app.mqtt import mqtt_publisher + + await mqtt_publisher.restart(result) + + return result return await AppSettingsRepository.get() diff --git a/app/websocket.py b/app/websocket.py index f38be5f..825d4a1 100644 --- a/app/websocket.py +++ b/app/websocket.py @@ -96,10 +96,14 @@ def broadcast_event(event_type: str, data: dict) -> None: """Schedule a broadcast without blocking. Convenience function that creates an asyncio task to broadcast - an event to all connected WebSocket clients. + an event to all connected WebSocket clients and forward to MQTT. """ asyncio.create_task(ws_manager.broadcast(event_type, data)) + from app.mqtt import mqtt_broadcast + + mqtt_broadcast(event_type, data) + def broadcast_error(message: str, details: str | None = None) -> None: """Broadcast an error notification to all connected clients. diff --git a/frontend/AGENTS.md b/frontend/AGENTS.md index b76fb81..8bb6e76 100644 --- a/frontend/AGENTS.md +++ b/frontend/AGENTS.md @@ -206,6 +206,12 @@ LocalStorage migration helpers for favorites; canonical favorites are server-sid - `advert_interval` - `last_advert_time` - `bots` +- `mqtt_broker_host`, `mqtt_broker_port`, `mqtt_username`, `mqtt_password` +- `mqtt_use_tls`, `mqtt_tls_insecure`, `mqtt_topic_prefix`, `mqtt_publish_messages`, `mqtt_publish_raw_packets` + +`HealthStatus` includes `mqtt_status` (`"connected"`, `"disconnected"`, `"disabled"`, or `null`). + +`RawPacket.decrypted_info` includes `channel_key` and `contact_key` for MQTT topic routing. ## Contact Info Pane diff --git a/frontend/src/components/SettingsModal.tsx b/frontend/src/components/SettingsModal.tsx index 969de08..bfc62eb 100644 --- a/frontend/src/components/SettingsModal.tsx +++ b/frontend/src/components/SettingsModal.tsx @@ -85,6 +85,7 @@ export function SettingsModal(props: SettingsModalProps) { radio: !isMobile, identity: false, connectivity: false, + mqtt: false, database: false, bot: false, statistics: false, @@ -127,6 +128,17 @@ export function SettingsModal(props: SettingsModalProps) { // Advertisement interval state (displayed in hours, stored as seconds in DB) const [advertIntervalHours, setAdvertIntervalHours] = useState('0'); + // MQTT state + const [mqttBrokerHost, setMqttBrokerHost] = useState(''); + const [mqttBrokerPort, setMqttBrokerPort] = useState('1883'); + const [mqttUsername, setMqttUsername] = useState(''); + const [mqttPassword, setMqttPassword] = useState(''); + const [mqttUseTls, setMqttUseTls] = useState(false); + const [mqttTlsInsecure, setMqttTlsInsecure] = useState(false); + const [mqttTopicPrefix, setMqttTopicPrefix] = useState('meshcore'); + const [mqttPublishMessages, setMqttPublishMessages] = useState(false); + const [mqttPublishRawPackets, setMqttPublishRawPackets] = useState(false); + // Bot state const DEFAULT_BOT_CODE = `def bot( sender_name: str | None, @@ -193,6 +205,15 @@ export function SettingsModal(props: SettingsModalProps) { setAutoDecryptOnAdvert(appSettings.auto_decrypt_dm_on_advert); setAdvertIntervalHours(String(Math.round(appSettings.advert_interval / 3600))); setBots(appSettings.bots || []); + setMqttBrokerHost(appSettings.mqtt_broker_host ?? ''); + setMqttBrokerPort(String(appSettings.mqtt_broker_port ?? 1883)); + setMqttUsername(appSettings.mqtt_username ?? ''); + setMqttPassword(appSettings.mqtt_password ?? ''); + setMqttUseTls(appSettings.mqtt_use_tls ?? false); + setMqttTlsInsecure(appSettings.mqtt_tls_insecure ?? false); + setMqttTopicPrefix(appSettings.mqtt_topic_prefix ?? 'meshcore'); + setMqttPublishMessages(appSettings.mqtt_publish_messages ?? false); + setMqttPublishRawPackets(appSettings.mqtt_publish_raw_packets ?? false); } }, [appSettings]); @@ -409,6 +430,34 @@ export function SettingsModal(props: SettingsModalProps) { } }; + const handleSaveMqtt = async () => { + setSectionError(null); + setBusySection('mqtt'); + + try { + const update: AppSettingsUpdate = { + mqtt_broker_host: mqttBrokerHost, + mqtt_broker_port: parseInt(mqttBrokerPort, 10) || 1883, + mqtt_username: mqttUsername, + mqtt_password: mqttPassword, + mqtt_use_tls: mqttUseTls, + mqtt_tls_insecure: mqttTlsInsecure, + mqtt_topic_prefix: mqttTopicPrefix || 'meshcore', + mqtt_publish_messages: mqttPublishMessages, + mqtt_publish_raw_packets: mqttPublishRawPackets, + }; + await onSaveAppSettings(update); + toast.success('MQTT settings saved'); + } catch (err) { + setSectionError({ + section: 'mqtt', + message: err instanceof Error ? err.message : 'Failed to save', + }); + } finally { + setBusySection(null); + } + }; + const handleSetPrivateKey = async () => { if (!privateKey.trim()) { setSectionError({ section: 'identity', message: 'Private key is required' }); @@ -976,6 +1025,160 @@ export function SettingsModal(props: SettingsModalProps) { )} + {shouldRenderSection('mqtt') && ( +
+ {renderSectionHeader('mqtt')} + {isSectionVisible('mqtt') && ( +
+
+ + {health?.mqtt_status === 'connected' ? ( +
+
+ Connected +
+ ) : health?.mqtt_status === 'disconnected' ? ( +
+
+ Disconnected +
+ ) : ( +
+
+ Disabled +
+ )} +
+ + + +
+ + setMqttBrokerHost(e.target.value)} + /> +
+ +
+ + setMqttBrokerPort(e.target.value)} + /> +
+ +
+ + setMqttUsername(e.target.value)} + /> +
+ +
+ + setMqttPassword(e.target.value)} + /> +
+ + + + {mqttUseTls && ( + <> + +

+ Allow self-signed or untrusted broker certificates +

+ + )} + + + +
+ + setMqttTopicPrefix(e.target.value)} + /> +

+ Topics: {mqttTopicPrefix || 'meshcore'}/dm:<key>,{' '} + {mqttTopicPrefix || 'meshcore'}/gm:<key>, {mqttTopicPrefix || 'meshcore'} + /raw/... +

+
+ + + + +

+ Forward decrypted DM and channel messages +

+ + +

Forward all RF packets

+ + + + {getSectionError('mqtt') && ( +
{getSectionError('mqtt')}
+ )} +
+ )} +
+ )} + {shouldRenderSection('database') && (
{renderSectionHeader('database')} diff --git a/frontend/src/components/settingsConstants.ts b/frontend/src/components/settingsConstants.ts index 60fa3bc..e6e43b8 100644 --- a/frontend/src/components/settingsConstants.ts +++ b/frontend/src/components/settingsConstants.ts @@ -2,6 +2,7 @@ export type SettingsSection = | 'radio' | 'identity' | 'connectivity' + | 'mqtt' | 'database' | 'bot' | 'statistics'; @@ -10,6 +11,7 @@ export const SETTINGS_SECTION_ORDER: SettingsSection[] = [ 'radio', 'identity', 'connectivity', + 'mqtt', 'database', 'bot', 'statistics', @@ -19,6 +21,7 @@ export const SETTINGS_SECTION_LABELS: Record = { radio: '📻 Radio', identity: '🪪 Identity', connectivity: '📡 Connectivity', + mqtt: '📤 MQTT', database: '🗄️ Database & Interface', bot: '🤖 Bot', statistics: '📊 Statistics', diff --git a/frontend/src/test/settingsModal.test.tsx b/frontend/src/test/settingsModal.test.tsx index 2f108bc..19be53a 100644 --- a/frontend/src/test/settingsModal.test.tsx +++ b/frontend/src/test/settingsModal.test.tsx @@ -38,6 +38,7 @@ const baseHealth: HealthStatus = { connection_info: 'Serial: /dev/ttyUSB0', database_size_mb: 1.2, oldest_undecrypted_timestamp: null, + mqtt_status: null, }; const baseSettings: AppSettings = { @@ -50,10 +51,20 @@ const baseSettings: AppSettings = { advert_interval: 0, last_advert_time: 0, bots: [], + mqtt_broker_host: '', + mqtt_broker_port: 1883, + mqtt_username: '', + mqtt_password: '', + mqtt_use_tls: false, + mqtt_tls_insecure: false, + mqtt_topic_prefix: 'meshcore', + mqtt_publish_messages: false, + mqtt_publish_raw_packets: false, }; function renderModal(overrides?: { appSettings?: AppSettings; + health?: HealthStatus; onSaveAppSettings?: (update: AppSettingsUpdate) => Promise; onRefreshAppSettings?: () => Promise; onSave?: (update: RadioConfigUpdate) => Promise; @@ -79,7 +90,7 @@ function renderModal(overrides?: { open: overrides?.open ?? true, pageMode: overrides?.pageMode, config: baseConfig, - health: baseHealth, + health: overrides?.health ?? baseHealth, appSettings: overrides?.appSettings ?? baseSettings, onClose, onSave, @@ -133,6 +144,11 @@ function openConnectivitySection() { fireEvent.click(connectivityToggle); } +function openMqttSection() { + const mqttToggle = screen.getByRole('button', { name: /MQTT/i }); + fireEvent.click(mqttToggle); +} + function openDatabaseSection() { const databaseToggle = screen.getByRole('button', { name: /Database/i }); fireEvent.click(databaseToggle); @@ -389,6 +405,66 @@ describe('SettingsModal', () => { expect(screen.getByText('42 msgs')).toBeInTheDocument(); }); + it('renders MQTT section with form inputs', () => { + renderModal(); + openMqttSection(); + + expect(screen.getByLabelText('Broker Host')).toBeInTheDocument(); + expect(screen.getByLabelText('Broker Port')).toBeInTheDocument(); + expect(screen.getByLabelText('Username')).toBeInTheDocument(); + expect(screen.getByLabelText('Password')).toBeInTheDocument(); + expect(screen.getByLabelText('Topic Prefix')).toBeInTheDocument(); + expect(screen.getByText('Publish Messages')).toBeInTheDocument(); + expect(screen.getByText('Publish Raw Packets')).toBeInTheDocument(); + }); + + it('saves MQTT settings through onSaveAppSettings', async () => { + const { onSaveAppSettings } = renderModal(); + openMqttSection(); + + const hostInput = screen.getByLabelText('Broker Host'); + fireEvent.change(hostInput, { target: { value: 'mqtt.example.com' } }); + + fireEvent.click(screen.getByRole('button', { name: 'Save MQTT Settings' })); + + await waitFor(() => { + expect(onSaveAppSettings).toHaveBeenCalledWith( + expect.objectContaining({ + mqtt_broker_host: 'mqtt.example.com', + mqtt_broker_port: 1883, + }) + ); + }); + }); + + it('shows MQTT disabled status when mqtt_status is null', () => { + renderModal({ + appSettings: { + ...baseSettings, + mqtt_broker_host: 'broker.local', + }, + }); + openMqttSection(); + + expect(screen.getByText('Disabled')).toBeInTheDocument(); + }); + + it('shows MQTT connected status badge', () => { + renderModal({ + appSettings: { + ...baseSettings, + mqtt_broker_host: 'broker.local', + }, + health: { + ...baseHealth, + mqtt_status: 'connected', + }, + }); + openMqttSection(); + + expect(screen.getByText('Connected')).toBeInTheDocument(); + }); + it('fetches statistics when expanded in mobile external-nav mode', async () => { const mockStats: StatisticsResponse = { busiest_channels_24h: [], diff --git a/frontend/src/types.ts b/frontend/src/types.ts index d6fe960..7609161 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -29,6 +29,7 @@ export interface HealthStatus { connection_info: string | null; database_size_mb: number; oldest_undecrypted_timestamp: number | null; + mqtt_status: string | null; } export interface MaintenanceResult { @@ -155,6 +156,8 @@ export interface RawPacket { decrypted_info: { channel_name: string | null; sender: string | null; + channel_key: string | null; + contact_key: string | null; } | null; } @@ -180,6 +183,15 @@ export interface AppSettings { advert_interval: number; last_advert_time: number; bots: BotConfig[]; + mqtt_broker_host: string; + mqtt_broker_port: number; + mqtt_username: string; + mqtt_password: string; + mqtt_use_tls: boolean; + mqtt_tls_insecure: boolean; + mqtt_topic_prefix: string; + mqtt_publish_messages: boolean; + mqtt_publish_raw_packets: boolean; } export interface AppSettingsUpdate { @@ -188,6 +200,15 @@ export interface AppSettingsUpdate { sidebar_sort_order?: 'recent' | 'alpha'; advert_interval?: number; bots?: BotConfig[]; + mqtt_broker_host?: string; + mqtt_broker_port?: number; + mqtt_username?: string; + mqtt_password?: string; + mqtt_use_tls?: boolean; + mqtt_tls_insecure?: boolean; + mqtt_topic_prefix?: string; + mqtt_publish_messages?: boolean; + mqtt_publish_raw_packets?: boolean; } export interface MigratePreferencesRequest { diff --git a/pyproject.toml b/pyproject.toml index fdfc939..d6fa096 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "pycryptodome>=3.20.0", "pynacl>=1.5.0", "meshcore", + "aiomqtt>=2.0", ] [project.optional-dependencies] diff --git a/tests/test_migrations.py b/tests/test_migrations.py index b8a7f11..64c3948 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -100,8 +100,8 @@ class TestMigration001: # Run migrations applied = await run_migrations(conn) - assert applied == 30 # All migrations run - assert await get_version(conn) == 30 + assert applied == 31 # All migrations run + assert await get_version(conn) == 31 # Verify columns exist by inserting and selecting await conn.execute( @@ -183,9 +183,9 @@ class TestMigration001: applied1 = await run_migrations(conn) applied2 = await run_migrations(conn) - assert applied1 == 30 # All migrations run + assert applied1 == 31 # All migrations run assert applied2 == 0 # No migrations on second run - assert await get_version(conn) == 30 + assert await get_version(conn) == 31 finally: await conn.close() @@ -246,8 +246,8 @@ class TestMigration001: applied = await run_migrations(conn) # All migrations applied (version incremented) but no error - assert applied == 30 - assert await get_version(conn) == 30 + assert applied == 31 + assert await get_version(conn) == 31 finally: await conn.close() @@ -376,8 +376,8 @@ class TestMigration013: # Run migration 13 (plus 14-27 which also run) applied = await run_migrations(conn) - assert applied == 18 - assert await get_version(conn) == 30 + assert applied == 19 + assert await get_version(conn) == 31 # Verify bots array was created with migrated data cursor = await conn.execute("SELECT bots FROM app_settings WHERE id = 1") @@ -497,7 +497,7 @@ class TestMigration018: assert await cursor.fetchone() is not None await run_migrations(conn) - assert await get_version(conn) == 30 + assert await get_version(conn) == 31 # Verify autoindex is gone cursor = await conn.execute( @@ -575,8 +575,8 @@ class TestMigration018: await conn.commit() applied = await run_migrations(conn) - assert applied == 13 # Migrations 18-30 run (18+19 skip internally) - assert await get_version(conn) == 30 + assert applied == 14 # Migrations 18-31 run (18+19 skip internally) + assert await get_version(conn) == 31 finally: await conn.close() @@ -648,7 +648,7 @@ class TestMigration019: assert await cursor.fetchone() is not None await run_migrations(conn) - assert await get_version(conn) == 30 + assert await get_version(conn) == 31 # Verify autoindex is gone cursor = await conn.execute( @@ -714,8 +714,8 @@ class TestMigration020: assert (await cursor.fetchone())[0] == "delete" applied = await run_migrations(conn) - assert applied == 11 # Migrations 20-30 - assert await get_version(conn) == 30 + assert applied == 12 # Migrations 20-31 + assert await get_version(conn) == 31 # Verify WAL mode cursor = await conn.execute("PRAGMA journal_mode") @@ -745,7 +745,7 @@ class TestMigration020: await set_version(conn, 20) applied = await run_migrations(conn) - assert applied == 10 # Migrations 21-30 still run + assert applied == 11 # Migrations 21-31 still run # Still WAL + INCREMENTAL cursor = await conn.execute("PRAGMA journal_mode") @@ -803,8 +803,8 @@ class TestMigration028: await conn.commit() applied = await run_migrations(conn) - assert applied == 3 - assert await get_version(conn) == 30 + assert applied == 4 + assert await get_version(conn) == 31 # Verify payload_hash column is now BLOB cursor = await conn.execute("PRAGMA table_info(raw_packets)") @@ -873,8 +873,8 @@ class TestMigration028: await conn.commit() applied = await run_migrations(conn) - assert applied == 3 # Version still bumped - assert await get_version(conn) == 30 + assert applied == 4 # Version still bumped + assert await get_version(conn) == 31 # Verify data unchanged cursor = await conn.execute("SELECT payload_hash FROM raw_packets") diff --git a/tests/test_mqtt.py b/tests/test_mqtt.py new file mode 100644 index 0000000..64a01e6 --- /dev/null +++ b/tests/test_mqtt.py @@ -0,0 +1,550 @@ +"""Tests for MQTT publisher module.""" + +import ssl +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from app.models import AppSettings +from app.mqtt import ( + MqttPublisher, + _build_message_topic, + _build_raw_packet_topic, +) + + +def _make_settings(**overrides) -> AppSettings: + """Create an AppSettings with MQTT fields.""" + defaults = { + "mqtt_broker_host": "broker.local", + "mqtt_broker_port": 1883, + "mqtt_username": "", + "mqtt_password": "", + "mqtt_use_tls": False, + "mqtt_topic_prefix": "meshcore", + "mqtt_publish_messages": True, + "mqtt_publish_raw_packets": True, + } + defaults.update(overrides) + return AppSettings(**defaults) + + +class TestTopicBuilders: + def test_dm_message_topic(self): + topic = _build_message_topic("meshcore", {"type": "PRIV", "conversation_key": "abc123"}) + assert topic == "meshcore/dm:abc123" + + def test_channel_message_topic(self): + topic = _build_message_topic("meshcore", {"type": "CHAN", "conversation_key": "def456"}) + assert topic == "meshcore/gm:def456" + + def test_unknown_message_type_fallback(self): + topic = _build_message_topic("meshcore", {"type": "OTHER", "conversation_key": "xyz"}) + assert topic == "meshcore/message:xyz" + + def test_custom_prefix(self): + topic = _build_message_topic("myprefix", {"type": "PRIV", "conversation_key": "abc"}) + assert topic == "myprefix/dm:abc" + + def test_raw_packet_dm_topic(self): + data = {"decrypted_info": {"contact_key": "contact123", "channel_key": None}} + topic = _build_raw_packet_topic("meshcore", data) + assert topic == "meshcore/raw/dm:contact123" + + def test_raw_packet_gm_topic(self): + data = {"decrypted_info": {"contact_key": None, "channel_key": "chan456"}} + topic = _build_raw_packet_topic("meshcore", data) + assert topic == "meshcore/raw/gm:chan456" + + def test_raw_packet_unrouted_no_info(self): + data = {"decrypted_info": None} + topic = _build_raw_packet_topic("meshcore", data) + assert topic == "meshcore/raw/unrouted" + + def test_raw_packet_unrouted_empty_keys(self): + data = {"decrypted_info": {"contact_key": None, "channel_key": None}} + topic = _build_raw_packet_topic("meshcore", data) + assert topic == "meshcore/raw/unrouted" + + def test_raw_packet_contact_takes_precedence_over_channel(self): + data = {"decrypted_info": {"contact_key": "c1", "channel_key": "ch1"}} + topic = _build_raw_packet_topic("meshcore", data) + assert topic == "meshcore/raw/dm:c1" + + +class TestMqttPublisher: + def test_initial_state(self): + pub = MqttPublisher() + assert pub.connected is False + assert pub._client is None + + def test_not_configured_when_host_empty(self): + pub = MqttPublisher() + pub._settings = _make_settings(mqtt_broker_host="") + assert pub._mqtt_configured() is False + + def test_configured_when_host_set(self): + pub = MqttPublisher() + pub._settings = _make_settings(mqtt_broker_host="broker.local") + assert pub._mqtt_configured() is True + + @pytest.mark.asyncio + async def test_publish_drops_silently_when_disconnected(self): + pub = MqttPublisher() + pub.connected = False + # Should not raise + await pub.publish("topic", {"key": "value"}) + + @pytest.mark.asyncio + async def test_publish_calls_client_when_connected(self): + pub = MqttPublisher() + pub.connected = True + mock_client = AsyncMock() + pub._client = mock_client + + await pub.publish("test/topic", {"msg": "hello"}) + + mock_client.publish.assert_called_once() + call_args = mock_client.publish.call_args + assert call_args[0][0] == "test/topic" + + @pytest.mark.asyncio + async def test_publish_handles_exception_gracefully(self): + pub = MqttPublisher() + pub.connected = True + mock_client = AsyncMock() + mock_client.publish.side_effect = Exception("Network error") + pub._client = mock_client + + # Should not raise + await pub.publish("test/topic", {"msg": "hello"}) + + # After a publish failure, connected should be cleared to stop + # further attempts and reflect accurate status + assert pub.connected is False + + @pytest.mark.asyncio + async def test_stop_resets_state(self): + pub = MqttPublisher() + pub.connected = True + pub._client = MagicMock() + pub._task = None # No task to cancel + + await pub.stop() + + assert pub.connected is False + assert pub._client is None + + +class TestMqttBroadcast: + @pytest.mark.asyncio + async def test_mqtt_broadcast_skips_when_disconnected(self): + """mqtt_broadcast should return immediately if publisher is disconnected.""" + from app.mqtt import mqtt_publisher + + original_settings = mqtt_publisher._settings + original_connected = mqtt_publisher.connected + + try: + mqtt_publisher.connected = False + mqtt_publisher._settings = _make_settings() + + # This should not create any tasks or fail + from app.mqtt import mqtt_broadcast + + mqtt_broadcast("message", {"type": "PRIV", "conversation_key": "abc"}) + finally: + mqtt_publisher._settings = original_settings + mqtt_publisher.connected = original_connected + + @pytest.mark.asyncio + async def test_mqtt_maybe_publish_message(self): + """_mqtt_maybe_publish should call publish for message events.""" + from app.mqtt import _mqtt_maybe_publish, mqtt_publisher + + original_settings = mqtt_publisher._settings + original_connected = mqtt_publisher.connected + + try: + mqtt_publisher._settings = _make_settings(mqtt_publish_messages=True) + mqtt_publisher.connected = True + + with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub: + await _mqtt_maybe_publish("message", {"type": "PRIV", "conversation_key": "abc123"}) + mock_pub.assert_called_once() + topic = mock_pub.call_args[0][0] + assert topic == "meshcore/dm:abc123" + finally: + mqtt_publisher._settings = original_settings + mqtt_publisher.connected = original_connected + + @pytest.mark.asyncio + async def test_mqtt_maybe_publish_raw_packet(self): + """_mqtt_maybe_publish should call publish for raw_packet events.""" + from app.mqtt import _mqtt_maybe_publish, mqtt_publisher + + original_settings = mqtt_publisher._settings + original_connected = mqtt_publisher.connected + + try: + mqtt_publisher._settings = _make_settings(mqtt_publish_raw_packets=True) + mqtt_publisher.connected = True + + with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub: + await _mqtt_maybe_publish( + "raw_packet", + {"decrypted_info": {"channel_key": "ch1", "contact_key": None}}, + ) + mock_pub.assert_called_once() + topic = mock_pub.call_args[0][0] + assert topic == "meshcore/raw/gm:ch1" + finally: + mqtt_publisher._settings = original_settings + mqtt_publisher.connected = original_connected + + @pytest.mark.asyncio + async def test_mqtt_maybe_publish_skips_disabled_messages(self): + """_mqtt_maybe_publish should skip messages when publish_messages is False.""" + from app.mqtt import _mqtt_maybe_publish, mqtt_publisher + + original_settings = mqtt_publisher._settings + original_connected = mqtt_publisher.connected + + try: + mqtt_publisher._settings = _make_settings(mqtt_publish_messages=False) + mqtt_publisher.connected = True + + with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub: + await _mqtt_maybe_publish("message", {"type": "PRIV", "conversation_key": "abc"}) + mock_pub.assert_not_called() + finally: + mqtt_publisher._settings = original_settings + mqtt_publisher.connected = original_connected + + @pytest.mark.asyncio + async def test_mqtt_maybe_publish_skips_disabled_raw_packets(self): + """_mqtt_maybe_publish should skip raw_packets when publish_raw_packets is False.""" + from app.mqtt import _mqtt_maybe_publish, mqtt_publisher + + original_settings = mqtt_publisher._settings + original_connected = mqtt_publisher.connected + + try: + mqtt_publisher._settings = _make_settings(mqtt_publish_raw_packets=False) + mqtt_publisher.connected = True + + with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub: + await _mqtt_maybe_publish( + "raw_packet", + {"decrypted_info": None}, + ) + mock_pub.assert_not_called() + finally: + mqtt_publisher._settings = original_settings + mqtt_publisher.connected = original_connected + + +class TestBuildTlsContext: + def test_returns_none_when_tls_disabled(self): + settings = _make_settings(mqtt_use_tls=False) + assert MqttPublisher._build_tls_context(settings) is None + + def test_returns_context_when_tls_enabled(self): + settings = _make_settings(mqtt_use_tls=True) + ctx = MqttPublisher._build_tls_context(settings) + assert isinstance(ctx, ssl.SSLContext) + assert ctx.check_hostname is True + assert ctx.verify_mode == ssl.CERT_REQUIRED + + def test_insecure_skips_verification(self): + settings = _make_settings(mqtt_use_tls=True, mqtt_tls_insecure=True) + ctx = MqttPublisher._build_tls_context(settings) + assert isinstance(ctx, ssl.SSLContext) + assert ctx.check_hostname is False + assert ctx.verify_mode == ssl.CERT_NONE + + +def _mock_aiomqtt_client(): + """Create a mock aiomqtt.Client that works as an async context manager.""" + mock_client = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + return mock_client + + +class TestConnectionLoop: + """Integration tests for MqttPublisher._connection_loop.""" + + @pytest.mark.asyncio + async def test_connects_and_sets_state(self): + """Connection loop should connect and set connected=True.""" + import asyncio + + pub = MqttPublisher() + settings = _make_settings() + + mock_client = _mock_aiomqtt_client() + + # The connection loop will block forever in the inner wait loop. + # We let it connect, verify state, then cancel. + connected_event = asyncio.Event() + + original_aenter = mock_client.__aenter__ + + async def side_effect_aenter(*a, **kw): + result = await original_aenter(*a, **kw) + # Signal that connection happened + connected_event.set() + return result + + mock_client.__aenter__ = AsyncMock(side_effect=side_effect_aenter) + + with ( + patch("app.mqtt.aiomqtt.Client", return_value=mock_client), + patch("app.mqtt._broadcast_mqtt_health"), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_health"), + ): + await pub.start(settings) + + # Wait for connection to be established + await asyncio.wait_for(connected_event.wait(), timeout=2) + + assert pub.connected is True + assert pub._client is mock_client + + await pub.stop() + assert pub.connected is False + + @pytest.mark.asyncio + async def test_reconnects_after_connection_failure(self): + """Connection loop should retry after a connection error with backoff.""" + import asyncio + + from app.mqtt import _BACKOFF_MIN + + pub = MqttPublisher() + settings = _make_settings() + + attempt_count = 0 + connected_event = asyncio.Event() + + def make_client_factory(): + """Factory that fails first, succeeds second.""" + + def factory(**kwargs): + nonlocal attempt_count + attempt_count += 1 + mock = _mock_aiomqtt_client() + if attempt_count == 1: + # First attempt: fail on __aenter__ + mock.__aenter__ = AsyncMock(side_effect=ConnectionRefusedError("refused")) + else: + # Second attempt: succeed and signal + original_aenter = mock.__aenter__ + + async def signal_aenter(*a, **kw): + result = await original_aenter(*a, **kw) + connected_event.set() + return result + + mock.__aenter__ = AsyncMock(side_effect=signal_aenter) + return mock + + return factory + + with ( + patch("app.mqtt.aiomqtt.Client", side_effect=make_client_factory()), + patch("app.mqtt._broadcast_mqtt_health"), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_error"), + patch("app.websocket.broadcast_health"), + patch("app.mqtt.asyncio.sleep", new_callable=AsyncMock) as mock_sleep, + ): + await pub.start(settings) + + # Wait for second (successful) connection + await asyncio.wait_for(connected_event.wait(), timeout=5) + + assert pub.connected is True + assert attempt_count == 2 + # Should have slept with initial backoff after first failure + mock_sleep.assert_called_once_with(_BACKOFF_MIN) + + await pub.stop() + + @pytest.mark.asyncio + async def test_backoff_increases_on_repeated_failures(self): + """Backoff should double after each failure, capped at _BACKOFF_MAX.""" + import asyncio + + from app.mqtt import _BACKOFF_MAX, _BACKOFF_MIN + + pub = MqttPublisher() + settings = _make_settings() + + max_failures = 4 # enough to observe doubling and capping + + def make_failing_factory(): + call_count = 0 + + def factory(**kwargs): + nonlocal call_count + call_count += 1 + mock = _mock_aiomqtt_client() + mock.__aenter__ = AsyncMock(side_effect=OSError("network down")) + return mock + + return factory, lambda: call_count + + factory, get_count = make_failing_factory() + sleep_args: list[int] = [] + + async def capture_sleep(duration): + sleep_args.append(duration) + if len(sleep_args) >= max_failures: + # Cancel the loop after enough failures + pub._task.cancel() + raise asyncio.CancelledError + + with ( + patch("app.mqtt.aiomqtt.Client", side_effect=factory), + patch("app.mqtt._broadcast_mqtt_health"), + patch("app.websocket.broadcast_error"), + patch("app.websocket.broadcast_health"), + patch("app.mqtt.asyncio.sleep", side_effect=capture_sleep), + ): + await pub.start(settings) + try: + await pub._task + except asyncio.CancelledError: + pass + + assert sleep_args[0] == _BACKOFF_MIN + assert sleep_args[1] == _BACKOFF_MIN * 2 + assert sleep_args[2] == _BACKOFF_MIN * 4 + # Fourth should be capped at _BACKOFF_MAX (5*8=40 > 30) + assert sleep_args[3] == _BACKOFF_MAX + + @pytest.mark.asyncio + async def test_waits_for_settings_when_unconfigured(self): + """When host is empty, loop should block until settings change.""" + import asyncio + + pub = MqttPublisher() + unconfigured = _make_settings(mqtt_broker_host="") + + connected_event = asyncio.Event() + + def make_success_client(**kwargs): + mock = _mock_aiomqtt_client() + original_aenter = mock.__aenter__ + + async def signal_aenter(*a, **kw): + result = await original_aenter(*a, **kw) + connected_event.set() + return result + + mock.__aenter__ = AsyncMock(side_effect=signal_aenter) + return mock + + with ( + patch("app.mqtt.aiomqtt.Client", side_effect=make_success_client), + patch("app.mqtt._broadcast_mqtt_health"), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_health"), + ): + # Start with unconfigured settings — loop should wait + await pub.start(unconfigured) + await asyncio.sleep(0.05) + assert pub.connected is False + + # Now provide configured settings — loop should connect + configured = _make_settings(mqtt_broker_host="broker.local") + pub._settings = configured + pub._settings_version += 1 + pub._version_event.set() + + await asyncio.wait_for(connected_event.wait(), timeout=2) + assert pub.connected is True + + await pub.stop() + + @pytest.mark.asyncio + async def test_health_broadcast_on_connect_and_failure(self): + """_broadcast_mqtt_health should be called on connect and on failure.""" + import asyncio + + pub = MqttPublisher() + settings = _make_settings() + + health_calls: list[str] = [] + connect_event = asyncio.Event() + + def track_health(): + health_calls.append("health") + + def make_client(**kwargs): + mock = _mock_aiomqtt_client() + original_aenter = mock.__aenter__ + + async def signal_aenter(*a, **kw): + result = await original_aenter(*a, **kw) + connect_event.set() + return result + + mock.__aenter__ = AsyncMock(side_effect=signal_aenter) + return mock + + with ( + patch("app.mqtt.aiomqtt.Client", side_effect=make_client), + patch("app.mqtt._broadcast_mqtt_health", side_effect=track_health), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_health"), + ): + await pub.start(settings) + await asyncio.wait_for(connect_event.wait(), timeout=2) + + # Should have been called once on successful connect + assert len(health_calls) == 1 + + await pub.stop() + + @pytest.mark.asyncio + async def test_health_broadcast_on_connection_error(self): + """_broadcast_mqtt_health should be called when connection fails.""" + import asyncio + + pub = MqttPublisher() + settings = _make_settings() + + health_calls: list[str] = [] + + def track_health(): + health_calls.append("health") + + async def cancel_on_sleep(duration): + # Cancel after the first backoff sleep to stop the loop + pub._task.cancel() + raise asyncio.CancelledError + + def make_failing_client(**kwargs): + mock = _mock_aiomqtt_client() + mock.__aenter__ = AsyncMock(side_effect=OSError("refused")) + return mock + + with ( + patch("app.mqtt.aiomqtt.Client", side_effect=make_failing_client), + patch("app.mqtt._broadcast_mqtt_health", side_effect=track_health), + patch("app.websocket.broadcast_error"), + patch("app.websocket.broadcast_health"), + patch("app.mqtt.asyncio.sleep", side_effect=cancel_on_sleep), + ): + await pub.start(settings) + try: + await pub._task + except asyncio.CancelledError: + pass + + # Should have been called once on connection failure + assert len(health_calls) == 1 diff --git a/tests/test_repository.py b/tests/test_repository.py index 5cbce1a..f9daaaf 100644 --- a/tests/test_repository.py +++ b/tests/test_repository.py @@ -493,6 +493,15 @@ class TestAppSettingsRepository: "advert_interval": None, "last_advert_time": None, "bots": "{bad-bots-json", + "mqtt_broker_host": "", + "mqtt_broker_port": 1883, + "mqtt_username": "", + "mqtt_password": "", + "mqtt_use_tls": 0, + "mqtt_tls_insecure": 0, + "mqtt_topic_prefix": "meshcore", + "mqtt_publish_messages": 0, + "mqtt_publish_raw_packets": 0, } ) mock_conn.execute = AsyncMock(return_value=mock_cursor) diff --git a/tests/test_settings_router.py b/tests/test_settings_router.py index c79fce8..74fdce8 100644 --- a/tests/test_settings_router.py +++ b/tests/test_settings_router.py @@ -1,5 +1,7 @@ """Tests for settings router endpoints and validation behavior.""" +from unittest.mock import AsyncMock, patch + import pytest from fastapi import HTTPException @@ -66,6 +68,55 @@ class TestUpdateSettings: assert exc.value.status_code == 400 assert "syntax error" in exc.value.detail.lower() + @pytest.mark.asyncio + async def test_mqtt_fields_round_trip(self, test_db): + """MQTT settings should be saved and retrieved correctly.""" + mock_publisher = type("MockPublisher", (), {"restart": AsyncMock()})() + with patch("app.mqtt.mqtt_publisher", mock_publisher): + result = await update_settings( + AppSettingsUpdate( + mqtt_broker_host="broker.test", + mqtt_broker_port=8883, + mqtt_username="user", + mqtt_password="pass", + mqtt_use_tls=True, + mqtt_tls_insecure=True, + mqtt_topic_prefix="custom", + mqtt_publish_messages=True, + mqtt_publish_raw_packets=True, + ) + ) + + assert result.mqtt_broker_host == "broker.test" + assert result.mqtt_broker_port == 8883 + assert result.mqtt_username == "user" + assert result.mqtt_password == "pass" + assert result.mqtt_use_tls is True + assert result.mqtt_tls_insecure is True + assert result.mqtt_topic_prefix == "custom" + assert result.mqtt_publish_messages is True + assert result.mqtt_publish_raw_packets is True + + # Verify persistence + fresh = await AppSettingsRepository.get() + assert fresh.mqtt_broker_host == "broker.test" + assert fresh.mqtt_use_tls is True + + @pytest.mark.asyncio + async def test_mqtt_defaults_on_fresh_db(self, test_db): + """MQTT fields should have correct defaults on a fresh database.""" + settings = await AppSettingsRepository.get() + + assert settings.mqtt_broker_host == "" + assert settings.mqtt_broker_port == 1883 + assert settings.mqtt_username == "" + assert settings.mqtt_password == "" + assert settings.mqtt_use_tls is False + assert settings.mqtt_tls_insecure is False + assert settings.mqtt_topic_prefix == "meshcore" + assert settings.mqtt_publish_messages is False + assert settings.mqtt_publish_raw_packets is False + class TestToggleFavorite: @pytest.mark.asyncio diff --git a/uv.lock b/uv.lock index 4d0a20c..1c1454b 100644 --- a/uv.lock +++ b/uv.lock @@ -2,6 +2,19 @@ version = 1 revision = 1 requires-python = ">=3.10" +[[package]] +name = "aiomqtt" +version = "2.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "paho-mqtt" }, + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/db/b5/798e4855d17f0f3a2e2ed21c07473fcb4bb45993116693d0f68553927e2c/aiomqtt-2.5.0.tar.gz", hash = "sha256:70e181c140a54ae736394efe2b9e865f665551a5417f6957456cc46010487b21", size = 86453 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/84/24/1d5d7d89906db1a94b5029942c2fd909cf8e8551b288f56c03053d5615f8/aiomqtt-2.5.0-py3-none-any.whl", hash = "sha256:65dabeafeeee7b88864361ae9a118d81bd27082093f32f670a5c5fab17de8cf2", size = 15983 }, +] + [[package]] name = "aiosqlite" version = "0.22.1" @@ -398,6 +411,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469 }, ] +[[package]] +name = "paho-mqtt" +version = "2.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/39/15/0a6214e76d4d32e7f663b109cf71fb22561c2be0f701d67f93950cd40542/paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834", size = 148848 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c4/cb/00451c3cf31790287768bb12c6bec834f5d292eaf3022afc88e14b8afc94/paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee", size = 67219 }, +] + [[package]] name = "pluggy" version = "1.6.0" @@ -879,6 +901,7 @@ name = "remoteterm-meshcore" version = "2.2.0" source = { virtual = "." } dependencies = [ + { name = "aiomqtt" }, { name = "aiosqlite" }, { name = "fastapi" }, { name = "meshcore" }, @@ -908,6 +931,7 @@ dev = [ [package.metadata] requires-dist = [ + { name = "aiomqtt", specifier = ">=2.0" }, { name = "aiosqlite", specifier = ">=0.19.0" }, { name = "fastapi", specifier = ">=0.115.0" }, { name = "httpx", marker = "extra == 'test'", specifier = ">=0.27.0" },