From 7cd54d14d85214b66f54fa411d3b4cf258aa6e1f Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Thu, 5 Mar 2026 17:16:13 -0800 Subject: [PATCH] Move to modular fanout bus --- app/community_mqtt.py | 47 -- app/fanout/AGENTS_fanout.md | 92 +++ app/fanout/__init__.py | 8 + app/fanout/base.py | 34 ++ app/fanout/manager.py | 162 ++++++ app/fanout/mqtt_community.py | 89 +++ app/fanout/mqtt_private.py | 62 ++ app/main.py | 17 +- app/migrations.py | 135 +++++ app/models.py | 13 + app/mqtt.py | 33 -- app/packet_processor.py | 2 + app/repository/__init__.py | 2 + app/repository/fanout.py | 137 +++++ app/routers/fanout.py | 165 ++++++ app/routers/health.py | 30 +- app/routers/settings.py | 133 ----- app/websocket.py | 21 +- frontend/src/api.ts | 32 ++ frontend/src/components/SettingsModal.tsx | 17 +- .../settings/SettingsFanoutSection.tsx | 505 +++++++++++++++++ .../settings/SettingsMqttSection.tsx | 451 --------------- .../components/settings/settingsConstants.ts | 6 +- frontend/src/test/settingsModal.test.tsx | 158 +----- frontend/src/types.ts | 34 +- tests/conftest.py | 2 +- tests/test_community_mqtt.py | 34 -- tests/test_fanout.py | 528 ++++++++++++++++++ tests/test_fanout_integration.py | 403 +++++++++++++ tests/test_health_mqtt_status.py | 110 +--- tests/test_migrations.py | 52 +- tests/test_mqtt.py | 114 +--- tests/test_settings_router.py | 124 ---- tests/test_websocket.py | 29 +- 34 files changed, 2489 insertions(+), 1292 deletions(-) create mode 100644 app/fanout/AGENTS_fanout.md create mode 100644 app/fanout/__init__.py create mode 100644 app/fanout/base.py create mode 100644 app/fanout/manager.py create mode 100644 app/fanout/mqtt_community.py create mode 100644 app/fanout/mqtt_private.py create mode 100644 app/repository/fanout.py create mode 100644 app/routers/fanout.py create mode 100644 frontend/src/components/settings/SettingsFanoutSection.tsx delete mode 100644 frontend/src/components/settings/SettingsMqttSection.tsx create mode 100644 tests/test_fanout.py create mode 100644 tests/test_fanout_integration.py diff --git a/app/community_mqtt.py b/app/community_mqtt.py index 72fe6a9..ba9836f 100644 --- a/app/community_mqtt.py +++ b/app/community_mqtt.py @@ -555,50 +555,3 @@ class CommunityMqttPublisher(BaseMqttPublisher): pass return False return True - - -# Module-level singleton -community_publisher = CommunityMqttPublisher() - - -def community_mqtt_broadcast(event_type: str, data: dict[str, Any]) -> None: - """Fire-and-forget community MQTT publish for raw packets only.""" - if event_type != "raw_packet": - return - if not community_publisher.connected or community_publisher._settings is None: - return - asyncio.create_task(_community_maybe_publish(data)) - - -async def _community_maybe_publish(data: dict[str, Any]) -> None: - """Format and publish a raw packet to the community broker.""" - settings = community_publisher._settings - if settings is None or not settings.community_mqtt_enabled: - return - - try: - from app.keystore import get_public_key - from app.radio import radio_manager - - public_key = get_public_key() - if public_key is None: - return - - pubkey_hex = public_key.hex().upper() - - # Get device name from radio - device_name = "" - if radio_manager.meshcore and radio_manager.meshcore.self_info: - device_name = radio_manager.meshcore.self_info.get("name", "") - - packet = _format_raw_packet(data, device_name, pubkey_hex) - iata = settings.community_mqtt_iata.upper().strip() - if not _IATA_RE.fullmatch(iata): - logger.debug("Community MQTT: skipping publish — no valid IATA code configured") - return - topic = f"meshcore/{iata}/{pubkey_hex}/packets" - - await community_publisher.publish(topic, packet) - - except Exception as e: - logger.warning("Community MQTT broadcast error: %s", e) diff --git a/app/fanout/AGENTS_fanout.md b/app/fanout/AGENTS_fanout.md new file mode 100644 index 0000000..9e66d56 --- /dev/null +++ b/app/fanout/AGENTS_fanout.md @@ -0,0 +1,92 @@ +# Fanout Bus Architecture + +The fanout bus is a unified system for dispatching mesh radio events (decoded messages and raw packets) to external integrations. It replaces the previous scattered singleton MQTT publishers with a modular, configurable framework. + +## Core Concepts + +### FanoutModule (base.py) +Abstract base class that all integration modules implement: +- `start()` / `stop()` — lifecycle management +- `on_message(data)` — receive decoded messages +- `on_raw(data)` — receive raw packets +- `status` property — "connected" | "disconnected" + +### FanoutManager (manager.py) +Singleton that owns all active modules and dispatches events: +- `load_from_db()` — startup: load enabled configs, instantiate modules +- `reload_config(id)` — CRUD: stop old, start new +- `remove_config(id)` — delete: stop and remove +- `broadcast_message(data)` — scope-check + dispatch `on_message` +- `broadcast_raw(data)` — scope-check + dispatch `on_raw` +- `stop_all()` — shutdown +- `get_statuses()` — health endpoint data + +### Scope Matching +Each config has a `scope` JSON blob controlling what events reach it: +```json +{"messages": "all", "raw_packets": "all"} +{"messages": "none", "raw_packets": "all"} +{"messages": {"channels": ["key1"], "contacts": "all"}, "raw_packets": "none"} +``` +Community MQTT always enforces `{"messages": "none", "raw_packets": "all"}`. + +## Event Flow + +``` +Radio Event → packet_processor / event_handler + → broadcast_event("message"|"raw_packet", data, realtime=True) + → WebSocket broadcast (always) + → FanoutManager.broadcast_message/raw (only if realtime=True) + → scope check per module + → module.on_message / on_raw +``` + +Setting `realtime=False` (used during historical decryption) skips fanout dispatch entirely. + +## Current Module Types + +### mqtt_private (mqtt_private.py) +Wraps `MqttPublisher` from `app/mqtt.py`. Config blob: +- `broker_host`, `broker_port`, `username`, `password` +- `use_tls`, `tls_insecure`, `topic_prefix` + +### mqtt_community (mqtt_community.py) +Wraps `CommunityMqttPublisher` from `app/community_mqtt.py`. Config blob: +- `broker_host`, `broker_port`, `iata`, `email` +- Only publishes raw packets (on_message is a no-op) + +## Adding a New Integration Type + +1. Create `app/fanout/my_type.py` with a class extending `FanoutModule` +2. Register it in `manager.py` → `_register_module_types()` +3. Add validation in `app/routers/fanout.py` → `_VALID_TYPES` and validator function +4. Add frontend editor component in `SettingsFanoutSection.tsx` + +## REST API + +| Method | Endpoint | Description | +|--------|----------|-------------| +| GET | `/api/fanout` | List all fanout configs | +| POST | `/api/fanout` | Create new config | +| PATCH | `/api/fanout/{id}` | Update config (triggers module reload) | +| DELETE | `/api/fanout/{id}` | Delete config (stops module) | + +## Database + +`fanout_configs` table (created in migration 36): +- `id` TEXT PRIMARY KEY +- `type`, `name`, `enabled`, `config` (JSON), `scope` (JSON) +- `sort_order`, `created_at` + +Migration 36 also migrates existing `app_settings` MQTT columns into fanout rows. + +## Key Files + +- `app/fanout/base.py` — FanoutModule ABC +- `app/fanout/manager.py` — FanoutManager singleton +- `app/fanout/mqtt_private.py` — Private MQTT module +- `app/fanout/mqtt_community.py` — Community MQTT module +- `app/repository/fanout.py` — Database CRUD +- `app/routers/fanout.py` — REST API +- `app/websocket.py` — `broadcast_event()` dispatches to fanout +- `frontend/src/components/settings/SettingsFanoutSection.tsx` — UI diff --git a/app/fanout/__init__.py b/app/fanout/__init__.py new file mode 100644 index 0000000..885a25b --- /dev/null +++ b/app/fanout/__init__.py @@ -0,0 +1,8 @@ +from app.fanout.base import FanoutModule +from app.fanout.manager import FanoutManager, fanout_manager + +__all__ = [ + "FanoutManager", + "FanoutModule", + "fanout_manager", +] diff --git a/app/fanout/base.py b/app/fanout/base.py new file mode 100644 index 0000000..9aa4acb --- /dev/null +++ b/app/fanout/base.py @@ -0,0 +1,34 @@ +"""Base class for fanout integration modules.""" + +from __future__ import annotations + + +class FanoutModule: + """Base class for all fanout integrations. + + Each module wraps a specific integration (MQTT, webhook, etc.) and + receives dispatched messages/packets from the FanoutManager. + + Subclasses must override the ``status`` property. + """ + + def __init__(self, config_id: str, config: dict) -> None: + self.config_id = config_id + self.config = config + + async def start(self) -> None: + """Start the module (e.g. connect to broker). Override for persistent connections.""" + + async def stop(self) -> None: + """Stop the module (e.g. disconnect from broker).""" + + async def on_message(self, data: dict) -> None: + """Called for decoded messages (DM/channel). Override if needed.""" + + async def on_raw(self, data: dict) -> None: + """Called for raw RF packets. Override if needed.""" + + @property + def status(self) -> str: + """Return 'connected', 'disconnected', or 'error'.""" + raise NotImplementedError diff --git a/app/fanout/manager.py b/app/fanout/manager.py new file mode 100644 index 0000000..06a070f --- /dev/null +++ b/app/fanout/manager.py @@ -0,0 +1,162 @@ +"""FanoutManager: owns all active fanout modules and dispatches events.""" + +from __future__ import annotations + +import logging +from typing import Any + +from app.fanout.base import FanoutModule + +logger = logging.getLogger(__name__) + +# Type string -> module class mapping (extended in Phase 2/3) +_MODULE_TYPES: dict[str, type] = {} + + +def _register_module_types() -> None: + """Lazily populate the type registry to avoid circular imports.""" + if _MODULE_TYPES: + return + from app.fanout.mqtt_community import MqttCommunityModule + from app.fanout.mqtt_private import MqttPrivateModule + + _MODULE_TYPES["mqtt_private"] = MqttPrivateModule + _MODULE_TYPES["mqtt_community"] = MqttCommunityModule + + +def _scope_matches_message(scope: dict, data: dict) -> bool: + """Check whether a message event matches the given scope.""" + messages = scope.get("messages", "none") + if messages == "all": + return True + if messages == "none": + return False + if isinstance(messages, dict): + msg_type = data.get("type", "") + conversation_key = data.get("conversation_key", "") + if msg_type == "CHAN": + channels = messages.get("channels", "none") + if channels == "all": + return True + if channels == "none": + return False + if isinstance(channels, list): + return conversation_key in channels + elif msg_type == "PRIV": + contacts = messages.get("contacts", "none") + if contacts == "all": + return True + if contacts == "none": + return False + if isinstance(contacts, list): + return conversation_key in contacts + return False + + +def _scope_matches_raw(scope: dict, _data: dict) -> bool: + """Check whether a raw packet event matches the given scope.""" + return scope.get("raw_packets", "none") == "all" + + +class FanoutManager: + """Owns all active fanout modules and dispatches events.""" + + def __init__(self) -> None: + self._modules: dict[str, tuple[FanoutModule, dict]] = {} # id -> (module, scope) + + async def load_from_db(self) -> None: + """Read enabled fanout_configs and instantiate modules.""" + _register_module_types() + from app.repository.fanout import FanoutConfigRepository + + configs = await FanoutConfigRepository.get_enabled() + for cfg in configs: + await self._start_module(cfg) + + async def _start_module(self, cfg: dict[str, Any]) -> None: + """Instantiate and start a single module from a config dict.""" + config_id = cfg["id"] + config_type = cfg["type"] + config_blob = cfg["config"] + scope = cfg["scope"] + + cls = _MODULE_TYPES.get(config_type) + if cls is None: + logger.warning("Unknown fanout type %r for config %s, skipping", config_type, config_id) + return + + try: + module = cls(config_id, config_blob) + await module.start() + self._modules[config_id] = (module, scope) + logger.info( + "Started fanout module %s (type=%s)", cfg.get("name", config_id), config_type + ) + except Exception: + logger.exception("Failed to start fanout module %s", config_id) + + async def reload_config(self, config_id: str) -> None: + """Stop old module (if any) and start updated config.""" + await self.remove_config(config_id) + + from app.repository.fanout import FanoutConfigRepository + + cfg = await FanoutConfigRepository.get(config_id) + if cfg is None or not cfg["enabled"]: + return + await self._start_module(cfg) + + async def remove_config(self, config_id: str) -> None: + """Stop and remove a module.""" + entry = self._modules.pop(config_id, None) + if entry is not None: + module, _ = entry + try: + await module.stop() + except Exception: + logger.exception("Error stopping fanout module %s", config_id) + + async def broadcast_message(self, data: dict) -> None: + """Dispatch a decoded message to modules whose scope matches.""" + for config_id, (module, scope) in list(self._modules.items()): + if _scope_matches_message(scope, data): + try: + await module.on_message(data) + except Exception: + logger.exception("Fanout %s on_message error", config_id) + + async def broadcast_raw(self, data: dict) -> None: + """Dispatch a raw packet to modules whose scope matches.""" + for config_id, (module, scope) in list(self._modules.items()): + if _scope_matches_raw(scope, data): + try: + await module.on_raw(data) + except Exception: + logger.exception("Fanout %s on_raw error", config_id) + + async def stop_all(self) -> None: + """Shutdown all modules.""" + for config_id, (module, _) in list(self._modules.items()): + try: + await module.stop() + except Exception: + logger.exception("Error stopping fanout module %s", config_id) + self._modules.clear() + + def get_statuses(self) -> dict[str, dict[str, str]]: + """Return status info for each active module.""" + from app.repository.fanout import _configs_cache + + result: dict[str, dict[str, str]] = {} + for config_id, (module, _) in self._modules.items(): + info = _configs_cache.get(config_id, {}) + result[config_id] = { + "name": info.get("name", config_id), + "type": info.get("type", "unknown"), + "status": module.status, + } + return result + + +# Module-level singleton +fanout_manager = FanoutManager() diff --git a/app/fanout/mqtt_community.py b/app/fanout/mqtt_community.py new file mode 100644 index 0000000..0cd5590 --- /dev/null +++ b/app/fanout/mqtt_community.py @@ -0,0 +1,89 @@ +"""Fanout module wrapping the community MQTT publisher.""" + +from __future__ import annotations + +import logging +import re +from typing import Any + +from app.community_mqtt import CommunityMqttPublisher, _format_raw_packet +from app.fanout.base import FanoutModule +from app.models import AppSettings + +logger = logging.getLogger(__name__) + +_IATA_RE = re.compile(r"^[A-Z]{3}$") + + +def _config_to_settings(config: dict) -> AppSettings: + """Map a fanout config blob to AppSettings for the CommunityMqttPublisher.""" + return AppSettings( + community_mqtt_enabled=True, + community_mqtt_broker_host=config.get("broker_host", "mqtt-us-v1.letsmesh.net"), + community_mqtt_broker_port=config.get("broker_port", 443), + community_mqtt_iata=config.get("iata", ""), + community_mqtt_email=config.get("email", ""), + ) + + +class MqttCommunityModule(FanoutModule): + """Wraps a CommunityMqttPublisher for community packet sharing.""" + + def __init__(self, config_id: str, config: dict) -> None: + super().__init__(config_id, config) + self._publisher = CommunityMqttPublisher() + + async def start(self) -> None: + settings = _config_to_settings(self.config) + await self._publisher.start(settings) + + async def stop(self) -> None: + await self._publisher.stop() + + async def on_message(self, data: dict) -> None: + # Community MQTT only publishes raw packets, not decoded messages. + pass + + async def on_raw(self, data: dict) -> None: + if not self._publisher.connected or self._publisher._settings is None: + return + await _publish_community_packet(self._publisher, self.config, data) + + @property + def status(self) -> str: + if self._publisher._is_configured(): + return "connected" if self._publisher.connected else "disconnected" + return "disconnected" + + +async def _publish_community_packet( + publisher: CommunityMqttPublisher, + config: dict, + data: dict[str, Any], +) -> None: + """Format and publish a raw packet to the community broker.""" + try: + from app.keystore import get_public_key + from app.radio import radio_manager + + public_key = get_public_key() + if public_key is None: + return + + pubkey_hex = public_key.hex().upper() + + device_name = "" + if radio_manager.meshcore and radio_manager.meshcore.self_info: + device_name = radio_manager.meshcore.self_info.get("name", "") + + packet = _format_raw_packet(data, device_name, pubkey_hex) + iata = config.get("iata", "").upper().strip() + if not _IATA_RE.fullmatch(iata): + logger.debug("Community MQTT: skipping publish — no valid IATA code configured") + return + topic = f"meshcore/{iata}/{pubkey_hex}/packets" + + await publisher.publish(topic, packet) + + except Exception as e: + logger.warning("Community MQTT broadcast error: %s", e) diff --git a/app/fanout/mqtt_private.py b/app/fanout/mqtt_private.py new file mode 100644 index 0000000..b016282 --- /dev/null +++ b/app/fanout/mqtt_private.py @@ -0,0 +1,62 @@ +"""Fanout module wrapping the private MQTT publisher.""" + +from __future__ import annotations + +import logging + +from app.fanout.base import FanoutModule +from app.models import AppSettings +from app.mqtt import MqttPublisher, _build_message_topic, _build_raw_packet_topic + +logger = logging.getLogger(__name__) + + +def _config_to_settings(config: dict) -> AppSettings: + """Map a fanout config blob to AppSettings for the MqttPublisher.""" + return AppSettings( + mqtt_broker_host=config.get("broker_host", ""), + mqtt_broker_port=config.get("broker_port", 1883), + mqtt_username=config.get("username", ""), + mqtt_password=config.get("password", ""), + mqtt_use_tls=config.get("use_tls", False), + mqtt_tls_insecure=config.get("tls_insecure", False), + mqtt_topic_prefix=config.get("topic_prefix", "meshcore"), + # Always enable both publish flags; the fanout scope controls delivery. + mqtt_publish_messages=True, + mqtt_publish_raw_packets=True, + ) + + +class MqttPrivateModule(FanoutModule): + """Wraps an MqttPublisher instance for private MQTT forwarding.""" + + def __init__(self, config_id: str, config: dict) -> None: + super().__init__(config_id, config) + self._publisher = MqttPublisher() + + async def start(self) -> None: + settings = _config_to_settings(self.config) + await self._publisher.start(settings) + + async def stop(self) -> None: + await self._publisher.stop() + + async def on_message(self, data: dict) -> None: + if not self._publisher.connected or self._publisher._settings is None: + return + prefix = self.config.get("topic_prefix", "meshcore") + topic = _build_message_topic(prefix, data) + await self._publisher.publish(topic, data) + + async def on_raw(self, data: dict) -> None: + if not self._publisher.connected or self._publisher._settings is None: + return + prefix = self.config.get("topic_prefix", "meshcore") + topic = _build_raw_packet_topic(prefix, data) + await self._publisher.publish(topic, data) + + @property + def status(self) -> str: + if not self.config.get("broker_host"): + return "disconnected" + return "connected" if self._publisher.connected else "disconnected" diff --git a/app/main.py b/app/main.py index 4914e4a..b1f1e6f 100644 --- a/app/main.py +++ b/app/main.py @@ -18,6 +18,7 @@ from app.radio_sync import ( from app.routers import ( channels, contacts, + fanout, health, messages, packets, @@ -56,23 +57,18 @@ async def lifespan(app: FastAPI): # Always start connection monitor (even if initial connection failed) await radio_manager.start_connection_monitor() - # Start MQTT publishers if configured - from app.community_mqtt import community_publisher - from app.mqtt import mqtt_publisher - from app.repository import AppSettingsRepository + # Start fanout modules (MQTT, etc.) from database configs + from app.fanout.manager import fanout_manager try: - mqtt_settings = await AppSettingsRepository.get() - await mqtt_publisher.start(mqtt_settings) - await community_publisher.start(mqtt_settings) + await fanout_manager.load_from_db() except Exception as e: - logger.warning("Failed to start MQTT publisher(s): %s", e) + logger.warning("Failed to start fanout modules: %s", e) yield logger.info("Shutting down") - await community_publisher.stop() - await mqtt_publisher.stop() + await fanout_manager.stop_all() await radio_manager.stop_connection_monitor() await stop_message_polling() await stop_periodic_advert() @@ -119,6 +115,7 @@ async def radio_disconnected_handler(request: Request, exc: RadioDisconnectedErr # API routes - all prefixed with /api for production compatibility app.include_router(health.router, prefix="/api") +app.include_router(fanout.router, prefix="/api") app.include_router(radio.router, prefix="/api") app.include_router(contacts.router, prefix="/api") app.include_router(repeaters.router, prefix="/api") diff --git a/app/migrations.py b/app/migrations.py index 2546092..65168db 100644 --- a/app/migrations.py +++ b/app/migrations.py @@ -282,6 +282,13 @@ async def run_migrations(conn: aiosqlite.Connection) -> int: await set_version(conn, 35) applied += 1 + # Migration 36: Create fanout_configs table and migrate existing MQTT settings + if version < 36: + logger.info("Applying migration 36: create fanout_configs and migrate MQTT settings") + await _migrate_036_create_fanout_configs(conn) + await set_version(conn, 36) + applied += 1 + if applied > 0: logger.info( "Applied %d migration(s), schema now at version %d", applied, await get_version(conn) @@ -2014,3 +2021,131 @@ async def _migrate_035_add_block_lists(conn: aiosqlite.Connection) -> None: raise await conn.commit() + + +async def _migrate_036_create_fanout_configs(conn: aiosqlite.Connection) -> None: + """Create fanout_configs table and migrate existing MQTT settings. + + Reads existing MQTT settings from app_settings and creates corresponding + fanout_configs rows. Old columns are NOT dropped (rollback safety). + """ + import json + import uuid + + # 1. Create fanout_configs table + await conn.execute( + """ + CREATE TABLE IF NOT EXISTS fanout_configs ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + name TEXT NOT NULL, + enabled INTEGER DEFAULT 0, + config TEXT NOT NULL DEFAULT '{}', + scope TEXT NOT NULL DEFAULT '{}', + sort_order INTEGER DEFAULT 0, + created_at INTEGER NOT NULL + ) + """ + ) + + # 2. Read existing MQTT settings + try: + cursor = await conn.execute( + """ + SELECT mqtt_broker_host, mqtt_broker_port, mqtt_username, mqtt_password, + mqtt_use_tls, mqtt_tls_insecure, mqtt_topic_prefix, + mqtt_publish_messages, mqtt_publish_raw_packets, + community_mqtt_enabled, community_mqtt_iata, + community_mqtt_broker_host, community_mqtt_broker_port, + community_mqtt_email + FROM app_settings WHERE id = 1 + """ + ) + row = await cursor.fetchone() + except Exception: + row = None + + if row is None: + await conn.commit() + return + + import time + + now = int(time.time()) + sort_order = 0 + + # 3. Migrate private MQTT if configured + broker_host = row["mqtt_broker_host"] or "" + if broker_host: + publish_messages = bool(row["mqtt_publish_messages"]) + publish_raw = bool(row["mqtt_publish_raw_packets"]) + enabled = publish_messages or publish_raw + + config = { + "broker_host": broker_host, + "broker_port": row["mqtt_broker_port"] or 1883, + "username": row["mqtt_username"] or "", + "password": row["mqtt_password"] or "", + "use_tls": bool(row["mqtt_use_tls"]), + "tls_insecure": bool(row["mqtt_tls_insecure"]), + "topic_prefix": row["mqtt_topic_prefix"] or "meshcore", + } + + scope = { + "messages": "all" if publish_messages else "none", + "raw_packets": "all" if publish_raw else "none", + } + + await conn.execute( + """ + INSERT INTO fanout_configs (id, type, name, enabled, config, scope, sort_order, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + str(uuid.uuid4()), + "mqtt_private", + "Private MQTT", + 1 if enabled else 0, + json.dumps(config), + json.dumps(scope), + sort_order, + now, + ), + ) + sort_order += 1 + logger.info("Migrated private MQTT settings to fanout_configs (enabled=%s)", enabled) + + # 4. Migrate community MQTT if enabled + community_enabled = bool(row["community_mqtt_enabled"]) + if community_enabled: + config = { + "broker_host": row["community_mqtt_broker_host"] or "mqtt-us-v1.letsmesh.net", + "broker_port": row["community_mqtt_broker_port"] or 443, + "iata": row["community_mqtt_iata"] or "", + "email": row["community_mqtt_email"] or "", + } + + scope = { + "messages": "none", + "raw_packets": "all", + } + + await conn.execute( + """ + INSERT INTO fanout_configs (id, type, name, enabled, config, scope, sort_order, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + str(uuid.uuid4()), + "mqtt_community", + "Community MQTT", + 1, + json.dumps(config), + json.dumps(scope), + sort_order, + now, + ), + ) + logger.info("Migrated community MQTT settings to fanout_configs") + + await conn.commit() diff --git a/app/models.py b/app/models.py index f2e1755..43bfabb 100644 --- a/app/models.py +++ b/app/models.py @@ -533,6 +533,19 @@ class AppSettings(BaseModel): ) +class FanoutConfig(BaseModel): + """Configuration for a single fanout integration.""" + + id: str + type: str # 'mqtt_private' | 'mqtt_community' + name: str + enabled: bool + config: dict + scope: dict + sort_order: int = 0 + created_at: int = 0 + + class BusyChannel(BaseModel): channel_key: str channel_name: str diff --git a/app/mqtt.py b/app/mqtt.py index 79a6e3f..fb96989 100644 --- a/app/mqtt.py +++ b/app/mqtt.py @@ -2,7 +2,6 @@ from __future__ import annotations -import asyncio import logging import ssl from typing import Any @@ -54,38 +53,6 @@ class MqttPublisher(BaseMqttPublisher): return ctx -# Module-level singleton -mqtt_publisher = MqttPublisher() - - -def mqtt_broadcast(event_type: str, data: dict[str, Any]) -> None: - """Fire-and-forget MQTT publish, matching broadcast_event's pattern.""" - if event_type not in ("message", "raw_packet"): - return - if not mqtt_publisher.connected or mqtt_publisher._settings is None: - return - asyncio.create_task(_mqtt_maybe_publish(event_type, data)) - - -async def _mqtt_maybe_publish(event_type: str, data: dict[str, Any]) -> None: - """Check settings and build topic, then publish.""" - settings = mqtt_publisher._settings - if settings is None: - return - - try: - if event_type == "message" and settings.mqtt_publish_messages: - topic = _build_message_topic(settings.mqtt_topic_prefix, data) - await mqtt_publisher.publish(topic, data) - - elif event_type == "raw_packet" and settings.mqtt_publish_raw_packets: - topic = _build_raw_packet_topic(settings.mqtt_topic_prefix, data) - await mqtt_publisher.publish(topic, data) - - except Exception as e: - logger.warning("MQTT broadcast error: %s", e) - - def _build_message_topic(prefix: str, data: dict[str, Any]) -> str: """Build MQTT topic for a decrypted message.""" msg_type = data.get("type", "") diff --git a/app/packet_processor.py b/app/packet_processor.py index 3e40a79..1264345 100644 --- a/app/packet_processor.py +++ b/app/packet_processor.py @@ -209,6 +209,7 @@ async def create_message_from_decrypted( sender_name=sender, sender_key=resolved_sender_key, ).model_dump(), + realtime=trigger_bot, ) # Run bot if enabled (for incoming channel messages, not historical decryption) @@ -332,6 +333,7 @@ async def create_dm_message_from_decrypted( sender_name=sender_name, sender_key=conversation_key if not outgoing else None, ).model_dump(), + realtime=trigger_bot, ) # Update contact's last_contacted timestamp (for sorting) diff --git a/app/repository/__init__.py b/app/repository/__init__.py index 0058956..cb34f9c 100644 --- a/app/repository/__init__.py +++ b/app/repository/__init__.py @@ -5,6 +5,7 @@ from app.repository.contacts import ( ContactNameHistoryRepository, ContactRepository, ) +from app.repository.fanout import FanoutConfigRepository from app.repository.messages import MessageRepository from app.repository.raw_packets import RawPacketRepository from app.repository.settings import AppSettingsRepository, StatisticsRepository @@ -16,6 +17,7 @@ __all__ = [ "ContactAdvertPathRepository", "ContactNameHistoryRepository", "ContactRepository", + "FanoutConfigRepository", "MessageRepository", "RawPacketRepository", "StatisticsRepository", diff --git a/app/repository/fanout.py b/app/repository/fanout.py new file mode 100644 index 0000000..76fb31d --- /dev/null +++ b/app/repository/fanout.py @@ -0,0 +1,137 @@ +"""Repository for fanout_configs table.""" + +import json +import logging +import time +import uuid +from typing import Any + +from app.database import db + +logger = logging.getLogger(__name__) + +# In-memory cache of config metadata (name, type) for status reporting. +# Populated by get_all/get/create/update and read by FanoutManager.get_statuses(). +_configs_cache: dict[str, dict[str, Any]] = {} + + +def _row_to_dict(row: Any) -> dict[str, Any]: + """Convert a database row to a config dict.""" + result = { + "id": row["id"], + "type": row["type"], + "name": row["name"], + "enabled": bool(row["enabled"]), + "config": json.loads(row["config"]) if row["config"] else {}, + "scope": json.loads(row["scope"]) if row["scope"] else {}, + "sort_order": row["sort_order"] or 0, + "created_at": row["created_at"] or 0, + } + _configs_cache[result["id"]] = result + return result + + +class FanoutConfigRepository: + """CRUD operations for fanout_configs table.""" + + @staticmethod + async def get_all() -> list[dict[str, Any]]: + """Get all fanout configs ordered by sort_order.""" + cursor = await db.conn.execute( + "SELECT * FROM fanout_configs ORDER BY sort_order, created_at" + ) + rows = await cursor.fetchall() + return [_row_to_dict(row) for row in rows] + + @staticmethod + async def get(config_id: str) -> dict[str, Any] | None: + """Get a single fanout config by ID.""" + cursor = await db.conn.execute("SELECT * FROM fanout_configs WHERE id = ?", (config_id,)) + row = await cursor.fetchone() + if row is None: + return None + return _row_to_dict(row) + + @staticmethod + async def create( + config_type: str, + name: str, + config: dict, + scope: dict, + enabled: bool = True, + config_id: str | None = None, + ) -> dict[str, Any]: + """Create a new fanout config.""" + new_id = config_id or str(uuid.uuid4()) + now = int(time.time()) + + # Get next sort_order + cursor = await db.conn.execute( + "SELECT COALESCE(MAX(sort_order), -1) + 1 FROM fanout_configs" + ) + row = await cursor.fetchone() + sort_order = row[0] if row else 0 + + await db.conn.execute( + """ + INSERT INTO fanout_configs (id, type, name, enabled, config, scope, sort_order, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + new_id, + config_type, + name, + 1 if enabled else 0, + json.dumps(config), + json.dumps(scope), + sort_order, + now, + ), + ) + await db.conn.commit() + + result = await FanoutConfigRepository.get(new_id) + assert result is not None + return result + + @staticmethod + async def update(config_id: str, **fields: Any) -> dict[str, Any] | None: + """Update a fanout config. Only provided fields are updated.""" + updates = [] + params: list[Any] = [] + + for field in ("name", "enabled", "config", "scope", "sort_order"): + if field in fields: + value = fields[field] + if field == "enabled": + value = 1 if value else 0 + elif field in ("config", "scope"): + value = json.dumps(value) + updates.append(f"{field} = ?") + params.append(value) + + if not updates: + return await FanoutConfigRepository.get(config_id) + + params.append(config_id) + query = f"UPDATE fanout_configs SET {', '.join(updates)} WHERE id = ?" + await db.conn.execute(query, params) + await db.conn.commit() + + return await FanoutConfigRepository.get(config_id) + + @staticmethod + async def delete(config_id: str) -> None: + """Delete a fanout config.""" + await db.conn.execute("DELETE FROM fanout_configs WHERE id = ?", (config_id,)) + await db.conn.commit() + _configs_cache.pop(config_id, None) + + @staticmethod + async def get_enabled() -> list[dict[str, Any]]: + """Get all enabled fanout configs.""" + cursor = await db.conn.execute( + "SELECT * FROM fanout_configs WHERE enabled = 1 ORDER BY sort_order, created_at" + ) + rows = await cursor.fetchall() + return [_row_to_dict(row) for row in rows] diff --git a/app/routers/fanout.py b/app/routers/fanout.py new file mode 100644 index 0000000..4aa5c6b --- /dev/null +++ b/app/routers/fanout.py @@ -0,0 +1,165 @@ +"""REST API for fanout config CRUD.""" + +import logging +import re + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel, Field + +from app.repository.fanout import FanoutConfigRepository + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/fanout", tags=["fanout"]) + +# Valid types in Phase 1 +_VALID_TYPES = {"mqtt_private", "mqtt_community"} + +_IATA_RE = re.compile(r"^[A-Z]{3}$") + + +class FanoutConfigCreate(BaseModel): + type: str = Field(description="Integration type: 'mqtt_private' or 'mqtt_community'") + name: str = Field(min_length=1, description="User-assigned label") + config: dict = Field(default_factory=dict, description="Type-specific config blob") + scope: dict = Field(default_factory=dict, description="Scope controls") + enabled: bool = Field(default=True, description="Whether enabled on creation") + + +class FanoutConfigUpdate(BaseModel): + name: str | None = Field(default=None, description="Updated label") + config: dict | None = Field(default=None, description="Updated config blob") + scope: dict | None = Field(default=None, description="Updated scope controls") + enabled: bool | None = Field(default=None, description="Enable/disable toggle") + + +def _validate_mqtt_private_config(config: dict) -> None: + """Validate mqtt_private config blob.""" + if not config.get("broker_host"): + raise HTTPException(status_code=400, detail="broker_host is required for mqtt_private") + port = config.get("broker_port", 1883) + if not isinstance(port, int) or port < 1 or port > 65535: + raise HTTPException(status_code=400, detail="broker_port must be between 1 and 65535") + + +def _validate_mqtt_community_config(config: dict) -> None: + """Validate mqtt_community config blob.""" + iata = config.get("iata", "") + if iata and not _IATA_RE.fullmatch(iata.upper().strip()): + raise HTTPException( + status_code=400, + detail="IATA code must be exactly 3 uppercase alphabetic characters", + ) + + +def _enforce_scope(config_type: str, scope: dict) -> dict: + """Enforce type-specific scope constraints. Returns normalized scope.""" + if config_type == "mqtt_community": + # Community MQTT always: no messages, all raw packets + return {"messages": "none", "raw_packets": "all"} + # For mqtt_private, validate scope values + messages = scope.get("messages", "all") + if messages not in ("all", "none") and not isinstance(messages, dict): + messages = "all" + raw_packets = scope.get("raw_packets", "all") + if raw_packets not in ("all", "none"): + raw_packets = "all" + return {"messages": messages, "raw_packets": raw_packets} + + +@router.get("") +async def list_fanout_configs() -> list[dict]: + """List all fanout configs.""" + return await FanoutConfigRepository.get_all() + + +@router.post("") +async def create_fanout_config(body: FanoutConfigCreate) -> dict: + """Create a new fanout config.""" + if body.type not in _VALID_TYPES: + raise HTTPException( + status_code=400, + detail=f"Invalid type '{body.type}'. Must be one of: {', '.join(sorted(_VALID_TYPES))}", + ) + + # Only validate config when creating as enabled — disabled configs + # are drafts the user hasn't finished configuring yet. + if body.enabled: + if body.type == "mqtt_private": + _validate_mqtt_private_config(body.config) + elif body.type == "mqtt_community": + _validate_mqtt_community_config(body.config) + + scope = _enforce_scope(body.type, body.scope) + + cfg = await FanoutConfigRepository.create( + config_type=body.type, + name=body.name, + config=body.config, + scope=scope, + enabled=body.enabled, + ) + + # Start the module if enabled + if cfg["enabled"]: + from app.fanout.manager import fanout_manager + + await fanout_manager.reload_config(cfg["id"]) + + logger.info("Created fanout config %s (type=%s, name=%s)", cfg["id"], body.type, body.name) + return cfg + + +@router.patch("/{config_id}") +async def update_fanout_config(config_id: str, body: FanoutConfigUpdate) -> dict: + """Update a fanout config. Triggers module reload.""" + existing = await FanoutConfigRepository.get(config_id) + if existing is None: + raise HTTPException(status_code=404, detail="Fanout config not found") + + kwargs = {} + if body.name is not None: + kwargs["name"] = body.name + if body.enabled is not None: + kwargs["enabled"] = body.enabled + if body.config is not None: + kwargs["config"] = body.config + if body.scope is not None: + kwargs["scope"] = _enforce_scope(existing["type"], body.scope) + + # Validate config when the result will be enabled + will_be_enabled = body.enabled if body.enabled is not None else existing["enabled"] + if will_be_enabled: + config_to_validate = body.config if body.config is not None else existing["config"] + if existing["type"] == "mqtt_private": + _validate_mqtt_private_config(config_to_validate) + elif existing["type"] == "mqtt_community": + _validate_mqtt_community_config(config_to_validate) + + updated = await FanoutConfigRepository.update(config_id, **kwargs) + if updated is None: + raise HTTPException(status_code=404, detail="Fanout config not found") + + # Reload the module to pick up changes + from app.fanout.manager import fanout_manager + + await fanout_manager.reload_config(config_id) + + logger.info("Updated fanout config %s", config_id) + return updated + + +@router.delete("/{config_id}") +async def delete_fanout_config(config_id: str) -> dict: + """Delete a fanout config.""" + existing = await FanoutConfigRepository.get(config_id) + if existing is None: + raise HTTPException(status_code=404, detail="Fanout config not found") + + # Stop the module first + from app.fanout.manager import fanout_manager + + await fanout_manager.remove_config(config_id) + await FanoutConfigRepository.delete(config_id) + + logger.info("Deleted fanout config %s", config_id) + return {"deleted": True} diff --git a/app/routers/health.py b/app/routers/health.py index 5be6e6b..39a53c9 100644 --- a/app/routers/health.py +++ b/app/routers/health.py @@ -1,4 +1,5 @@ import os +from typing import Any from fastapi import APIRouter from pydantic import BaseModel @@ -16,8 +17,7 @@ class HealthResponse(BaseModel): connection_info: str | None database_size_mb: float oldest_undecrypted_timestamp: int | None - mqtt_status: str | None = None - community_mqtt_status: str | None = None + fanout_statuses: dict[str, dict[str, str]] = {} bots_disabled: bool = False @@ -36,27 +36,12 @@ async def build_health_data(radio_connected: bool, connection_info: str | None) except RuntimeError: pass # Database not connected - # MQTT status - mqtt_status: str | None = None + # Fanout module statuses + fanout_statuses: dict[str, Any] = {} try: - from app.mqtt import mqtt_publisher + from app.fanout.manager import fanout_manager - if mqtt_publisher._is_configured(): - mqtt_status = "connected" if mqtt_publisher.connected else "disconnected" - else: - mqtt_status = "disabled" - except Exception: - pass - - # Community MQTT status - community_mqtt_status: str | None = None - try: - from app.community_mqtt import community_publisher - - if community_publisher._is_configured(): - community_mqtt_status = "connected" if community_publisher.connected else "disconnected" - else: - community_mqtt_status = "disabled" + fanout_statuses = fanout_manager.get_statuses() except Exception: pass @@ -66,8 +51,7 @@ async def build_health_data(radio_connected: bool, connection_info: str | None) "connection_info": connection_info, "database_size_mb": db_size_mb, "oldest_undecrypted_timestamp": oldest_ts, - "mqtt_status": mqtt_status, - "community_mqtt_status": community_mqtt_status, + "fanout_statuses": fanout_statuses, "bots_disabled": settings.disable_bots, } diff --git a/app/routers/settings.py b/app/routers/settings.py index 55e20b3..56cf6de 100644 --- a/app/routers/settings.py +++ b/app/routers/settings.py @@ -1,6 +1,5 @@ import asyncio import logging -import re from typing import Literal from fastapi import APIRouter, HTTPException @@ -61,66 +60,6 @@ class AppSettingsUpdate(BaseModel): default=None, description="List of bot configurations", ) - mqtt_broker_host: str | None = Field( - default=None, - description="MQTT broker hostname (empty = disabled)", - ) - mqtt_broker_port: int | None = Field( - default=None, - ge=1, - le=65535, - description="MQTT broker port", - ) - mqtt_username: str | None = Field( - default=None, - description="MQTT username (optional)", - ) - mqtt_password: str | None = Field( - default=None, - description="MQTT password (optional)", - ) - mqtt_use_tls: bool | None = Field( - default=None, - description="Whether to use TLS for MQTT connection", - ) - mqtt_tls_insecure: bool | None = Field( - default=None, - description="Skip TLS certificate verification (for self-signed certs)", - ) - mqtt_topic_prefix: str | None = Field( - default=None, - description="MQTT topic prefix", - ) - mqtt_publish_messages: bool | None = Field( - default=None, - description="Whether to publish decrypted messages to MQTT", - ) - mqtt_publish_raw_packets: bool | None = Field( - default=None, - description="Whether to publish raw packets to MQTT", - ) - community_mqtt_enabled: bool | None = Field( - default=None, - description="Whether to publish raw packets to the community MQTT broker", - ) - community_mqtt_iata: str | None = Field( - default=None, - description="IATA region code for community MQTT topic routing (3 alpha chars)", - ) - community_mqtt_broker_host: str | None = Field( - default=None, - description="Community MQTT broker hostname", - ) - community_mqtt_broker_port: int | None = Field( - default=None, - ge=1, - le=65535, - description="Community MQTT broker port", - ) - community_mqtt_email: str | None = Field( - default=None, - description="Email address for node claiming on the community aggregator", - ) flood_scope: str | None = Field( default=None, description="Outbound flood scope / region name (empty = disabled)", @@ -210,53 +149,6 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings: logger.info("Updating bots (count=%d)", len(update.bots)) kwargs["bots"] = update.bots - # MQTT fields - mqtt_fields = [ - "mqtt_broker_host", - "mqtt_broker_port", - "mqtt_username", - "mqtt_password", - "mqtt_use_tls", - "mqtt_tls_insecure", - "mqtt_topic_prefix", - "mqtt_publish_messages", - "mqtt_publish_raw_packets", - ] - mqtt_changed = False - for field in mqtt_fields: - value = getattr(update, field) - if value is not None: - kwargs[field] = value - mqtt_changed = True - - # Community MQTT fields - community_mqtt_changed = False - if update.community_mqtt_enabled is not None: - kwargs["community_mqtt_enabled"] = update.community_mqtt_enabled - community_mqtt_changed = True - - if update.community_mqtt_iata is not None: - iata = update.community_mqtt_iata.upper().strip() - if iata and not re.fullmatch(r"[A-Z]{3}", iata): - raise HTTPException( - status_code=400, - detail="IATA code must be exactly 3 uppercase alphabetic characters", - ) - kwargs["community_mqtt_iata"] = iata - community_mqtt_changed = True - - if update.community_mqtt_broker_host is not None: - kwargs["community_mqtt_broker_host"] = update.community_mqtt_broker_host - community_mqtt_changed = True - - if update.community_mqtt_broker_port is not None: - kwargs["community_mqtt_broker_port"] = update.community_mqtt_broker_port - community_mqtt_changed = True - - if update.community_mqtt_email is not None: - kwargs["community_mqtt_email"] = update.community_mqtt_email - community_mqtt_changed = True - # Block lists if update.blocked_keys is not None: kwargs["blocked_keys"] = [k.lower() for k in update.blocked_keys] @@ -270,34 +162,9 @@ async def update_settings(update: AppSettingsUpdate) -> AppSettings: kwargs["flood_scope"] = stripped flood_scope_changed = True - # Require IATA when enabling community MQTT - if kwargs.get("community_mqtt_enabled", False): - # Check the IATA value being set, or fall back to current settings - iata_value = kwargs.get("community_mqtt_iata") - if iata_value is None: - current = await AppSettingsRepository.get() - iata_value = current.community_mqtt_iata - if not iata_value or not re.fullmatch(r"[A-Z]{3}", iata_value): - raise HTTPException( - status_code=400, - detail="A valid IATA region code is required to enable community sharing", - ) - if kwargs: result = await AppSettingsRepository.update(**kwargs) - # Restart MQTT publisher if any MQTT settings changed - if mqtt_changed: - from app.mqtt import mqtt_publisher - - await mqtt_publisher.restart(result) - - # Restart community MQTT publisher if any community settings changed - if community_mqtt_changed: - from app.community_mqtt import community_publisher - - await community_publisher.restart(result) - # Apply flood scope to radio immediately if changed if flood_scope_changed: from app.radio import radio_manager diff --git a/app/websocket.py b/app/websocket.py index 1638035..3ceb705 100644 --- a/app/websocket.py +++ b/app/websocket.py @@ -92,21 +92,26 @@ class WebSocketManager: ws_manager = WebSocketManager() -def broadcast_event(event_type: str, data: dict) -> None: +def broadcast_event(event_type: str, data: dict, *, realtime: bool = True) -> None: """Schedule a broadcast without blocking. Convenience function that creates an asyncio task to broadcast - an event to all connected WebSocket clients and forward to MQTT. + an event to all connected WebSocket clients and forward to fanout modules. + + Args: + event_type: Event type string (e.g. "message", "raw_packet") + data: Event payload dict + realtime: If False, skip fanout dispatch (used for historical decryption) """ asyncio.create_task(ws_manager.broadcast(event_type, data)) - from app.mqtt import mqtt_broadcast + if realtime: + from app.fanout.manager import fanout_manager - mqtt_broadcast(event_type, data) - - from app.community_mqtt import community_mqtt_broadcast - - community_mqtt_broadcast(event_type, data) + if event_type == "message": + asyncio.create_task(fanout_manager.broadcast_message(data)) + elif event_type == "raw_packet": + asyncio.create_task(fanout_manager.broadcast_raw(data)) def broadcast_error(message: str, details: str | None = None) -> None: diff --git a/frontend/src/api.ts b/frontend/src/api.ts index d2ca63a..b4b4d11 100644 --- a/frontend/src/api.ts +++ b/frontend/src/api.ts @@ -8,6 +8,7 @@ import type { ContactAdvertPath, ContactAdvertPathSummary, ContactDetail, + FanoutConfig, Favorite, HealthStatus, MaintenanceResult, @@ -280,6 +281,37 @@ export const api = { body: JSON.stringify(request), }), + // Fanout + getFanoutConfigs: () => fetchJson('/fanout'), + createFanoutConfig: (config: { + type: string; + name: string; + config: Record; + scope: Record; + enabled?: boolean; + }) => + fetchJson('/fanout', { + method: 'POST', + body: JSON.stringify(config), + }), + updateFanoutConfig: ( + id: string, + update: { + name?: string; + config?: Record; + scope?: Record; + enabled?: boolean; + } + ) => + fetchJson(`/fanout/${id}`, { + method: 'PATCH', + body: JSON.stringify(update), + }), + deleteFanoutConfig: (id: string) => + fetchJson<{ deleted: boolean }>(`/fanout/${id}`, { + method: 'DELETE', + }), + // Statistics getStatistics: () => fetchJson('/statistics'), diff --git a/frontend/src/components/SettingsModal.tsx b/frontend/src/components/SettingsModal.tsx index dc94801..174b907 100644 --- a/frontend/src/components/SettingsModal.tsx +++ b/frontend/src/components/SettingsModal.tsx @@ -11,7 +11,7 @@ import { SETTINGS_SECTION_LABELS, type SettingsSection } from './settings/settin import { SettingsRadioSection } from './settings/SettingsRadioSection'; import { SettingsLocalSection } from './settings/SettingsLocalSection'; -import { SettingsMqttSection } from './settings/SettingsMqttSection'; +import { SettingsFanoutSection } from './settings/SettingsFanoutSection'; import { SettingsDatabaseSection } from './settings/SettingsDatabaseSection'; import { SettingsBotSection } from './settings/SettingsBotSection'; import { SettingsStatisticsSection } from './settings/SettingsStatisticsSection'; @@ -78,7 +78,7 @@ export function SettingsModal(props: SettingsModalProps) { const [expandedSections, setExpandedSections] = useState>({ radio: false, local: false, - mqtt: false, + fanout: false, database: false, bot: false, statistics: false, @@ -232,16 +232,11 @@ export function SettingsModal(props: SettingsModalProps) { )} - {shouldRenderSection('mqtt') && ( + {shouldRenderSection('fanout') && (
- {renderSectionHeader('mqtt')} - {isSectionVisible('mqtt') && appSettings && ( - + {renderSectionHeader('fanout')} + {isSectionVisible('fanout') && ( + )}
)} diff --git a/frontend/src/components/settings/SettingsFanoutSection.tsx b/frontend/src/components/settings/SettingsFanoutSection.tsx new file mode 100644 index 0000000..e804ac6 --- /dev/null +++ b/frontend/src/components/settings/SettingsFanoutSection.tsx @@ -0,0 +1,505 @@ +import { useState, useEffect, useCallback } from 'react'; +import { Input } from '../ui/input'; +import { Label } from '../ui/label'; +import { Button } from '../ui/button'; +import { Separator } from '../ui/separator'; +import { toast } from '../ui/sonner'; +import { cn } from '@/lib/utils'; +import { api } from '../../api'; +import type { FanoutConfig, HealthStatus } from '../../types'; + +const TYPE_LABELS: Record = { + mqtt_private: 'Private MQTT', + mqtt_community: 'Community MQTT', +}; + +const TYPE_OPTIONS = [ + { value: 'mqtt_private', label: 'Private MQTT' }, + { value: 'mqtt_community', label: 'Community MQTT' }, +]; + +function getStatusColor(status: string | undefined) { + if (status === 'connected') + return 'bg-status-connected shadow-[0_0_6px_hsl(var(--status-connected)/0.5)]'; + return 'bg-muted-foreground'; +} + +function getStatusLabel(status: string | undefined) { + if (status === 'connected') return 'Connected'; + if (status === 'disconnected') return 'Disconnected'; + return 'Inactive'; +} + +function MqttPrivateConfigEditor({ + config, + scope, + onChange, + onScopeChange, +}: { + config: Record; + scope: Record; + onChange: (config: Record) => void; + onScopeChange: (scope: Record) => void; +}) { + return ( +
+

+ Forward mesh data to your own MQTT broker for home automation, logging, or alerting. +

+ +
+
+ + onChange({ ...config, broker_host: e.target.value })} + /> +
+
+ + + onChange({ ...config, broker_port: parseInt(e.target.value, 10) || 1883 }) + } + /> +
+
+ +
+
+ + onChange({ ...config, username: e.target.value })} + /> +
+
+ + onChange({ ...config, password: e.target.value })} + /> +
+
+ + + + {!!config.use_tls && ( + + )} + + + +
+ + onChange({ ...config, topic_prefix: e.target.value })} + /> +
+ + + +
+ + + +
+
+ ); +} + +function MqttCommunityConfigEditor({ + config, + onChange, +}: { + config: Record; + onChange: (config: Record) => void; +}) { + return ( +
+

+ Share raw packet data with the MeshCore community for coverage mapping and network analysis. + Only raw RF packets are shared — never decrypted messages. +

+ +
+
+ + onChange({ ...config, broker_host: e.target.value })} + /> +
+
+ + + onChange({ ...config, broker_port: parseInt(e.target.value, 10) || 443 }) + } + /> +
+
+ +
+ + onChange({ ...config, iata: e.target.value.toUpperCase() })} + className="w-32" + /> +

+ Your nearest airport's IATA code (required) +

+
+ +
+ + onChange({ ...config, email: e.target.value })} + /> +

+ Used to claim your node on the community aggregator +

+
+
+ ); +} + +export function SettingsFanoutSection({ + health, + className, +}: { + health: HealthStatus | null; + className?: string; +}) { + const [configs, setConfigs] = useState([]); + const [editingId, setEditingId] = useState(null); + const [editConfig, setEditConfig] = useState>({}); + const [editScope, setEditScope] = useState>({}); + const [editName, setEditName] = useState(''); + const [busy, setBusy] = useState(false); + const [addingType, setAddingType] = useState(null); + + const loadConfigs = useCallback(async () => { + try { + const data = await api.getFanoutConfigs(); + setConfigs(data); + } catch (err) { + console.error('Failed to load fanout configs:', err); + } + }, []); + + useEffect(() => { + loadConfigs(); + }, [loadConfigs]); + + const handleToggleEnabled = async (cfg: FanoutConfig) => { + try { + await api.updateFanoutConfig(cfg.id, { enabled: !cfg.enabled }); + await loadConfigs(); + toast.success(cfg.enabled ? 'Integration disabled' : 'Integration enabled'); + } catch (err) { + toast.error(err instanceof Error ? err.message : 'Failed to update'); + } + }; + + const handleEdit = (cfg: FanoutConfig) => { + setEditingId(cfg.id); + setEditConfig(cfg.config); + setEditScope(cfg.scope); + setEditName(cfg.name); + }; + + const handleSave = async () => { + if (!editingId) return; + setBusy(true); + try { + await api.updateFanoutConfig(editingId, { + name: editName, + config: editConfig, + scope: editScope, + }); + await loadConfigs(); + setEditingId(null); + toast.success('Integration saved'); + } catch (err) { + toast.error(err instanceof Error ? err.message : 'Failed to save'); + } finally { + setBusy(false); + } + }; + + const handleDelete = async (id: string) => { + const cfg = configs.find((c) => c.id === id); + if (!confirm(`Delete "${cfg?.name}"? This cannot be undone.`)) return; + try { + await api.deleteFanoutConfig(id); + if (editingId === id) setEditingId(null); + await loadConfigs(); + toast.success('Integration deleted'); + } catch (err) { + toast.error(err instanceof Error ? err.message : 'Failed to delete'); + } + }; + + const handleAddStart = (type: string) => { + setAddingType(type); + }; + + const handleAddCreate = async (type: string) => { + const defaults: Record> = { + mqtt_private: { + broker_host: '', + broker_port: 1883, + username: '', + password: '', + use_tls: false, + tls_insecure: false, + topic_prefix: 'meshcore', + }, + mqtt_community: { + broker_host: 'mqtt-us-v1.letsmesh.net', + broker_port: 443, + iata: '', + email: '', + }, + }; + const defaultScopes: Record> = { + mqtt_private: { messages: 'all', raw_packets: 'all' }, + mqtt_community: { messages: 'none', raw_packets: 'all' }, + }; + + try { + const created = await api.createFanoutConfig({ + type, + name: TYPE_LABELS[type] || type, + config: defaults[type] || {}, + scope: defaultScopes[type] || {}, + enabled: false, + }); + await loadConfigs(); + setAddingType(null); + handleEdit(created); + toast.success('Integration created'); + } catch (err) { + toast.error(err instanceof Error ? err.message : 'Failed to create'); + } + }; + + const editingConfig = editingId ? configs.find((c) => c.id === editingId) : null; + + // Detail view + if (editingConfig) { + return ( +
+ + +
+ + setEditName(e.target.value)} + /> +
+ +
+ Type: {TYPE_LABELS[editingConfig.type] || editingConfig.type} +
+ + + + {editingConfig.type === 'mqtt_private' && ( + + )} + + {editingConfig.type === 'mqtt_community' && ( + + )} + + + +
+ + +
+
+ ); + } + + // List view + return ( +
+
+ MQTT support is an experimental feature in open beta. All publishing uses QoS 0 + (at-most-once delivery). +
+ + {configs.length === 0 ? ( +
+

No integrations configured

+
+ ) : ( +
+ {configs.map((cfg) => { + const statusEntry = health?.fanout_statuses?.[cfg.id]; + const status = cfg.enabled ? statusEntry?.status : undefined; + return ( +
+
+ + + {cfg.name} + + + {TYPE_LABELS[cfg.type] || cfg.type} + + + +
+ ); + })} +
+ )} + + {addingType ? ( +
+ +
+ {TYPE_OPTIONS.map((opt) => ( + + ))} +
+ +
+ ) : ( + + )} +
+ ); +} diff --git a/frontend/src/components/settings/SettingsMqttSection.tsx b/frontend/src/components/settings/SettingsMqttSection.tsx deleted file mode 100644 index ad42b6a..0000000 --- a/frontend/src/components/settings/SettingsMqttSection.tsx +++ /dev/null @@ -1,451 +0,0 @@ -import { useState, useEffect } from 'react'; -import { Input } from '../ui/input'; -import { Label } from '../ui/label'; -import { Button } from '../ui/button'; -import { Separator } from '../ui/separator'; -import { toast } from '../ui/sonner'; -import { cn } from '@/lib/utils'; -import type { AppSettings, AppSettingsUpdate, HealthStatus } from '../../types'; - -export function SettingsMqttSection({ - appSettings, - health, - onSaveAppSettings, - className, -}: { - appSettings: AppSettings; - health: HealthStatus | null; - onSaveAppSettings: (update: AppSettingsUpdate) => Promise; - className?: string; -}) { - const [mqttBrokerHost, setMqttBrokerHost] = useState(''); - const [mqttBrokerPort, setMqttBrokerPort] = useState('1883'); - const [mqttUsername, setMqttUsername] = useState(''); - const [mqttPassword, setMqttPassword] = useState(''); - const [mqttUseTls, setMqttUseTls] = useState(false); - const [mqttTlsInsecure, setMqttTlsInsecure] = useState(false); - const [mqttTopicPrefix, setMqttTopicPrefix] = useState('meshcore'); - const [mqttPublishMessages, setMqttPublishMessages] = useState(false); - const [mqttPublishRawPackets, setMqttPublishRawPackets] = useState(false); - - // Community MQTT state - const [communityMqttEnabled, setCommunityMqttEnabled] = useState(false); - const [communityMqttIata, setCommunityMqttIata] = useState(''); - const [communityMqttBrokerHost, setCommunityMqttBrokerHost] = useState('mqtt-us-v1.letsmesh.net'); - const [communityMqttBrokerPort, setCommunityMqttBrokerPort] = useState('443'); - const [communityMqttEmail, setCommunityMqttEmail] = useState(''); - - const [privateExpanded, setPrivateExpanded] = useState(false); - const [communityExpanded, setCommunityExpanded] = useState(false); - - const [busy, setBusy] = useState(false); - const [error, setError] = useState(null); - - useEffect(() => { - setMqttBrokerHost(appSettings.mqtt_broker_host ?? ''); - setMqttBrokerPort(String(appSettings.mqtt_broker_port ?? 1883)); - setMqttUsername(appSettings.mqtt_username ?? ''); - setMqttPassword(appSettings.mqtt_password ?? ''); - setMqttUseTls(appSettings.mqtt_use_tls ?? false); - setMqttTlsInsecure(appSettings.mqtt_tls_insecure ?? false); - setMqttTopicPrefix(appSettings.mqtt_topic_prefix ?? 'meshcore'); - setMqttPublishMessages(appSettings.mqtt_publish_messages ?? false); - setMqttPublishRawPackets(appSettings.mqtt_publish_raw_packets ?? false); - setCommunityMqttEnabled(appSettings.community_mqtt_enabled ?? false); - setCommunityMqttIata(appSettings.community_mqtt_iata ?? ''); - setCommunityMqttBrokerHost(appSettings.community_mqtt_broker_host ?? 'mqtt-us-v1.letsmesh.net'); - setCommunityMqttBrokerPort(String(appSettings.community_mqtt_broker_port ?? 443)); - setCommunityMqttEmail(appSettings.community_mqtt_email ?? ''); - }, [appSettings]); - - const handleSave = async () => { - setError(null); - setBusy(true); - - try { - const update: AppSettingsUpdate = { - mqtt_broker_host: mqttBrokerHost, - mqtt_broker_port: parseInt(mqttBrokerPort, 10) || 1883, - mqtt_username: mqttUsername, - mqtt_password: mqttPassword, - mqtt_use_tls: mqttUseTls, - mqtt_tls_insecure: mqttTlsInsecure, - mqtt_topic_prefix: mqttTopicPrefix || 'meshcore', - mqtt_publish_messages: mqttPublishMessages, - mqtt_publish_raw_packets: mqttPublishRawPackets, - community_mqtt_enabled: communityMqttEnabled, - community_mqtt_iata: communityMqttIata, - community_mqtt_broker_host: communityMqttBrokerHost || 'mqtt-us-v1.letsmesh.net', - community_mqtt_broker_port: parseInt(communityMqttBrokerPort, 10) || 443, - community_mqtt_email: communityMqttEmail, - }; - await onSaveAppSettings(update); - toast.success('MQTT settings saved'); - } catch (err) { - setError(err instanceof Error ? err.message : 'Failed to save'); - } finally { - setBusy(false); - } - }; - - return ( -
-
- MQTT support is an experimental feature in open beta. All publishing uses QoS 0 - (at-most-once delivery). Please report any bugs on the{' '} - - GitHub issues page - - . -
- -
- Outgoing messages (DMs and group messages) will be reported to private MQTT brokers in - decrypted/plaintext form. The raw outgoing packets will NOT be reported to any MQTT broker, - private or community. This means that{' '} - - your advertisements will not be reported to community analytics (LetsMesh/etc.) due to - fundamental limitations of the radio - {' '} - — you don't hear your own advertisements unless they're echoed back to you. - So, your own advert echoes may result in you being listed on LetsMesh/etc., but if - you're alone in your mesh, your node will appear as an ingest source within LetsMesh, - without GPS data/etc. derived from adverts -- we faithfully report only traffic heard on the - radio (and don't reconstruct synthetic advertisement events to submit). Rely on the - “My Nodes” or view heard packets to validate that your radio is submitting to - community sources; if you're alone in your local mesh, the radio itself may not appear - as a heard/mapped source. -
- - {/* Private MQTT Broker */} -
- - - {privateExpanded && ( -
-

- Forward mesh data to your own MQTT broker for home automation, logging, or alerting. -

- - -

- Forward decrypted DM and channel messages -

- - -

Forward all RF packets

- - {(mqttPublishMessages || mqttPublishRawPackets) && ( -
- - -
-
- - setMqttBrokerHost(e.target.value)} - /> -
- -
- - setMqttBrokerPort(e.target.value)} - /> -
-
- -
-
- - setMqttUsername(e.target.value)} - /> -
- -
- - setMqttPassword(e.target.value)} - /> -
-
- - - - {mqttUseTls && ( - <> - -

- Allow self-signed or untrusted broker certificates -

- - )} - - - -
- - setMqttTopicPrefix(e.target.value)} - /> -
-
-

- Decrypted messages{' '} - - {'{'}id, type, conversation_key, text, sender_timestamp, received_at, - paths, outgoing, acked{'}'} - -

-
-
{mqttTopicPrefix || 'meshcore'}/dm:<contact_key>
-
{mqttTopicPrefix || 'meshcore'}/gm:<channel_key>
-
-
-
-

- Raw packets{' '} - - {'{'}id, observation_id, timestamp, data, payload_type, snr, rssi, - decrypted, decrypted_info{'}'} - -

-
-
{mqttTopicPrefix || 'meshcore'}/raw/dm:<contact_key>
-
{mqttTopicPrefix || 'meshcore'}/raw/gm:<channel_key>
-
{mqttTopicPrefix || 'meshcore'}/raw/unrouted
-
-
-
-
-
- )} -
- )} -
- - {/* Community Analytics */} -
- - - {communityExpanded && ( -
-

- Share raw packet data with the MeshCore community for coverage mapping and network - analysis. Only raw RF packets are shared — never decrypted messages. General parity - with{' '} - - meshcore-packet-capture - - . -

- - - {communityMqttEnabled && ( -
-
-
- - setCommunityMqttBrokerHost(e.target.value)} - /> -

- MQTT over TLS (WebSocket Secure) only -

-
-
- - setCommunityMqttBrokerPort(e.target.value)} - /> -
-
-
- - setCommunityMqttIata(e.target.value.toUpperCase())} - className="w-32" - /> -

- Your nearest airport's{' '} - - IATA code - {' '} - (required) -

- {communityMqttIata && ( -

- Topic: meshcore/{communityMqttIata}/<pubkey>/packets -

- )} -
-
- - setCommunityMqttEmail(e.target.value)} - /> -

- Used to claim your node on the community aggregator -

-
-
- )} -
- )} -
- - - - {error && ( -
- {error} -
- )} -
- ); -} diff --git a/frontend/src/components/settings/settingsConstants.ts b/frontend/src/components/settings/settingsConstants.ts index 136fbd0..9bb8868 100644 --- a/frontend/src/components/settings/settingsConstants.ts +++ b/frontend/src/components/settings/settingsConstants.ts @@ -3,7 +3,7 @@ export type SettingsSection = | 'local' | 'database' | 'bot' - | 'mqtt' + | 'fanout' | 'statistics' | 'about'; @@ -12,7 +12,7 @@ export const SETTINGS_SECTION_ORDER: SettingsSection[] = [ 'local', 'database', 'bot', - 'mqtt', + 'fanout', 'statistics', 'about', ]; @@ -22,7 +22,7 @@ export const SETTINGS_SECTION_LABELS: Record = { local: '🖥️ Local Configuration', database: '🗄️ Database & Messaging', bot: '🤖 Bots', - mqtt: '📤 MQTT', + fanout: '📤 Fanout & Forwarding', statistics: '📊 Statistics', about: 'About', }; diff --git a/frontend/src/test/settingsModal.test.tsx b/frontend/src/test/settingsModal.test.tsx index 03eb854..9ed3cfb 100644 --- a/frontend/src/test/settingsModal.test.tsx +++ b/frontend/src/test/settingsModal.test.tsx @@ -38,8 +38,7 @@ const baseHealth: HealthStatus = { connection_info: 'Serial: /dev/ttyUSB0', database_size_mb: 1.2, oldest_undecrypted_timestamp: null, - mqtt_status: null, - community_mqtt_status: null, + fanout_statuses: {}, bots_disabled: false, }; @@ -159,19 +158,6 @@ function openLocalSection() { fireEvent.click(localToggle); } -function openMqttSection() { - const mqttToggle = screen.getByRole('button', { name: /MQTT/i }); - fireEvent.click(mqttToggle); -} - -function expandPrivateMqtt() { - fireEvent.click(screen.getByText('Private MQTT Broker')); -} - -function expandCommunityMqtt() { - fireEvent.click(screen.getByText('Community Analytics')); -} - function openDatabaseSection() { const databaseToggle = screen.getByRole('button', { name: /Database/i }); fireEvent.click(databaseToggle); @@ -430,148 +416,6 @@ describe('SettingsModal', () => { expect(screen.getByText('42 msgs')).toBeInTheDocument(); }); - it('renders MQTT section with form inputs', () => { - renderModal(); - openMqttSection(); - expandPrivateMqtt(); - - // Publish checkboxes always visible - expect(screen.getByText('Publish Messages')).toBeInTheDocument(); - expect(screen.getByText('Publish Raw Packets')).toBeInTheDocument(); - - // Broker config hidden until a publish option is enabled - expect(screen.queryByLabelText('Broker Host')).not.toBeInTheDocument(); - - // Enable one publish option to reveal broker config - fireEvent.click(screen.getByText('Publish Messages')); - expect(screen.getByLabelText('Broker Host')).toBeInTheDocument(); - expect(screen.getByLabelText('Broker Port')).toBeInTheDocument(); - expect(screen.getByLabelText('Username')).toBeInTheDocument(); - expect(screen.getByLabelText('Password')).toBeInTheDocument(); - expect(screen.getByLabelText('Topic Prefix')).toBeInTheDocument(); - }); - - it('saves MQTT settings through onSaveAppSettings', async () => { - const { onSaveAppSettings } = renderModal({ - appSettings: { ...baseSettings, mqtt_publish_messages: true }, - }); - openMqttSection(); - expandPrivateMqtt(); - - const hostInput = screen.getByLabelText('Broker Host'); - fireEvent.change(hostInput, { target: { value: 'mqtt.example.com' } }); - - fireEvent.click(screen.getByRole('button', { name: 'Save MQTT Settings' })); - - await waitFor(() => { - expect(onSaveAppSettings).toHaveBeenCalledWith( - expect.objectContaining({ - mqtt_broker_host: 'mqtt.example.com', - mqtt_broker_port: 1883, - }) - ); - }); - }); - - it('shows MQTT disabled status when mqtt_status is null', () => { - renderModal({ - appSettings: { - ...baseSettings, - mqtt_broker_host: 'broker.local', - }, - }); - openMqttSection(); - - // Both MQTT and community MQTT show "Disabled" when null status - const disabledElements = screen.getAllByText('Disabled'); - expect(disabledElements.length).toBeGreaterThanOrEqual(1); - }); - - it('shows MQTT connected status badge', () => { - renderModal({ - appSettings: { - ...baseSettings, - mqtt_broker_host: 'broker.local', - }, - health: { - ...baseHealth, - mqtt_status: 'connected', - }, - }); - openMqttSection(); - - expect(screen.getByText('Connected')).toBeInTheDocument(); - }); - - it('renders community sharing section in MQTT tab', () => { - renderModal(); - openMqttSection(); - expandCommunityMqtt(); - - expect(screen.getByText('Community Analytics')).toBeInTheDocument(); - expect(screen.getByText('Enable Community Analytics')).toBeInTheDocument(); - }); - - it('shows IATA input only when community sharing is enabled', () => { - renderModal({ - appSettings: { - ...baseSettings, - community_mqtt_enabled: false, - }, - }); - openMqttSection(); - expandCommunityMqtt(); - - expect(screen.queryByLabelText('Region Code (IATA)')).not.toBeInTheDocument(); - - // Enable community sharing - fireEvent.click(screen.getByText('Enable Community Analytics')); - expect(screen.getByLabelText('Region Code (IATA)')).toBeInTheDocument(); - }); - - it('includes community MQTT fields in save payload', async () => { - const { onSaveAppSettings } = renderModal({ - appSettings: { - ...baseSettings, - community_mqtt_enabled: true, - community_mqtt_iata: 'DEN', - }, - }); - openMqttSection(); - - fireEvent.click(screen.getByRole('button', { name: 'Save MQTT Settings' })); - - await waitFor(() => { - expect(onSaveAppSettings).toHaveBeenCalledWith( - expect.objectContaining({ - community_mqtt_enabled: true, - community_mqtt_iata: 'DEN', - }) - ); - }); - }); - - it('shows community MQTT connected status badge', () => { - renderModal({ - appSettings: { - ...baseSettings, - community_mqtt_enabled: true, - }, - health: { - ...baseHealth, - community_mqtt_status: 'connected', - }, - }); - openMqttSection(); - - // Community Analytics sub-section should show Connected - const communitySection = screen.getByText('Community Analytics').closest('div'); - expect(communitySection).not.toBeNull(); - // Both MQTT and community could show "Connected" — check count - const connectedElements = screen.getAllByText('Connected'); - expect(connectedElements.length).toBeGreaterThanOrEqual(1); - }); - it('fetches statistics when expanded in mobile external-nav mode', async () => { const mockStats: StatisticsResponse = { busiest_channels_24h: [], diff --git a/frontend/src/types.ts b/frontend/src/types.ts index d5fcea6..ee98507 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -23,17 +23,33 @@ export interface RadioConfigUpdate { radio?: RadioSettings; } +export interface FanoutStatusEntry { + name: string; + type: string; + status: string; +} + export interface HealthStatus { status: string; radio_connected: boolean; connection_info: string | null; database_size_mb: number; oldest_undecrypted_timestamp: number | null; - mqtt_status: string | null; - community_mqtt_status: string | null; + fanout_statuses: Record; bots_disabled: boolean; } +export interface FanoutConfig { + id: string; + type: string; + name: string; + enabled: boolean; + config: Record; + scope: Record; + sort_order: number; + created_at: number; +} + export interface MaintenanceResult { packets_deleted: number; vacuumed: boolean; @@ -240,20 +256,6 @@ export interface AppSettingsUpdate { sidebar_sort_order?: 'recent' | 'alpha'; advert_interval?: number; bots?: BotConfig[]; - mqtt_broker_host?: string; - mqtt_broker_port?: number; - mqtt_username?: string; - mqtt_password?: string; - mqtt_use_tls?: boolean; - mqtt_tls_insecure?: boolean; - mqtt_topic_prefix?: string; - mqtt_publish_messages?: boolean; - mqtt_publish_raw_packets?: boolean; - community_mqtt_enabled?: boolean; - community_mqtt_iata?: string; - community_mqtt_broker_host?: string; - community_mqtt_broker_port?: number; - community_mqtt_email?: string; flood_scope?: string; blocked_keys?: string[]; blocked_names?: string[]; diff --git a/tests/conftest.py b/tests/conftest.py index 86f2c7e..976b302 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -68,7 +68,7 @@ def captured_broadcasts(): """Capture WebSocket broadcasts for verification.""" broadcasts = [] - def mock_broadcast(event_type: str, data: dict): + def mock_broadcast(event_type: str, data: dict, **kwargs): broadcasts.append({"type": event_type, "data": data}) return broadcasts, mock_broadcast diff --git a/tests/test_community_mqtt.py b/tests/test_community_mqtt.py index 159a512..27c1a08 100644 --- a/tests/test_community_mqtt.py +++ b/tests/test_community_mqtt.py @@ -21,7 +21,6 @@ from app.community_mqtt import ( _format_raw_packet, _generate_jwt_token, _get_client_version, - community_mqtt_broadcast, ) from app.models import AppSettings @@ -394,39 +393,6 @@ class TestCommunityMqttPublisher: assert pub._is_configured() is True -class TestCommunityMqttBroadcast: - def test_filters_non_raw_packet(self): - """Non-raw_packet events should be ignored.""" - with patch("app.community_mqtt.community_publisher") as mock_pub: - mock_pub.connected = True - mock_pub._settings = AppSettings(community_mqtt_enabled=True) - community_mqtt_broadcast("message", {"text": "hello"}) - # No asyncio.create_task should be called for non-raw_packet events - # Since we're filtering, we just verify no exception - - def test_skips_when_disconnected(self): - """Should not publish when disconnected.""" - with ( - patch("app.community_mqtt.community_publisher") as mock_pub, - patch("app.community_mqtt.asyncio.create_task") as mock_task, - ): - mock_pub.connected = False - mock_pub._settings = AppSettings(community_mqtt_enabled=True) - community_mqtt_broadcast("raw_packet", {"data": "00"}) - mock_task.assert_not_called() - - def test_skips_when_settings_none(self): - """Should not publish when settings are None.""" - with ( - patch("app.community_mqtt.community_publisher") as mock_pub, - patch("app.community_mqtt.asyncio.create_task") as mock_task, - ): - mock_pub.connected = True - mock_pub._settings = None - community_mqtt_broadcast("raw_packet", {"data": "00"}) - mock_task.assert_not_called() - - class TestPublishFailureSetsDisconnected: @pytest.mark.asyncio async def test_publish_error_sets_connected_false(self): diff --git a/tests/test_fanout.py b/tests/test_fanout.py new file mode 100644 index 0000000..e9316e0 --- /dev/null +++ b/tests/test_fanout.py @@ -0,0 +1,528 @@ +"""Tests for fanout bus: manager, scope matching, repository, and modules.""" + +import json +from unittest.mock import AsyncMock, patch + +import pytest + +from app.database import Database +from app.fanout.base import FanoutModule +from app.fanout.manager import ( + FanoutManager, + _scope_matches_message, + _scope_matches_raw, +) + +# --------------------------------------------------------------------------- +# Scope matching unit tests +# --------------------------------------------------------------------------- + + +class TestScopeMatchesMessage: + def test_all_matches_everything(self): + assert _scope_matches_message({"messages": "all"}, {"type": "PRIV"}) + + def test_none_matches_nothing(self): + assert not _scope_matches_message({"messages": "none"}, {"type": "PRIV"}) + + def test_missing_key_defaults_none(self): + assert not _scope_matches_message({}, {"type": "PRIV"}) + + def test_dict_channels_all(self): + scope = {"messages": {"channels": "all", "contacts": "none"}} + assert _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + + def test_dict_channels_none(self): + scope = {"messages": {"channels": "none"}} + assert not _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + + def test_dict_channels_list_match(self): + scope = {"messages": {"channels": ["ch1", "ch2"]}} + assert _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + + def test_dict_channels_list_no_match(self): + scope = {"messages": {"channels": ["ch1", "ch2"]}} + assert not _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch3"}) + + def test_dict_contacts_all(self): + scope = {"messages": {"contacts": "all"}} + assert _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk1"}) + + def test_dict_contacts_list_match(self): + scope = {"messages": {"contacts": ["pk1"]}} + assert _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk1"}) + + def test_dict_contacts_list_no_match(self): + scope = {"messages": {"contacts": ["pk1"]}} + assert not _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk2"}) + + +class TestScopeMatchesRaw: + def test_all_matches(self): + assert _scope_matches_raw({"raw_packets": "all"}, {}) + + def test_none_does_not_match(self): + assert not _scope_matches_raw({"raw_packets": "none"}, {}) + + def test_missing_key_does_not_match(self): + assert not _scope_matches_raw({}, {}) + + +# --------------------------------------------------------------------------- +# FanoutManager dispatch tests +# --------------------------------------------------------------------------- + + +class StubModule(FanoutModule): + """Minimal FanoutModule for testing dispatch.""" + + def __init__(self): + super().__init__("stub", {}) + self.message_calls: list[dict] = [] + self.raw_calls: list[dict] = [] + self._status = "connected" + + async def start(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def on_message(self, data: dict) -> None: + self.message_calls.append(data) + + async def on_raw(self, data: dict) -> None: + self.raw_calls.append(data) + + @property + def status(self) -> str: + return self._status + + +class TestFanoutManagerDispatch: + @pytest.mark.asyncio + async def test_broadcast_message_dispatches_to_matching_module(self): + manager = FanoutManager() + mod = StubModule() + scope = {"messages": "all", "raw_packets": "none"} + manager._modules["test-id"] = (mod, scope) + + await manager.broadcast_message({"type": "PRIV", "conversation_key": "pk1"}) + + assert len(mod.message_calls) == 1 + assert mod.message_calls[0]["conversation_key"] == "pk1" + + @pytest.mark.asyncio + async def test_broadcast_message_skips_non_matching_module(self): + manager = FanoutManager() + mod = StubModule() + scope = {"messages": "none", "raw_packets": "all"} + manager._modules["test-id"] = (mod, scope) + + await manager.broadcast_message({"type": "PRIV", "conversation_key": "pk1"}) + + assert len(mod.message_calls) == 0 + + @pytest.mark.asyncio + async def test_broadcast_raw_dispatches_to_matching_module(self): + manager = FanoutManager() + mod = StubModule() + scope = {"messages": "none", "raw_packets": "all"} + manager._modules["test-id"] = (mod, scope) + + await manager.broadcast_raw({"data": "aabbccdd"}) + + assert len(mod.raw_calls) == 1 + + @pytest.mark.asyncio + async def test_broadcast_raw_skips_non_matching(self): + manager = FanoutManager() + mod = StubModule() + scope = {"messages": "all", "raw_packets": "none"} + manager._modules["test-id"] = (mod, scope) + + await manager.broadcast_raw({"data": "aabbccdd"}) + + assert len(mod.raw_calls) == 0 + + @pytest.mark.asyncio + async def test_stop_all_stops_all_modules(self): + manager = FanoutManager() + mod1 = StubModule() + mod1.stop = AsyncMock() + mod2 = StubModule() + mod2.stop = AsyncMock() + manager._modules["id1"] = (mod1, {}) + manager._modules["id2"] = (mod2, {}) + + await manager.stop_all() + + mod1.stop.assert_called_once() + mod2.stop.assert_called_once() + assert len(manager._modules) == 0 + + @pytest.mark.asyncio + async def test_module_error_does_not_halt_broadcast(self): + manager = FanoutManager() + bad_mod = StubModule() + + async def fail(data): + raise RuntimeError("boom") + + bad_mod.on_message = fail + good_mod = StubModule() + + manager._modules["bad"] = (bad_mod, {"messages": "all"}) + manager._modules["good"] = (good_mod, {"messages": "all"}) + + await manager.broadcast_message({"type": "PRIV", "conversation_key": "pk1"}) + + # Good module should still receive the message despite the bad one failing + assert len(good_mod.message_calls) == 1 + + def test_get_statuses(self): + manager = FanoutManager() + mod = StubModule() + mod._status = "connected" + manager._modules["test-id"] = (mod, {}) + + with patch( + "app.repository.fanout._configs_cache", + {"test-id": {"name": "Test", "type": "mqtt_private"}}, + ): + statuses = manager.get_statuses() + + assert "test-id" in statuses + assert statuses["test-id"]["status"] == "connected" + assert statuses["test-id"]["name"] == "Test" + assert statuses["test-id"]["type"] == "mqtt_private" + + +# --------------------------------------------------------------------------- +# Repository tests +# --------------------------------------------------------------------------- + + +@pytest.fixture +async def fanout_db(): + """Create an in-memory database with fanout_configs table.""" + import app.repository.fanout as fanout_mod + + db = Database(":memory:") + await db.connect() + + await db.conn.execute(""" + CREATE TABLE IF NOT EXISTS fanout_configs ( + id TEXT PRIMARY KEY, + type TEXT NOT NULL, + name TEXT NOT NULL, + enabled INTEGER NOT NULL DEFAULT 0, + config TEXT NOT NULL DEFAULT '{}', + scope TEXT NOT NULL DEFAULT '{}', + sort_order INTEGER NOT NULL DEFAULT 0, + created_at INTEGER NOT NULL DEFAULT 0 + ) + """) + await db.conn.commit() + + original_db = fanout_mod.db + fanout_mod.db = db + + try: + yield db + finally: + fanout_mod.db = original_db + await db.disconnect() + + +class TestFanoutConfigRepository: + @pytest.mark.asyncio + async def test_create_and_get(self, fanout_db): + from app.repository.fanout import FanoutConfigRepository + + cfg = await FanoutConfigRepository.create( + config_type="mqtt_private", + name="Test MQTT", + config={"broker_host": "localhost", "broker_port": 1883}, + scope={"messages": "all", "raw_packets": "all"}, + enabled=True, + ) + + assert cfg["type"] == "mqtt_private" + assert cfg["name"] == "Test MQTT" + assert cfg["enabled"] is True + assert cfg["config"]["broker_host"] == "localhost" + + fetched = await FanoutConfigRepository.get(cfg["id"]) + assert fetched is not None + assert fetched["id"] == cfg["id"] + + @pytest.mark.asyncio + async def test_get_all(self, fanout_db): + from app.repository.fanout import FanoutConfigRepository + + await FanoutConfigRepository.create( + config_type="mqtt_private", name="A", config={}, scope={}, enabled=True + ) + await FanoutConfigRepository.create( + config_type="mqtt_community", name="B", config={}, scope={}, enabled=False + ) + + all_configs = await FanoutConfigRepository.get_all() + assert len(all_configs) == 2 + + @pytest.mark.asyncio + async def test_update(self, fanout_db): + from app.repository.fanout import FanoutConfigRepository + + cfg = await FanoutConfigRepository.create( + config_type="mqtt_private", + name="Original", + config={"broker_host": "old"}, + scope={}, + enabled=True, + ) + + updated = await FanoutConfigRepository.update( + cfg["id"], + name="Renamed", + config={"broker_host": "new"}, + enabled=False, + ) + + assert updated is not None + assert updated["name"] == "Renamed" + assert updated["config"]["broker_host"] == "new" + assert updated["enabled"] is False + + @pytest.mark.asyncio + async def test_delete(self, fanout_db): + from app.repository.fanout import FanoutConfigRepository + + cfg = await FanoutConfigRepository.create( + config_type="mqtt_private", name="Doomed", config={}, scope={}, enabled=True + ) + + await FanoutConfigRepository.delete(cfg["id"]) + + assert await FanoutConfigRepository.get(cfg["id"]) is None + + @pytest.mark.asyncio + async def test_get_enabled(self, fanout_db): + from app.repository.fanout import FanoutConfigRepository + + await FanoutConfigRepository.create( + config_type="mqtt_private", name="On", config={}, scope={}, enabled=True + ) + await FanoutConfigRepository.create( + config_type="mqtt_community", name="Off", config={}, scope={}, enabled=False + ) + + enabled = await FanoutConfigRepository.get_enabled() + assert len(enabled) == 1 + assert enabled[0]["name"] == "On" + + +# --------------------------------------------------------------------------- +# broadcast_event realtime=False test +# --------------------------------------------------------------------------- + + +class TestBroadcastEventRealtime: + @pytest.mark.asyncio + async def test_realtime_false_does_not_dispatch_fanout(self): + """broadcast_event with realtime=False should NOT trigger fanout dispatch.""" + from app.websocket import broadcast_event + + with ( + patch("app.websocket.ws_manager") as mock_ws, + patch("app.fanout.manager.fanout_manager") as mock_fm, + ): + mock_ws.broadcast = AsyncMock() + + broadcast_event("message", {"type": "PRIV"}, realtime=False) + + # Allow tasks to run + import asyncio + + await asyncio.sleep(0) + + # WebSocket broadcast should still fire + mock_ws.broadcast.assert_called_once() + # But fanout should NOT be called + mock_fm.broadcast_message.assert_not_called() + + @pytest.mark.asyncio + async def test_realtime_true_dispatches_fanout(self): + """broadcast_event with realtime=True should trigger fanout dispatch.""" + from app.websocket import broadcast_event + + with ( + patch("app.websocket.ws_manager") as mock_ws, + patch("app.fanout.manager.fanout_manager") as mock_fm, + ): + mock_ws.broadcast = AsyncMock() + mock_fm.broadcast_message = AsyncMock() + + broadcast_event("message", {"type": "PRIV"}, realtime=True) + + import asyncio + + await asyncio.sleep(0) + + mock_ws.broadcast.assert_called_once() + mock_fm.broadcast_message.assert_called_once() + + +# --------------------------------------------------------------------------- +# Migration test +# --------------------------------------------------------------------------- + + +def _create_app_settings_table_sql(): + """SQL to create app_settings with all MQTT columns for migration testing.""" + return """ + CREATE TABLE IF NOT EXISTS app_settings ( + id INTEGER PRIMARY KEY CHECK (id = 1), + max_radio_contacts INTEGER DEFAULT 200, + favorites TEXT DEFAULT '[]', + auto_decrypt_dm_on_advert INTEGER DEFAULT 0, + sidebar_sort_order TEXT DEFAULT 'recent', + last_message_times TEXT DEFAULT '{}', + preferences_migrated INTEGER DEFAULT 0, + advert_interval INTEGER DEFAULT 0, + last_advert_time INTEGER DEFAULT 0, + bots TEXT DEFAULT '[]', + mqtt_broker_host TEXT DEFAULT '', + mqtt_broker_port INTEGER DEFAULT 1883, + mqtt_username TEXT DEFAULT '', + mqtt_password TEXT DEFAULT '', + mqtt_use_tls INTEGER DEFAULT 0, + mqtt_tls_insecure INTEGER DEFAULT 0, + mqtt_topic_prefix TEXT DEFAULT 'meshcore', + mqtt_publish_messages INTEGER DEFAULT 0, + mqtt_publish_raw_packets INTEGER DEFAULT 0, + community_mqtt_enabled INTEGER DEFAULT 0, + community_mqtt_iata TEXT DEFAULT '', + community_mqtt_broker_host TEXT DEFAULT 'mqtt-us-v1.letsmesh.net', + community_mqtt_broker_port INTEGER DEFAULT 443, + community_mqtt_email TEXT DEFAULT '', + flood_scope TEXT DEFAULT '', + blocked_keys TEXT DEFAULT '[]', + blocked_names TEXT DEFAULT '[]' + ) + """ + + +class TestMigration036: + @pytest.mark.asyncio + async def test_fanout_configs_table_created(self): + """Migration 36 should create the fanout_configs table.""" + from app.migrations import _migrate_036_create_fanout_configs + + db = Database(":memory:") + await db.connect() + + await db.conn.execute(_create_app_settings_table_sql()) + await db.conn.execute("INSERT OR IGNORE INTO app_settings (id) VALUES (1)") + await db.conn.commit() + + try: + await _migrate_036_create_fanout_configs(db.conn) + + cursor = await db.conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='fanout_configs'" + ) + row = await cursor.fetchone() + assert row is not None + finally: + await db.disconnect() + + @pytest.mark.asyncio + async def test_migration_creates_mqtt_private_from_settings(self): + """Migration should create mqtt_private config from existing MQTT settings.""" + from app.migrations import _migrate_036_create_fanout_configs + + db = Database(":memory:") + await db.connect() + + await db.conn.execute(_create_app_settings_table_sql()) + await db.conn.execute( + """INSERT OR REPLACE INTO app_settings (id, mqtt_broker_host, mqtt_broker_port, + mqtt_username, mqtt_password, mqtt_use_tls, mqtt_tls_insecure, + mqtt_topic_prefix, mqtt_publish_messages, mqtt_publish_raw_packets) + VALUES (1, 'broker.local', 1883, 'user', 'pass', 0, 0, 'mesh', 1, 0)""" + ) + await db.conn.commit() + + try: + await _migrate_036_create_fanout_configs(db.conn) + + cursor = await db.conn.execute( + "SELECT * FROM fanout_configs WHERE type = 'mqtt_private'" + ) + row = await cursor.fetchone() + assert row is not None + + config = json.loads(row["config"]) + assert config["broker_host"] == "broker.local" + assert config["username"] == "user" + + scope = json.loads(row["scope"]) + assert scope["messages"] == "all" + assert scope["raw_packets"] == "none" + finally: + await db.disconnect() + + @pytest.mark.asyncio + async def test_migration_creates_community_from_settings(self): + """Migration should create mqtt_community config when community was enabled.""" + from app.migrations import _migrate_036_create_fanout_configs + + db = Database(":memory:") + await db.connect() + + await db.conn.execute(_create_app_settings_table_sql()) + await db.conn.execute( + """INSERT OR REPLACE INTO app_settings (id, community_mqtt_enabled, community_mqtt_iata, + community_mqtt_broker_host, community_mqtt_broker_port, community_mqtt_email) + VALUES (1, 1, 'DEN', 'mqtt-us-v1.letsmesh.net', 443, 'test@example.com')""" + ) + await db.conn.commit() + + try: + await _migrate_036_create_fanout_configs(db.conn) + + cursor = await db.conn.execute( + "SELECT * FROM fanout_configs WHERE type = 'mqtt_community'" + ) + row = await cursor.fetchone() + assert row is not None + assert bool(row["enabled"]) + + config = json.loads(row["config"]) + assert config["iata"] == "DEN" + assert config["email"] == "test@example.com" + finally: + await db.disconnect() + + @pytest.mark.asyncio + async def test_migration_skips_when_no_mqtt_configured(self): + """Migration should not create rows when MQTT was not configured.""" + from app.migrations import _migrate_036_create_fanout_configs + + db = Database(":memory:") + await db.connect() + + await db.conn.execute(_create_app_settings_table_sql()) + await db.conn.execute("INSERT OR IGNORE INTO app_settings (id) VALUES (1)") + await db.conn.commit() + + try: + await _migrate_036_create_fanout_configs(db.conn) + + cursor = await db.conn.execute("SELECT COUNT(*) FROM fanout_configs") + row = await cursor.fetchone() + assert row[0] == 0 + finally: + await db.disconnect() diff --git a/tests/test_fanout_integration.py b/tests/test_fanout_integration.py new file mode 100644 index 0000000..ab2a5f9 --- /dev/null +++ b/tests/test_fanout_integration.py @@ -0,0 +1,403 @@ +"""Integration tests: real MQTT capture broker + real fanout modules. + +Spins up a minimal in-process MQTT 3.1.1 broker on a random port, creates +fanout configs in an in-memory DB, starts real MqttPrivateModule instances +via the FanoutManager, and verifies that PUBLISH packets arrive (or don't) +based on enabled/disabled state and scope settings. +""" + +import asyncio +import json +import struct + +import pytest + +import app.repository.fanout as fanout_mod +from app.database import Database +from app.fanout.manager import FanoutManager +from app.repository.fanout import FanoutConfigRepository + +# --------------------------------------------------------------------------- +# Minimal async MQTT 3.1.1 capture broker +# --------------------------------------------------------------------------- + + +class MqttCaptureBroker: + """Tiny TCP server that speaks just enough MQTT to capture PUBLISH packets.""" + + def __init__(self): + self.published: list[tuple[str, dict]] = [] + self._server: asyncio.Server | None = None + self.port: int = 0 + + async def start(self) -> int: + self._server = await asyncio.start_server(self._handle_client, "127.0.0.1", 0) + self.port = self._server.sockets[0].getsockname()[1] + return self.port + + async def stop(self): + if self._server: + self._server.close() + await self._server.wait_closed() + + async def wait_for(self, count: int, timeout: float = 5.0) -> list[tuple[str, dict]]: + """Block until *count* messages captured, or timeout.""" + deadline = asyncio.get_event_loop().time() + timeout + while len(self.published) < count: + if asyncio.get_event_loop().time() >= deadline: + break + await asyncio.sleep(0.02) + return list(self.published) + + async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + try: + while True: + first = await reader.readexactly(1) + pkt_type = (first[0] & 0xF0) >> 4 + rem_len = await self._read_varlen(reader) + payload = await reader.readexactly(rem_len) if rem_len else b"" + + if pkt_type == 1: # CONNECT -> CONNACK + writer.write(b"\x20\x02\x00\x00") + await writer.drain() + elif pkt_type == 3: # PUBLISH (QoS 0) + topic_len = struct.unpack("!H", payload[:2])[0] + topic = payload[2 : 2 + topic_len].decode() + body = payload[2 + topic_len :] + try: + data = json.loads(body) + except Exception: + data = {} + self.published.append((topic, data)) + elif pkt_type == 12: # PINGREQ -> PINGRESP + writer.write(b"\xd0\x00") + await writer.drain() + elif pkt_type == 14: # DISCONNECT + break + except (asyncio.IncompleteReadError, ConnectionError, OSError): + pass + finally: + writer.close() + + @staticmethod + async def _read_varlen(reader: asyncio.StreamReader) -> int: + value, shift = 0, 0 + while True: + b = (await reader.readexactly(1))[0] + value |= (b & 0x7F) << shift + if not (b & 0x80): + return value + shift += 7 + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +async def mqtt_broker(): + broker = MqttCaptureBroker() + await broker.start() + yield broker + await broker.stop() + + +@pytest.fixture +async def integration_db(): + """In-memory DB with fanout_configs, wired into the repository module. + + Database.connect() runs all migrations which create the fanout_configs + table, so no manual DDL is needed here. + """ + test_db = Database(":memory:") + await test_db.connect() + + original_db = fanout_mod.db + fanout_mod.db = test_db + try: + yield test_db + finally: + fanout_mod.db = original_db + await test_db.disconnect() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _wait_connected(manager: FanoutManager, config_id: str, timeout: float = 5.0): + """Poll until the module reports 'connected'.""" + deadline = asyncio.get_event_loop().time() + timeout + while asyncio.get_event_loop().time() < deadline: + entry = manager._modules.get(config_id) + if entry and entry[0].status == "connected": + return + await asyncio.sleep(0.05) + raise TimeoutError(f"Module {config_id} did not connect within {timeout}s") + + +def _private_config(port: int, prefix: str) -> dict: + return {"broker_host": "127.0.0.1", "broker_port": port, "topic_prefix": prefix} + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestFanoutMqttIntegration: + """End-to-end: real capture broker <-> real fanout modules.""" + + @pytest.mark.asyncio + async def test_both_enabled_both_receive(self, mqtt_broker, integration_db): + """Two enabled integrations with different prefixes both receive messages.""" + from unittest.mock import patch + + cfg_a = await FanoutConfigRepository.create( + config_type="mqtt_private", + name="Alpha", + config=_private_config(mqtt_broker.port, "alpha"), + scope={"messages": "all", "raw_packets": "all"}, + enabled=True, + ) + cfg_b = await FanoutConfigRepository.create( + config_type="mqtt_private", + name="Beta", + config=_private_config(mqtt_broker.port, "beta"), + scope={"messages": "all", "raw_packets": "all"}, + enabled=True, + ) + + manager = FanoutManager() + with ( + patch("app.mqtt_base._broadcast_health"), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_error"), + patch("app.websocket.broadcast_health"), + ): + try: + await manager.load_from_db() + await _wait_connected(manager, cfg_a["id"]) + await _wait_connected(manager, cfg_b["id"]) + + await manager.broadcast_message( + {"type": "PRIV", "conversation_key": "pk1", "text": "hello"} + ) + + messages = await mqtt_broker.wait_for(2) + finally: + await manager.stop_all() + + topics = {m[0] for m in messages} + assert "alpha/dm:pk1" in topics + assert "beta/dm:pk1" in topics + + @pytest.mark.asyncio + async def test_one_disabled_only_enabled_receives(self, mqtt_broker, integration_db): + """Disabled integration must not publish any messages.""" + from unittest.mock import patch + + cfg_on = await FanoutConfigRepository.create( + config_type="mqtt_private", + name="Enabled", + config=_private_config(mqtt_broker.port, "on"), + scope={"messages": "all", "raw_packets": "all"}, + enabled=True, + ) + await FanoutConfigRepository.create( + config_type="mqtt_private", + name="Disabled", + config=_private_config(mqtt_broker.port, "off"), + scope={"messages": "all", "raw_packets": "all"}, + enabled=False, + ) + + manager = FanoutManager() + with ( + patch("app.mqtt_base._broadcast_health"), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_error"), + patch("app.websocket.broadcast_health"), + ): + try: + await manager.load_from_db() + await _wait_connected(manager, cfg_on["id"]) + + # Only 1 module should be loaded + assert len(manager._modules) == 1 + + await manager.broadcast_message( + {"type": "PRIV", "conversation_key": "pk1", "text": "hello"} + ) + + await mqtt_broker.wait_for(1) + await asyncio.sleep(0.2) # extra time to catch stray messages + finally: + await manager.stop_all() + + assert len(mqtt_broker.published) == 1 + assert mqtt_broker.published[0][0] == "on/dm:pk1" + + @pytest.mark.asyncio + async def test_both_disabled_nothing_published(self, mqtt_broker, integration_db): + """Both disabled -> zero messages published.""" + from unittest.mock import patch + + await FanoutConfigRepository.create( + config_type="mqtt_private", + name="A", + config=_private_config(mqtt_broker.port, "a"), + scope={"messages": "all", "raw_packets": "all"}, + enabled=False, + ) + await FanoutConfigRepository.create( + config_type="mqtt_private", + name="B", + config=_private_config(mqtt_broker.port, "b"), + scope={"messages": "all", "raw_packets": "all"}, + enabled=False, + ) + + manager = FanoutManager() + with ( + patch("app.mqtt_base._broadcast_health"), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_error"), + patch("app.websocket.broadcast_health"), + ): + try: + await manager.load_from_db() + assert len(manager._modules) == 0 + + await manager.broadcast_message( + {"type": "PRIV", "conversation_key": "pk1", "text": "hello"} + ) + await asyncio.sleep(0.3) + finally: + await manager.stop_all() + + assert len(mqtt_broker.published) == 0 + + @pytest.mark.asyncio + async def test_disable_after_enable_stops_publishing(self, mqtt_broker, integration_db): + """Disabling a live integration stops its publishing immediately.""" + from unittest.mock import patch + + cfg = await FanoutConfigRepository.create( + config_type="mqtt_private", + name="Toggle", + config=_private_config(mqtt_broker.port, "toggle"), + scope={"messages": "all", "raw_packets": "all"}, + enabled=True, + ) + + manager = FanoutManager() + with ( + patch("app.mqtt_base._broadcast_health"), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_error"), + patch("app.websocket.broadcast_health"), + ): + try: + await manager.load_from_db() + await _wait_connected(manager, cfg["id"]) + + # Publishes while enabled + await manager.broadcast_message( + {"type": "PRIV", "conversation_key": "pk1", "text": "msg1"} + ) + await mqtt_broker.wait_for(1) + assert len(mqtt_broker.published) == 1 + + # Disable via DB + reload + await FanoutConfigRepository.update(cfg["id"], enabled=False) + await manager.reload_config(cfg["id"]) + assert cfg["id"] not in manager._modules + + # Should NOT publish after disable + await manager.broadcast_message( + {"type": "PRIV", "conversation_key": "pk2", "text": "msg2"} + ) + await asyncio.sleep(0.3) + finally: + await manager.stop_all() + + # Only the first message + assert len(mqtt_broker.published) == 1 + assert mqtt_broker.published[0][0] == "toggle/dm:pk1" + + @pytest.mark.asyncio + async def test_scope_messages_only_no_raw(self, mqtt_broker, integration_db): + """Module with raw_packets=none receives messages but not raw packets.""" + from unittest.mock import patch + + cfg = await FanoutConfigRepository.create( + config_type="mqtt_private", + name="Messages Only", + config=_private_config(mqtt_broker.port, "msgsonly"), + scope={"messages": "all", "raw_packets": "none"}, + enabled=True, + ) + + manager = FanoutManager() + with ( + patch("app.mqtt_base._broadcast_health"), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_error"), + patch("app.websocket.broadcast_health"), + ): + try: + await manager.load_from_db() + await _wait_connected(manager, cfg["id"]) + + await manager.broadcast_message( + {"type": "PRIV", "conversation_key": "pk1", "text": "hi"} + ) + await manager.broadcast_raw({"data": "aabbccdd"}) + + await mqtt_broker.wait_for(1) + await asyncio.sleep(0.2) + finally: + await manager.stop_all() + + assert len(mqtt_broker.published) == 1 + assert "dm:pk1" in mqtt_broker.published[0][0] + + @pytest.mark.asyncio + async def test_scope_raw_only_no_messages(self, mqtt_broker, integration_db): + """Module with messages=none receives raw packets but not decoded messages.""" + from unittest.mock import patch + + cfg = await FanoutConfigRepository.create( + config_type="mqtt_private", + name="Raw Only", + config=_private_config(mqtt_broker.port, "rawonly"), + scope={"messages": "none", "raw_packets": "all"}, + enabled=True, + ) + + manager = FanoutManager() + with ( + patch("app.mqtt_base._broadcast_health"), + patch("app.websocket.broadcast_success"), + patch("app.websocket.broadcast_error"), + patch("app.websocket.broadcast_health"), + ): + try: + await manager.load_from_db() + await _wait_connected(manager, cfg["id"]) + + await manager.broadcast_message( + {"type": "PRIV", "conversation_key": "pk1", "text": "hi"} + ) + await manager.broadcast_raw({"data": "aabbccdd"}) + + await mqtt_broker.wait_for(1) + await asyncio.sleep(0.2) + finally: + await manager.stop_all() + + assert len(mqtt_broker.published) == 1 + assert "raw/" in mqtt_broker.published[0][0] diff --git a/tests/test_health_mqtt_status.py b/tests/test_health_mqtt_status.py index c504d18..6a772d7 100644 --- a/tests/test_health_mqtt_status.py +++ b/tests/test_health_mqtt_status.py @@ -1,7 +1,7 @@ -"""Tests for health endpoint MQTT status field. +"""Tests for health endpoint fanout status fields. -Verifies that build_health_data correctly reports MQTT status as -'connected', 'disconnected', or 'disabled' based on publisher state. +Verifies that build_health_data correctly reports fanout module statuses +via the fanout_manager. """ from unittest.mock import patch @@ -11,96 +11,34 @@ import pytest from app.routers.health import build_health_data -class TestHealthMqttStatus: - """Test MQTT status in build_health_data.""" +class TestHealthFanoutStatus: + """Test fanout_statuses in build_health_data.""" @pytest.mark.asyncio - async def test_mqtt_disabled_when_not_configured(self, test_db): - """MQTT status is 'disabled' when broker host is empty.""" - from app.mqtt import mqtt_publisher - - original_settings = mqtt_publisher._settings - original_connected = mqtt_publisher.connected - try: - from app.models import AppSettings - - mqtt_publisher._settings = AppSettings(mqtt_broker_host="") - mqtt_publisher.connected = False - + async def test_no_fanout_modules_returns_empty(self, test_db): + """fanout_statuses should be empty dict when no modules are running.""" + with patch("app.fanout.manager.fanout_manager") as mock_fm: + mock_fm.get_statuses.return_value = {} data = await build_health_data(True, "TCP: 1.2.3.4:4000") - assert data["mqtt_status"] == "disabled" - finally: - mqtt_publisher._settings = original_settings - mqtt_publisher.connected = original_connected + assert data["fanout_statuses"] == {} @pytest.mark.asyncio - async def test_mqtt_disabled_when_nothing_to_publish(self, test_db): - """MQTT status is 'disabled' when broker host is set but no publish options enabled.""" - from app.mqtt import mqtt_publisher + async def test_fanout_statuses_reflect_manager(self, test_db): + """fanout_statuses should return whatever the manager reports.""" + mock_statuses = { + "uuid-1": {"name": "Private MQTT", "type": "mqtt_private", "status": "connected"}, + "uuid-2": { + "name": "Community MQTT", + "type": "mqtt_community", + "status": "disconnected", + }, + } + with patch("app.fanout.manager.fanout_manager") as mock_fm: + mock_fm.get_statuses.return_value = mock_statuses + data = await build_health_data(True, "Serial: /dev/ttyUSB0") - original_settings = mqtt_publisher._settings - original_connected = mqtt_publisher.connected - try: - from app.models import AppSettings - - mqtt_publisher._settings = AppSettings( - mqtt_broker_host="broker.local", - mqtt_publish_messages=False, - mqtt_publish_raw_packets=False, - ) - mqtt_publisher.connected = False - - data = await build_health_data(True, "TCP: 1.2.3.4:4000") - - assert data["mqtt_status"] == "disabled" - finally: - mqtt_publisher._settings = original_settings - mqtt_publisher.connected = original_connected - - @pytest.mark.asyncio - async def test_mqtt_connected_when_publisher_connected(self, test_db): - """MQTT status is 'connected' when publisher is connected.""" - from app.mqtt import mqtt_publisher - - original_settings = mqtt_publisher._settings - original_connected = mqtt_publisher.connected - try: - from app.models import AppSettings - - mqtt_publisher._settings = AppSettings( - mqtt_broker_host="broker.local", mqtt_publish_messages=True - ) - mqtt_publisher.connected = True - - data = await build_health_data(True, "TCP: 1.2.3.4:4000") - - assert data["mqtt_status"] == "connected" - finally: - mqtt_publisher._settings = original_settings - mqtt_publisher.connected = original_connected - - @pytest.mark.asyncio - async def test_mqtt_disconnected_when_configured_but_not_connected(self, test_db): - """MQTT status is 'disconnected' when configured but not connected.""" - from app.mqtt import mqtt_publisher - - original_settings = mqtt_publisher._settings - original_connected = mqtt_publisher.connected - try: - from app.models import AppSettings - - mqtt_publisher._settings = AppSettings( - mqtt_broker_host="broker.local", mqtt_publish_raw_packets=True - ) - mqtt_publisher.connected = False - - data = await build_health_data(False, None) - - assert data["mqtt_status"] == "disconnected" - finally: - mqtt_publisher._settings = original_settings - mqtt_publisher.connected = original_connected + assert data["fanout_statuses"] == mock_statuses @pytest.mark.asyncio async def test_health_status_ok_when_connected(self, test_db): diff --git a/tests/test_migrations.py b/tests/test_migrations.py index 5c31898..112efd2 100644 --- a/tests/test_migrations.py +++ b/tests/test_migrations.py @@ -100,8 +100,8 @@ class TestMigration001: # Run migrations applied = await run_migrations(conn) - assert applied == 35 # All migrations run - assert await get_version(conn) == 35 + assert applied == 36 # All migrations run + assert await get_version(conn) == 36 # Verify columns exist by inserting and selecting await conn.execute( @@ -183,9 +183,9 @@ class TestMigration001: applied1 = await run_migrations(conn) applied2 = await run_migrations(conn) - assert applied1 == 35 # All migrations run + assert applied1 == 36 # All migrations run assert applied2 == 0 # No migrations on second run - assert await get_version(conn) == 35 + assert await get_version(conn) == 36 finally: await conn.close() @@ -246,8 +246,8 @@ class TestMigration001: applied = await run_migrations(conn) # All migrations applied (version incremented) but no error - assert applied == 35 - assert await get_version(conn) == 35 + assert applied == 36 + assert await get_version(conn) == 36 finally: await conn.close() @@ -374,10 +374,10 @@ class TestMigration013: ) await conn.commit() - # Run migration 13 (plus 14-34 which also run) + # Run migration 13 (plus 14-36 which also run) applied = await run_migrations(conn) - assert applied == 23 - assert await get_version(conn) == 35 + assert applied == 24 + assert await get_version(conn) == 36 # Verify bots array was created with migrated data cursor = await conn.execute("SELECT bots FROM app_settings WHERE id = 1") @@ -497,7 +497,7 @@ class TestMigration018: assert await cursor.fetchone() is not None await run_migrations(conn) - assert await get_version(conn) == 35 + assert await get_version(conn) == 36 # Verify autoindex is gone cursor = await conn.execute( @@ -575,8 +575,8 @@ class TestMigration018: await conn.commit() applied = await run_migrations(conn) - assert applied == 18 # Migrations 18-35 run (18+19 skip internally) - assert await get_version(conn) == 35 + assert applied == 19 # Migrations 18-36 run (18+19 skip internally) + assert await get_version(conn) == 36 finally: await conn.close() @@ -648,7 +648,7 @@ class TestMigration019: assert await cursor.fetchone() is not None await run_migrations(conn) - assert await get_version(conn) == 35 + assert await get_version(conn) == 36 # Verify autoindex is gone cursor = await conn.execute( @@ -714,8 +714,8 @@ class TestMigration020: assert (await cursor.fetchone())[0] == "delete" applied = await run_migrations(conn) - assert applied == 16 # Migrations 20-35 - assert await get_version(conn) == 35 + assert applied == 17 # Migrations 20-36 + assert await get_version(conn) == 36 # Verify WAL mode cursor = await conn.execute("PRAGMA journal_mode") @@ -745,7 +745,7 @@ class TestMigration020: await set_version(conn, 20) applied = await run_migrations(conn) - assert applied == 15 # Migrations 21-35 still run + assert applied == 16 # Migrations 21-36 still run # Still WAL + INCREMENTAL cursor = await conn.execute("PRAGMA journal_mode") @@ -803,8 +803,8 @@ class TestMigration028: await conn.commit() applied = await run_migrations(conn) - assert applied == 8 - assert await get_version(conn) == 35 + assert applied == 9 + assert await get_version(conn) == 36 # Verify payload_hash column is now BLOB cursor = await conn.execute("PRAGMA table_info(raw_packets)") @@ -873,8 +873,8 @@ class TestMigration028: await conn.commit() applied = await run_migrations(conn) - assert applied == 8 # Version still bumped - assert await get_version(conn) == 35 + assert applied == 9 # Version still bumped + assert await get_version(conn) == 36 # Verify data unchanged cursor = await conn.execute("SELECT payload_hash FROM raw_packets") @@ -923,8 +923,8 @@ class TestMigration032: await conn.commit() applied = await run_migrations(conn) - assert applied == 4 - assert await get_version(conn) == 35 + assert applied == 5 + assert await get_version(conn) == 36 # Verify all columns exist with correct defaults cursor = await conn.execute( @@ -996,8 +996,8 @@ class TestMigration034: await conn.commit() applied = await run_migrations(conn) - assert applied == 2 - assert await get_version(conn) == 35 + assert applied == 3 + assert await get_version(conn) == 36 # Verify column exists with correct default cursor = await conn.execute("SELECT flood_scope FROM app_settings WHERE id = 1") @@ -1039,8 +1039,8 @@ class TestMigration033: await conn.commit() applied = await run_migrations(conn) - assert applied == 3 - assert await get_version(conn) == 35 + assert applied == 4 + assert await get_version(conn) == 36 cursor = await conn.execute( "SELECT key, name, is_hashtag, on_radio FROM channels WHERE key = ?", diff --git a/tests/test_mqtt.py b/tests/test_mqtt.py index 7be3473..d17ee49 100644 --- a/tests/test_mqtt.py +++ b/tests/test_mqtt.py @@ -6,11 +6,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest from app.models import AppSettings -from app.mqtt import ( - MqttPublisher, - _build_message_topic, - _build_raw_packet_topic, -) +from app.mqtt import MqttPublisher, _build_message_topic, _build_raw_packet_topic def _make_settings(**overrides) -> AppSettings: @@ -162,114 +158,6 @@ class TestMqttPublisher: assert pub._client is None -class TestMqttBroadcast: - @pytest.mark.asyncio - async def test_mqtt_broadcast_skips_when_disconnected(self): - """mqtt_broadcast should return immediately if publisher is disconnected.""" - from app.mqtt import mqtt_publisher - - original_settings = mqtt_publisher._settings - original_connected = mqtt_publisher.connected - - try: - mqtt_publisher.connected = False - mqtt_publisher._settings = _make_settings() - - # This should not create any tasks or fail - from app.mqtt import mqtt_broadcast - - mqtt_broadcast("message", {"type": "PRIV", "conversation_key": "abc"}) - finally: - mqtt_publisher._settings = original_settings - mqtt_publisher.connected = original_connected - - @pytest.mark.asyncio - async def test_mqtt_maybe_publish_message(self): - """_mqtt_maybe_publish should call publish for message events.""" - from app.mqtt import _mqtt_maybe_publish, mqtt_publisher - - original_settings = mqtt_publisher._settings - original_connected = mqtt_publisher.connected - - try: - mqtt_publisher._settings = _make_settings(mqtt_publish_messages=True) - mqtt_publisher.connected = True - - with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub: - await _mqtt_maybe_publish("message", {"type": "PRIV", "conversation_key": "abc123"}) - mock_pub.assert_called_once() - topic = mock_pub.call_args[0][0] - assert topic == "meshcore/dm:abc123" - finally: - mqtt_publisher._settings = original_settings - mqtt_publisher.connected = original_connected - - @pytest.mark.asyncio - async def test_mqtt_maybe_publish_raw_packet(self): - """_mqtt_maybe_publish should call publish for raw_packet events.""" - from app.mqtt import _mqtt_maybe_publish, mqtt_publisher - - original_settings = mqtt_publisher._settings - original_connected = mqtt_publisher.connected - - try: - mqtt_publisher._settings = _make_settings(mqtt_publish_raw_packets=True) - mqtt_publisher.connected = True - - with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub: - await _mqtt_maybe_publish( - "raw_packet", - {"decrypted_info": {"channel_key": "ch1", "contact_key": None}}, - ) - mock_pub.assert_called_once() - topic = mock_pub.call_args[0][0] - assert topic == "meshcore/raw/gm:ch1" - finally: - mqtt_publisher._settings = original_settings - mqtt_publisher.connected = original_connected - - @pytest.mark.asyncio - async def test_mqtt_maybe_publish_skips_disabled_messages(self): - """_mqtt_maybe_publish should skip messages when publish_messages is False.""" - from app.mqtt import _mqtt_maybe_publish, mqtt_publisher - - original_settings = mqtt_publisher._settings - original_connected = mqtt_publisher.connected - - try: - mqtt_publisher._settings = _make_settings(mqtt_publish_messages=False) - mqtt_publisher.connected = True - - with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub: - await _mqtt_maybe_publish("message", {"type": "PRIV", "conversation_key": "abc"}) - mock_pub.assert_not_called() - finally: - mqtt_publisher._settings = original_settings - mqtt_publisher.connected = original_connected - - @pytest.mark.asyncio - async def test_mqtt_maybe_publish_skips_disabled_raw_packets(self): - """_mqtt_maybe_publish should skip raw_packets when publish_raw_packets is False.""" - from app.mqtt import _mqtt_maybe_publish, mqtt_publisher - - original_settings = mqtt_publisher._settings - original_connected = mqtt_publisher.connected - - try: - mqtt_publisher._settings = _make_settings(mqtt_publish_raw_packets=False) - mqtt_publisher.connected = True - - with patch.object(mqtt_publisher, "publish", new_callable=AsyncMock) as mock_pub: - await _mqtt_maybe_publish( - "raw_packet", - {"decrypted_info": None}, - ) - mock_pub.assert_not_called() - finally: - mqtt_publisher._settings = original_settings - mqtt_publisher.connected = original_connected - - class TestBuildTlsContext: def test_returns_none_when_tls_disabled(self): settings = _make_settings(mqtt_use_tls=False) diff --git a/tests/test_settings_router.py b/tests/test_settings_router.py index 7120742..bb7d542 100644 --- a/tests/test_settings_router.py +++ b/tests/test_settings_router.py @@ -68,130 +68,6 @@ class TestUpdateSettings: assert exc.value.status_code == 400 assert "syntax error" in exc.value.detail.lower() - @pytest.mark.asyncio - async def test_mqtt_fields_round_trip(self, test_db): - """MQTT settings should be saved and retrieved correctly.""" - mock_publisher = type("MockPublisher", (), {"restart": AsyncMock()})() - with patch("app.mqtt.mqtt_publisher", mock_publisher): - result = await update_settings( - AppSettingsUpdate( - mqtt_broker_host="broker.test", - mqtt_broker_port=8883, - mqtt_username="user", - mqtt_password="pass", - mqtt_use_tls=True, - mqtt_tls_insecure=True, - mqtt_topic_prefix="custom", - mqtt_publish_messages=True, - mqtt_publish_raw_packets=True, - ) - ) - - assert result.mqtt_broker_host == "broker.test" - assert result.mqtt_broker_port == 8883 - assert result.mqtt_username == "user" - assert result.mqtt_password == "pass" - assert result.mqtt_use_tls is True - assert result.mqtt_tls_insecure is True - assert result.mqtt_topic_prefix == "custom" - assert result.mqtt_publish_messages is True - assert result.mqtt_publish_raw_packets is True - - # Verify persistence - fresh = await AppSettingsRepository.get() - assert fresh.mqtt_broker_host == "broker.test" - assert fresh.mqtt_use_tls is True - - @pytest.mark.asyncio - async def test_mqtt_defaults_on_fresh_db(self, test_db): - """MQTT fields should have correct defaults on a fresh database.""" - settings = await AppSettingsRepository.get() - - assert settings.mqtt_broker_host == "" - assert settings.mqtt_broker_port == 1883 - assert settings.mqtt_username == "" - assert settings.mqtt_password == "" - assert settings.mqtt_use_tls is False - assert settings.mqtt_tls_insecure is False - assert settings.mqtt_topic_prefix == "meshcore" - assert settings.mqtt_publish_messages is False - assert settings.mqtt_publish_raw_packets is False - - @pytest.mark.asyncio - async def test_community_mqtt_fields_round_trip(self, test_db): - """Community MQTT settings should be saved and retrieved correctly.""" - mock_community = type("MockCommunity", (), {"restart": AsyncMock()})() - with patch("app.community_mqtt.community_publisher", mock_community): - result = await update_settings( - AppSettingsUpdate( - community_mqtt_enabled=True, - community_mqtt_iata="DEN", - community_mqtt_broker_host="custom-broker.example.com", - community_mqtt_broker_port=8883, - community_mqtt_email="test@example.com", - ) - ) - - assert result.community_mqtt_enabled is True - assert result.community_mqtt_iata == "DEN" - assert result.community_mqtt_broker_host == "custom-broker.example.com" - assert result.community_mqtt_broker_port == 8883 - assert result.community_mqtt_email == "test@example.com" - - # Verify persistence - fresh = await AppSettingsRepository.get() - assert fresh.community_mqtt_enabled is True - assert fresh.community_mqtt_iata == "DEN" - assert fresh.community_mqtt_broker_host == "custom-broker.example.com" - assert fresh.community_mqtt_broker_port == 8883 - assert fresh.community_mqtt_email == "test@example.com" - - # Verify restart was called - mock_community.restart.assert_called_once() - - @pytest.mark.asyncio - async def test_community_mqtt_iata_validation_rejects_invalid(self, test_db): - """Invalid IATA codes should be rejected.""" - with pytest.raises(HTTPException) as exc: - await update_settings(AppSettingsUpdate(community_mqtt_iata="A")) - assert exc.value.status_code == 400 - - with pytest.raises(HTTPException) as exc: - await update_settings(AppSettingsUpdate(community_mqtt_iata="ABCDE")) - assert exc.value.status_code == 400 - - with pytest.raises(HTTPException) as exc: - await update_settings(AppSettingsUpdate(community_mqtt_iata="12")) - assert exc.value.status_code == 400 - - with pytest.raises(HTTPException) as exc: - await update_settings(AppSettingsUpdate(community_mqtt_iata="ABCD")) - assert exc.value.status_code == 400 - - @pytest.mark.asyncio - async def test_community_mqtt_enable_requires_iata(self, test_db): - """Enabling community MQTT without a valid IATA code should be rejected.""" - with pytest.raises(HTTPException) as exc: - await update_settings(AppSettingsUpdate(community_mqtt_enabled=True)) - assert exc.value.status_code == 400 - assert "IATA" in exc.value.detail - - @pytest.mark.asyncio - async def test_community_mqtt_iata_uppercased(self, test_db): - """IATA codes should be uppercased.""" - mock_community = type("MockCommunity", (), {"restart": AsyncMock()})() - with patch("app.community_mqtt.community_publisher", mock_community): - result = await update_settings(AppSettingsUpdate(community_mqtt_iata="den")) - assert result.community_mqtt_iata == "DEN" - - @pytest.mark.asyncio - async def test_community_mqtt_defaults_on_fresh_db(self, test_db): - """Community MQTT fields should have correct defaults on a fresh database.""" - settings = await AppSettingsRepository.get() - assert settings.community_mqtt_enabled is False - assert settings.community_mqtt_iata == "" - assert settings.community_mqtt_email == "" - @pytest.mark.asyncio async def test_flood_scope_round_trip(self, test_db): """Flood scope should be saved and retrieved correctly.""" diff --git a/tests/test_websocket.py b/tests/test_websocket.py index ab816c0..7bd361f 100644 --- a/tests/test_websocket.py +++ b/tests/test_websocket.py @@ -206,45 +206,42 @@ class TestWebSocketConnectionManagement: class TestBroadcastEventFanout: - """Test that broadcast_event dispatches to WS, private MQTT, and community MQTT.""" + """Test that broadcast_event dispatches to WS and fanout manager.""" @pytest.mark.asyncio - async def test_broadcast_event_dispatches_to_all_three_sinks(self): - """broadcast_event creates a WS task, calls mqtt_broadcast, and - calls community_mqtt_broadcast.""" + async def test_broadcast_event_dispatches_to_ws_and_fanout(self): + """broadcast_event creates a WS task and dispatches to fanout manager.""" from app.websocket import broadcast_event with ( patch("app.websocket.ws_manager") as mock_ws, - patch("app.mqtt.mqtt_broadcast") as mock_mqtt, - patch("app.community_mqtt.community_mqtt_broadcast") as mock_community, + patch("app.fanout.manager.fanout_manager") as mock_fm, ): mock_ws.broadcast = AsyncMock() + mock_fm.broadcast_message = AsyncMock() broadcast_event("message", {"id": 1, "text": "hello"}) - # Let the asyncio task (ws_manager.broadcast) run + # Let the asyncio tasks run await asyncio.sleep(0) mock_ws.broadcast.assert_called_once_with("message", {"id": 1, "text": "hello"}) - mock_mqtt.assert_called_once_with("message", {"id": 1, "text": "hello"}) - mock_community.assert_called_once_with("message", {"id": 1, "text": "hello"}) + mock_fm.broadcast_message.assert_called_once_with({"id": 1, "text": "hello"}) @pytest.mark.asyncio - async def test_broadcast_event_passes_event_type_to_mqtt_filters(self): - """MQTT sinks receive the event_type so they can filter by message vs raw_packet.""" + async def test_broadcast_event_raw_packet_dispatches_to_fanout(self): + """broadcast_event for raw_packet dispatches to fanout broadcast_raw.""" from app.websocket import broadcast_event with ( patch("app.websocket.ws_manager") as mock_ws, - patch("app.mqtt.mqtt_broadcast") as mock_mqtt, - patch("app.community_mqtt.community_mqtt_broadcast") as mock_community, + patch("app.fanout.manager.fanout_manager") as mock_fm, ): mock_ws.broadcast = AsyncMock() + mock_fm.broadcast_raw = AsyncMock() broadcast_event("raw_packet", {"data": "ff00"}) await asyncio.sleep(0) - # Both MQTT sinks receive the event type for filtering - assert mock_mqtt.call_args.args[0] == "raw_packet" - assert mock_community.call_args.args[0] == "raw_packet" + mock_ws.broadcast.assert_called_once() + mock_fm.broadcast_raw.assert_called_once_with({"data": "ff00"})