diff --git a/app/fanout/community_mqtt.py b/app/fanout/community_mqtt.py index 22524bc..aafee63 100644 --- a/app/fanout/community_mqtt.py +++ b/app/fanout/community_mqtt.py @@ -52,8 +52,15 @@ class CommunityMqttSettings(Protocol): community_mqtt_enabled: bool community_mqtt_broker_host: str community_mqtt_broker_port: int + community_mqtt_transport: str + community_mqtt_use_tls: bool + community_mqtt_tls_verify: bool + community_mqtt_auth_mode: str + community_mqtt_username: str + community_mqtt_password: str community_mqtt_iata: str community_mqtt_email: str + community_mqtt_token_audience: str def _base64url_encode(data: bytes) -> str: @@ -327,11 +334,18 @@ class CommunityMqttPublisher(BaseMqttPublisher): await super().start(settings) def _on_not_configured(self) -> None: - from app.keystore import has_private_key + from app.keystore import get_public_key, has_private_key from app.websocket import broadcast_error s: CommunityMqttSettings | None = self._settings - if s and not has_private_key() and not self._key_unavailable_warned: + auth_mode = getattr(s, "community_mqtt_auth_mode", "token") if s else "token" + if ( + s + and auth_mode == "token" + and get_public_key() is not None + and not has_private_key() + and not self._key_unavailable_warned + ): broadcast_error( "Community MQTT unavailable", "Radio firmware does not support private key export.", @@ -340,10 +354,17 @@ class CommunityMqttPublisher(BaseMqttPublisher): def _is_configured(self) -> bool: """Check if community MQTT is enabled and keys are available.""" - from app.keystore import has_private_key + from app.keystore import get_public_key, has_private_key s: CommunityMqttSettings | None = self._settings - return bool(s and s.community_mqtt_enabled and has_private_key()) + if not s or not s.community_mqtt_enabled: + return False + if get_public_key() is None: + return False + auth_mode = getattr(s, "community_mqtt_auth_mode", "token") + if auth_mode == "token": + return has_private_key() + return True def _build_client_kwargs(self, settings: object) -> dict[str, Any]: s: CommunityMqttSettings = settings # type: ignore[assignment] @@ -352,19 +373,23 @@ class CommunityMqttPublisher(BaseMqttPublisher): private_key = get_private_key() public_key = get_public_key() - assert private_key is not None and public_key is not None # guaranteed by _pre_connect + assert public_key is not None # guaranteed by _pre_connect pubkey_hex = public_key.hex().upper() broker_host = s.community_mqtt_broker_host or _DEFAULT_BROKER broker_port = s.community_mqtt_broker_port or _DEFAULT_PORT - jwt_token = _generate_jwt_token( - private_key, - public_key, - audience=broker_host, - email=s.community_mqtt_email or "", - ) + transport = s.community_mqtt_transport or "websockets" + use_tls = bool(s.community_mqtt_use_tls) + tls_verify = bool(s.community_mqtt_tls_verify) + auth_mode = s.community_mqtt_auth_mode or "token" + secure_connection = use_tls and tls_verify - tls_context = ssl.create_default_context() + tls_context: ssl.SSLContext | None = None + if use_tls: + tls_context = ssl.create_default_context() + if not tls_verify: + tls_context.check_hostname = False + tls_context.verify_mode = ssl.CERT_NONE device_name = "" if radio_manager.meshcore and radio_manager.meshcore.self_info: @@ -380,16 +405,30 @@ class CommunityMqttPublisher(BaseMqttPublisher): } ) - return { + kwargs: dict[str, Any] = { "hostname": broker_host, "port": broker_port, - "transport": "websockets", + "transport": transport, "tls_context": tls_context, - "websocket_path": "/", - "username": f"v1_{pubkey_hex}", - "password": jwt_token, "will": aiomqtt.Will(status_topic, offline_payload, retain=True), } + if auth_mode == "token": + assert private_key is not None + token_audience = (s.community_mqtt_token_audience or "").strip() or broker_host + jwt_token = _generate_jwt_token( + private_key, + public_key, + audience=token_audience, + email=(s.community_mqtt_email or "") if secure_connection else "", + ) + kwargs["username"] = f"v1_{pubkey_hex}" + kwargs["password"] = jwt_token + elif auth_mode == "password": + kwargs["username"] = s.community_mqtt_username or None + kwargs["password"] = s.community_mqtt_password or None + if transport == "websockets": + kwargs["websocket_path"] = "/" + return kwargs def _on_connected(self, settings: object) -> tuple[str, str]: s: CommunityMqttSettings = settings # type: ignore[assignment] @@ -543,7 +582,9 @@ class CommunityMqttPublisher(BaseMqttPublisher): if not self.connected: logger.info("Community MQTT publish failure detected, reconnecting") return True - if elapsed >= _TOKEN_RENEWAL_THRESHOLD: + s: CommunityMqttSettings | None = self._settings + auth_mode = getattr(s, "community_mqtt_auth_mode", "token") if s else "token" + if auth_mode == "token" and elapsed >= _TOKEN_RENEWAL_THRESHOLD: logger.info("Community MQTT JWT nearing expiry, reconnecting") return True return False @@ -551,9 +592,11 @@ class CommunityMqttPublisher(BaseMqttPublisher): async def _pre_connect(self, settings: object) -> bool: from app.keystore import get_private_key, get_public_key + s: CommunityMqttSettings = settings # type: ignore[assignment] + auth_mode = s.community_mqtt_auth_mode or "token" private_key = get_private_key() public_key = get_public_key() - if private_key is None or public_key is None: + if public_key is None or (auth_mode == "token" and private_key is None): # Keys not available yet, wait for settings change or key export self.connected = False self._version_event.clear() diff --git a/app/fanout/mqtt_community.py b/app/fanout/mqtt_community.py index 982efa6..ee63180 100644 --- a/app/fanout/mqtt_community.py +++ b/app/fanout/mqtt_community.py @@ -4,6 +4,7 @@ from __future__ import annotations import logging import re +import string from types import SimpleNamespace from typing import Any @@ -13,6 +14,37 @@ from app.fanout.community_mqtt import CommunityMqttPublisher, _format_raw_packet logger = logging.getLogger(__name__) _IATA_RE = re.compile(r"^[A-Z]{3}$") +_DEFAULT_PACKET_TOPIC_TEMPLATE = "meshcore/{IATA}/{PUBLIC_KEY}/packets" +_TOPIC_TEMPLATE_FIELD_CANONICAL = { + "iata": "IATA", + "public_key": "PUBLIC_KEY", +} + + +def _normalize_topic_template(topic_template: str) -> str: + """Normalize packet topic template fields to canonical uppercase placeholders.""" + template = topic_template.strip() or _DEFAULT_PACKET_TOPIC_TEMPLATE + parts: list[str] = [] + try: + parsed = string.Formatter().parse(template) + for literal_text, field_name, format_spec, conversion in parsed: + parts.append(literal_text) + if field_name is None: + continue + normalized_field = _TOPIC_TEMPLATE_FIELD_CANONICAL.get(field_name.lower()) + if normalized_field is None: + raise ValueError(f"Unsupported topic template field(s): {field_name}") + replacement = ["{", normalized_field] + if conversion: + replacement.extend(["!", conversion]) + if format_spec: + replacement.extend([":", format_spec]) + replacement.append("}") + parts.append("".join(replacement)) + except ValueError: + raise + + return "".join(parts) def _config_to_settings(config: dict) -> SimpleNamespace: @@ -21,11 +53,24 @@ def _config_to_settings(config: dict) -> SimpleNamespace: community_mqtt_enabled=True, community_mqtt_broker_host=config.get("broker_host", "mqtt-us-v1.letsmesh.net"), community_mqtt_broker_port=config.get("broker_port", 443), + community_mqtt_transport=config.get("transport", "websockets"), + community_mqtt_use_tls=config.get("use_tls", True), + community_mqtt_tls_verify=config.get("tls_verify", True), + community_mqtt_auth_mode=config.get("auth_mode", "token"), + community_mqtt_username=config.get("username", ""), + community_mqtt_password=config.get("password", ""), community_mqtt_iata=config.get("iata", ""), community_mqtt_email=config.get("email", ""), + community_mqtt_token_audience=config.get("token_audience", ""), ) +def _render_packet_topic(topic_template: str, *, iata: str, public_key: str) -> str: + """Render the configured raw-packet publish topic.""" + template = _normalize_topic_template(topic_template) + return template.format(IATA=iata, PUBLIC_KEY=public_key) + + class MqttCommunityModule(FanoutModule): """Wraps a CommunityMqttPublisher for community packet sharing.""" @@ -81,7 +126,11 @@ async def _publish_community_packet( if not _IATA_RE.fullmatch(iata): logger.debug("Community MQTT: skipping publish — no valid IATA code configured") return - topic = f"meshcore/{iata}/{pubkey_hex}/packets" + topic = _render_packet_topic( + str(config.get("topic_template", _DEFAULT_PACKET_TOPIC_TEMPLATE)), + iata=iata, + public_key=pubkey_hex, + ) await publisher.publish(topic, packet) diff --git a/app/routers/fanout.py b/app/routers/fanout.py index 476c956..0c1784e 100644 --- a/app/routers/fanout.py +++ b/app/routers/fanout.py @@ -2,6 +2,7 @@ import logging import re +import string from fastapi import APIRouter, HTTPException from pydantic import BaseModel, Field @@ -15,6 +16,48 @@ router = APIRouter(prefix="/fanout", tags=["fanout"]) _VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise"} _IATA_RE = re.compile(r"^[A-Z]{3}$") +_DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE = "meshcore/{IATA}/{PUBLIC_KEY}/packets" +_DEFAULT_COMMUNITY_MQTT_BROKER_HOST = "mqtt-us-v1.letsmesh.net" +_DEFAULT_COMMUNITY_MQTT_BROKER_PORT = 443 +_DEFAULT_COMMUNITY_MQTT_TRANSPORT = "websockets" +_DEFAULT_COMMUNITY_MQTT_AUTH_MODE = "token" +_COMMUNITY_MQTT_TEMPLATE_FIELD_CANONICAL = { + "iata": "IATA", + "public_key": "PUBLIC_KEY", +} +_ALLOWED_COMMUNITY_MQTT_TRANSPORTS = {"tcp", "websockets"} +_ALLOWED_COMMUNITY_MQTT_AUTH_MODES = {"token", "password", "none"} + + +def _normalize_community_topic_template(topic_template: str) -> str: + """Normalize Community MQTT topic template placeholders to canonical uppercase form.""" + template = topic_template.strip() or _DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE + parts: list[str] = [] + try: + parsed = string.Formatter().parse(template) + for literal_text, field_name, format_spec, conversion in parsed: + parts.append(literal_text) + if field_name is None: + continue + normalized_field = _COMMUNITY_MQTT_TEMPLATE_FIELD_CANONICAL.get(field_name.lower()) + if normalized_field is None: + raise HTTPException( + status_code=400, + detail=( + f"topic_template may only use {{IATA}} and {{PUBLIC_KEY}}; got {field_name}" + ), + ) + replacement = ["{", normalized_field] + if conversion: + replacement.extend(["!", conversion]) + if format_spec: + replacement.extend([":", format_spec]) + replacement.append("}") + parts.append("".join(replacement)) + except ValueError as exc: + raise HTTPException(status_code=400, detail=f"Invalid topic_template: {exc}") from None + + return "".join(parts) class FanoutConfigCreate(BaseModel): @@ -43,6 +86,46 @@ def _validate_mqtt_private_config(config: dict) -> None: def _validate_mqtt_community_config(config: dict) -> None: """Validate mqtt_community config blob. Normalizes IATA to uppercase.""" + broker_host = str(config.get("broker_host", _DEFAULT_COMMUNITY_MQTT_BROKER_HOST)).strip() + if not broker_host: + broker_host = _DEFAULT_COMMUNITY_MQTT_BROKER_HOST + config["broker_host"] = broker_host + + port = config.get("broker_port", _DEFAULT_COMMUNITY_MQTT_BROKER_PORT) + if not isinstance(port, int) or port < 1 or port > 65535: + raise HTTPException(status_code=400, detail="broker_port must be between 1 and 65535") + config["broker_port"] = port + + transport = str(config.get("transport", _DEFAULT_COMMUNITY_MQTT_TRANSPORT)).strip().lower() + if transport not in _ALLOWED_COMMUNITY_MQTT_TRANSPORTS: + raise HTTPException( + status_code=400, + detail="transport must be 'websockets' or 'tcp'", + ) + config["transport"] = transport + config["use_tls"] = bool(config.get("use_tls", True)) + config["tls_verify"] = bool(config.get("tls_verify", True)) + + auth_mode = str(config.get("auth_mode", _DEFAULT_COMMUNITY_MQTT_AUTH_MODE)).strip().lower() + if auth_mode not in _ALLOWED_COMMUNITY_MQTT_AUTH_MODES: + raise HTTPException( + status_code=400, + detail="auth_mode must be 'token', 'password', or 'none'", + ) + config["auth_mode"] = auth_mode + username = str(config.get("username", "")).strip() + password = str(config.get("password", "")).strip() + if auth_mode == "password" and (not username or not password): + raise HTTPException( + status_code=400, + detail="username and password are required when auth_mode is 'password'", + ) + config["username"] = username + config["password"] = password + + token_audience = str(config.get("token_audience", "")).strip() + config["token_audience"] = token_audience + iata = config.get("iata", "").upper().strip() if not iata or not _IATA_RE.fullmatch(iata): raise HTTPException( @@ -51,6 +134,14 @@ def _validate_mqtt_community_config(config: dict) -> None: ) config["iata"] = iata + topic_template = str( + config.get("topic_template", _DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE) + ).strip() + if not topic_template: + topic_template = _DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE + + config["topic_template"] = _normalize_community_topic_template(topic_template) + def _validate_bot_config(config: dict) -> None: """Validate bot config blob (syntax-check the code).""" diff --git a/frontend/src/components/settings/SettingsFanoutSection.tsx b/frontend/src/components/settings/SettingsFanoutSection.tsx index 4c2c137..bac330c 100644 --- a/frontend/src/components/settings/SettingsFanoutSection.tsx +++ b/frontend/src/components/settings/SettingsFanoutSection.tsx @@ -14,7 +14,7 @@ const BotCodeEditor = lazy(() => const TYPE_LABELS: Record = { mqtt_private: 'Private MQTT', - mqtt_community: 'Community MQTT', + mqtt_community: 'Community MQTT/mesh2mqtt', bot: 'Bot', webhook: 'Webhook', apprise: 'Apprise', @@ -22,12 +22,32 @@ const TYPE_LABELS: Record = { const TYPE_OPTIONS = [ { value: 'mqtt_private', label: 'Private MQTT' }, - { value: 'mqtt_community', label: 'Community MQTT' }, + { value: 'mqtt_community', label: 'Community MQTT/mesh2mqtt' }, { value: 'bot', label: 'Bot' }, { value: 'webhook', label: 'Webhook' }, { value: 'apprise', label: 'Apprise' }, ]; +const DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE = 'meshcore/{IATA}/{PUBLIC_KEY}/packets'; +const DEFAULT_COMMUNITY_BROKER_HOST = 'mqtt-us-v1.letsmesh.net'; +const DEFAULT_COMMUNITY_BROKER_PORT = 443; +const DEFAULT_COMMUNITY_TRANSPORT = 'websockets'; +const DEFAULT_COMMUNITY_AUTH_MODE = 'token'; + +function formatBrokerSummary( + config: Record, + defaults: { host: string; port: number } +) { + const host = (config.broker_host as string) || defaults.host; + const port = typeof config.broker_port === 'number' ? config.broker_port : defaults.port; + return `${host}:${port}`; +} + +function formatPrivateTopicSummary(config: Record) { + const prefix = (config.topic_prefix as string) || 'meshcore'; + return `${prefix}/dm:, ${prefix}/gm:, ${prefix}/raw/...`; +} + const DEFAULT_BOT_CODE = `def bot( sender_name: str | None, sender_key: str | None, @@ -75,10 +95,11 @@ function getStatusLabel(status: string | undefined, type?: string) { } function getStatusColor(status: string | undefined, enabled?: boolean) { - if (enabled === false) return 'bg-warning shadow-[0_0_6px_hsl(var(--warning)/0.5)]'; + if (enabled === false) return 'bg-muted-foreground'; if (status === 'connected') return 'bg-status-connected shadow-[0_0_6px_hsl(var(--status-connected)/0.5)]'; - if (status === 'error') return 'bg-destructive shadow-[0_0_6px_hsl(var(--destructive)/0.5)]'; + if (status === 'error' || status === 'disconnected') + return 'bg-destructive shadow-[0_0_6px_hsl(var(--destructive)/0.5)]'; return 'bg-muted-foreground'; } @@ -201,6 +222,8 @@ function MqttCommunityConfigEditor({ config: Record; onChange: (config: Record) => void; }) { + const authMode = (config.auth_mode as string) || DEFAULT_COMMUNITY_AUTH_MODE; + return (

@@ -214,8 +237,8 @@ function MqttCommunityConfigEditor({ onChange({ ...config, broker_host: e.target.value })} />

@@ -226,14 +249,124 @@ function MqttCommunityConfigEditor({ type="number" min="1" max="65535" - value={(config.broker_port as number) || 443} + value={(config.broker_port as number) || DEFAULT_COMMUNITY_BROKER_PORT} onChange={(e) => - onChange({ ...config, broker_port: parseInt(e.target.value, 10) || 443 }) + onChange({ + ...config, + broker_port: parseInt(e.target.value, 10) || DEFAULT_COMMUNITY_BROKER_PORT, + }) } /> +
+
+ + +
+
+ + +
+
+ +

+ LetsMesh uses token auth. MeshRank uses none. +

+ + {authMode === 'token' && ( +
+
+ + onChange({ ...config, token_audience: e.target.value })} + /> +

Defaults to the broker host when blank

+
+
+ + onChange({ ...config, email: e.target.value })} + /> +

+ Used to claim your node on the community aggregator +

+
+
+ )} + + {authMode === 'password' && ( +
+
+ + onChange({ ...config, username: e.target.value })} + /> +
+
+ + onChange({ ...config, password: e.target.value })} + /> +
+
+ )} + +
+ + + +
+
- + onChange({ ...config, email: e.target.value })} + id="fanout-comm-topic-template" + type="text" + value={(config.topic_template as string) || DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE} + onChange={(e) => onChange({ ...config, topic_template: e.target.value })} />

- Used to claim your node on the community aggregator + Use {'{IATA}'} and {'{PUBLIC_KEY}'}. Default:{' '} + {DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE}

@@ -943,9 +1076,17 @@ export function SettingsFanoutSection({ }, mqtt_community: { broker_host: 'mqtt-us-v1.letsmesh.net', - broker_port: 443, + broker_port: DEFAULT_COMMUNITY_BROKER_PORT, + transport: DEFAULT_COMMUNITY_TRANSPORT, + use_tls: true, + tls_verify: true, + auth_mode: DEFAULT_COMMUNITY_AUTH_MODE, + username: '', + password: '', iata: '', email: '', + token_audience: '', + topic_template: DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE, }, bot: { code: DEFAULT_BOT_CODE, @@ -1093,7 +1234,7 @@ export function SettingsFanoutSection({ )}
- Add: + Add a new entry: {TYPE_OPTIONS.filter((opt) => opt.value !== 'bot' || !health?.bots_disabled).map((opt) => (
+ + {cfg.type === 'mqtt_community' && ( +
+
+ Broker:{' '} + {formatBrokerSummary(communityConfig, { + host: DEFAULT_COMMUNITY_BROKER_HOST, + port: DEFAULT_COMMUNITY_BROKER_PORT, + })} +
+
+ Topic:{' '} + + {(communityConfig.topic_template as string) || + DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE} + +
+
+ )} + + {cfg.type === 'mqtt_private' && ( +
+
+ Broker:{' '} + {formatBrokerSummary(cfg.config as Record, { + host: '', + port: 1883, + })} +
+
+ Topics:{' '} + + {formatPrivateTopicSummary(cfg.config as Record)} + +
+
+ )} ); })} diff --git a/frontend/src/test/fanoutSection.test.tsx b/frontend/src/test/fanoutSection.test.tsx index d5b79e9..5137090 100644 --- a/frontend/src/test/fanoutSection.test.tsx +++ b/frontend/src/test/fanoutSection.test.tsx @@ -66,12 +66,20 @@ describe('SettingsFanoutSection', () => { renderSection(); await waitFor(() => { expect(screen.getByRole('button', { name: 'Private MQTT' })).toBeInTheDocument(); + expect(screen.getByRole('button', { name: 'Community MQTT/mesh2mqtt' })).toBeInTheDocument(); expect(screen.getByRole('button', { name: 'Webhook' })).toBeInTheDocument(); expect(screen.getByRole('button', { name: 'Apprise' })).toBeInTheDocument(); expect(screen.getByRole('button', { name: 'Bot' })).toBeInTheDocument(); }); }); + it('shows updated add label phrasing', async () => { + renderSection(); + await waitFor(() => { + expect(screen.getByText('Add a new entry:')).toBeInTheDocument(); + }); + }); + it('hides bot add button when bots_disabled', async () => { renderSection({ health: { ...baseHealth, bots_disabled: true } }); await waitFor(() => { @@ -265,4 +273,192 @@ describe('SettingsFanoutSection', () => { expect(screen.getByLabelText(/URL/)).toBeInTheDocument(); }); }); + + it('community MQTT editor exposes packet topic template', async () => { + const communityConfig: FanoutConfig = { + id: 'comm-1', + type: 'mqtt_community', + name: 'Community MQTT/mesh2mqtt', + enabled: false, + config: { + broker_host: 'mqtt-us-v1.letsmesh.net', + broker_port: 443, + transport: 'tcp', + use_tls: true, + tls_verify: true, + auth_mode: 'token', + iata: 'LAX', + email: '', + token_audience: 'meshrank.net', + topic_template: 'mesh2mqtt/{IATA}/node/{PUBLIC_KEY}', + }, + scope: { messages: 'none', raw_packets: 'all' }, + sort_order: 0, + created_at: 1000, + }; + mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]); + renderSection(); + await waitFor(() => expect(screen.getByText('Community MQTT/mesh2mqtt')).toBeInTheDocument()); + + fireEvent.click(screen.getByRole('button', { name: 'Edit' })); + await waitFor(() => expect(screen.getByText('← Back to list')).toBeInTheDocument()); + + expect(screen.getByLabelText('Packet Topic Template')).toHaveValue( + 'mesh2mqtt/{IATA}/node/{PUBLIC_KEY}' + ); + expect(screen.getByLabelText('Transport')).toHaveValue('tcp'); + expect(screen.getByLabelText('Authentication')).toHaveValue('token'); + expect(screen.getByLabelText('Token Audience')).toHaveValue('meshrank.net'); + expect(screen.getByText(/LetsMesh uses/)).toBeInTheDocument(); + }); + + it('existing community MQTT config without auth_mode defaults to token in the editor', async () => { + const communityConfig: FanoutConfig = { + id: 'comm-legacy', + type: 'mqtt_community', + name: 'Legacy Community MQTT', + enabled: false, + config: { + broker_host: 'mqtt-us-v1.letsmesh.net', + broker_port: 443, + transport: 'websockets', + use_tls: true, + tls_verify: true, + iata: 'LAX', + email: 'user@example.com', + token_audience: '', + topic_template: 'meshcore/{IATA}/{PUBLIC_KEY}/packets', + }, + scope: { messages: 'none', raw_packets: 'all' }, + sort_order: 0, + created_at: 1000, + }; + mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]); + renderSection(); + await waitFor(() => expect(screen.getByText('Legacy Community MQTT')).toBeInTheDocument()); + + fireEvent.click(screen.getByRole('button', { name: 'Edit' })); + await waitFor(() => expect(screen.getByText('← Back to list')).toBeInTheDocument()); + + expect(screen.getByLabelText('Authentication')).toHaveValue('token'); + expect(screen.getByLabelText('Token Audience')).toBeInTheDocument(); + }); + + it('community MQTT token audience can be cleared back to blank', async () => { + const communityConfig: FanoutConfig = { + id: 'comm-1', + type: 'mqtt_community', + name: 'Community MQTT/mesh2mqtt', + enabled: false, + config: { + broker_host: 'mqtt-us-v1.letsmesh.net', + broker_port: 443, + transport: 'websockets', + use_tls: true, + tls_verify: true, + auth_mode: 'token', + iata: 'LAX', + email: '', + token_audience: 'meshrank.net', + topic_template: 'meshcore/{IATA}/{PUBLIC_KEY}/packets', + }, + scope: { messages: 'none', raw_packets: 'all' }, + sort_order: 0, + created_at: 1000, + }; + mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]); + renderSection(); + await waitFor(() => expect(screen.getByText('Community MQTT/mesh2mqtt')).toBeInTheDocument()); + + fireEvent.click(screen.getByRole('button', { name: 'Edit' })); + await waitFor(() => expect(screen.getByText('← Back to list')).toBeInTheDocument()); + + const audienceInput = screen.getByLabelText('Token Audience'); + fireEvent.change(audienceInput, { target: { value: '' } }); + + expect(audienceInput).toHaveValue(''); + }); + + it('community MQTT can be configured for no auth', async () => { + const communityConfig: FanoutConfig = { + id: 'comm-1', + type: 'mqtt_community', + name: 'Community MQTT/mesh2mqtt', + enabled: false, + config: { + broker_host: 'meshrank.net', + broker_port: 8883, + transport: 'tcp', + use_tls: true, + tls_verify: true, + auth_mode: 'none', + iata: 'LAX', + topic_template: 'meshrank/uplink/ROOM/{PUBLIC_KEY}/packets', + }, + scope: { messages: 'none', raw_packets: 'all' }, + sort_order: 0, + created_at: 1000, + }; + mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]); + renderSection(); + await waitFor(() => expect(screen.getByText('Community MQTT/mesh2mqtt')).toBeInTheDocument()); + + fireEvent.click(screen.getByRole('button', { name: 'Edit' })); + await waitFor(() => expect(screen.getByText('← Back to list')).toBeInTheDocument()); + + expect(screen.getByLabelText('Authentication')).toHaveValue('none'); + expect(screen.queryByLabelText('Token Audience')).not.toBeInTheDocument(); + }); + + it('community MQTT list shows configured packet topic', async () => { + const communityConfig: FanoutConfig = { + id: 'comm-1', + type: 'mqtt_community', + name: 'Community MQTT/mesh2mqtt', + enabled: false, + config: { + broker_host: 'mqtt-us-v1.letsmesh.net', + broker_port: 443, + transport: 'websockets', + use_tls: true, + tls_verify: true, + auth_mode: 'token', + iata: 'LAX', + email: '', + token_audience: 'mqtt-us-v1.letsmesh.net', + topic_template: 'mesh2mqtt/{IATA}/node/{PUBLIC_KEY}', + }, + scope: { messages: 'none', raw_packets: 'all' }, + sort_order: 0, + created_at: 1000, + }; + mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]); + renderSection(); + + await waitFor(() => + expect(screen.getByText('Broker: mqtt-us-v1.letsmesh.net:443')).toBeInTheDocument() + ); + expect(screen.getByText('mesh2mqtt/{IATA}/node/{PUBLIC_KEY}')).toBeInTheDocument(); + expect(screen.queryByText('Region: LAX')).not.toBeInTheDocument(); + }); + + it('private MQTT list shows broker and topic summary', async () => { + const privateConfig: FanoutConfig = { + id: 'mqtt-1', + type: 'mqtt_private', + name: 'Private MQTT', + enabled: true, + config: { broker_host: 'broker.local', broker_port: 1883, topic_prefix: 'meshcore' }, + scope: { messages: 'all', raw_packets: 'all' }, + sort_order: 0, + created_at: 1000, + }; + mockedApi.getFanoutConfigs.mockResolvedValue([privateConfig]); + renderSection(); + + await waitFor(() => expect(screen.getByText('Broker: broker.local:1883')).toBeInTheDocument()); + expect( + screen.getByText('meshcore/dm:, meshcore/gm:, meshcore/raw/...') + ).toBeInTheDocument(); + }); }); diff --git a/tests/test_community_mqtt.py b/tests/test_community_mqtt.py index 23d7ca1..fae00d0 100644 --- a/tests/test_community_mqtt.py +++ b/tests/test_community_mqtt.py @@ -23,6 +23,11 @@ from app.fanout.community_mqtt import ( _generate_jwt_token, _get_client_version, ) +from app.fanout.mqtt_community import ( + _config_to_settings, + _publish_community_packet, + _render_packet_topic, +) def _make_test_keys() -> tuple[bytes, bytes]: @@ -55,8 +60,15 @@ def _make_community_settings(**overrides) -> SimpleNamespace: "community_mqtt_enabled": True, "community_mqtt_broker_host": "mqtt-us-v1.letsmesh.net", "community_mqtt_broker_port": 443, + "community_mqtt_transport": "websockets", + "community_mqtt_use_tls": True, + "community_mqtt_tls_verify": True, + "community_mqtt_auth_mode": "token", + "community_mqtt_username": "", + "community_mqtt_password": "", "community_mqtt_iata": "", "community_mqtt_email": "", + "community_mqtt_token_audience": "mqtt-us-v1.letsmesh.net", } defaults.update(overrides) return SimpleNamespace(**defaults) @@ -390,19 +402,37 @@ class TestCommunityMqttPublisher: def test_is_configured_false_when_disabled(self): pub = CommunityMqttPublisher() pub._settings = SimpleNamespace(community_mqtt_enabled=False) - with patch("app.keystore.has_private_key", return_value=True): + with patch("app.keystore.get_public_key", return_value=b"x" * 32): assert pub._is_configured() is False def test_is_configured_false_when_no_private_key(self): pub = CommunityMqttPublisher() - pub._settings = SimpleNamespace(community_mqtt_enabled=True) - with patch("app.keystore.has_private_key", return_value=False): + pub._settings = SimpleNamespace( + community_mqtt_enabled=True, community_mqtt_auth_mode="token" + ) + with ( + patch("app.keystore.get_public_key", return_value=b"x" * 32), + patch("app.keystore.has_private_key", return_value=False), + ): assert pub._is_configured() is False def test_is_configured_true_when_enabled_with_key(self): pub = CommunityMqttPublisher() - pub._settings = SimpleNamespace(community_mqtt_enabled=True) - with patch("app.keystore.has_private_key", return_value=True): + pub._settings = SimpleNamespace( + community_mqtt_enabled=True, community_mqtt_auth_mode="token" + ) + with ( + patch("app.keystore.get_public_key", return_value=b"x" * 32), + patch("app.keystore.has_private_key", return_value=True), + ): + assert pub._is_configured() is True + + def test_is_configured_true_for_none_auth_without_private_key(self): + pub = CommunityMqttPublisher() + pub._settings = SimpleNamespace( + community_mqtt_enabled=True, community_mqtt_auth_mode="none" + ) + with patch("app.keystore.get_public_key", return_value=b"x" * 32): assert pub._is_configured() is True @@ -431,6 +461,49 @@ class TestBuildStatusTopic: assert topic == "meshcore/LAX/PUBKEY/status" +class TestRenderPacketTopic: + def test_uses_default_template(self): + topic = _render_packet_topic( + "meshcore/{IATA}/{PUBLIC_KEY}/packets", + iata="LAX", + public_key="AABB1122", + ) + assert topic == "meshcore/LAX/AABB1122/packets" + + def test_supports_custom_template(self): + topic = _render_packet_topic( + "mesh2mqtt/{IATA}/node/{PUBLIC_KEY}", + iata="PDX", + public_key="PUBKEY", + ) + assert topic == "mesh2mqtt/PDX/node/PUBKEY" + + def test_accepts_mixed_case_placeholders(self): + topic = _render_packet_topic( + "mesh2mqtt/{iata}/node/{Public_Key}", + iata="SEA", + public_key="PUBKEY", + ) + assert topic == "mesh2mqtt/SEA/node/PUBKEY" + + +class TestCommunityConfigDefaults: + def test_existing_config_without_new_fields_gets_letsmesh_defaults(self): + settings = _config_to_settings({"iata": "LAX", "email": "user@example.com"}) + + assert settings.community_mqtt_broker_host == "mqtt-us-v1.letsmesh.net" + assert settings.community_mqtt_broker_port == 443 + assert settings.community_mqtt_transport == "websockets" + assert settings.community_mqtt_use_tls is True + assert settings.community_mqtt_tls_verify is True + assert settings.community_mqtt_auth_mode == "token" + assert settings.community_mqtt_username == "" + assert settings.community_mqtt_password == "" + assert settings.community_mqtt_token_audience == "" + assert settings.community_mqtt_iata == "LAX" + assert settings.community_mqtt_email == "user@example.com" + + class TestLwtAndStatusPublish: def test_build_client_kwargs_includes_will(self): """_build_client_kwargs should return a will with offline status.""" @@ -460,6 +533,70 @@ class TestLwtAndStatusPublish: assert payload["origin_id"] == pubkey_hex assert "timestamp" in payload assert "client" not in payload + assert kwargs["transport"] == "websockets" + assert kwargs["websocket_path"] == "/" + assert kwargs["tls_context"] is not None + assert kwargs["username"] == f"v1_{pubkey_hex}" + + def test_build_client_kwargs_supports_tcp_transport_and_custom_audience(self): + pub = CommunityMqttPublisher() + private_key, public_key = _make_test_keys() + settings = _make_community_settings( + community_mqtt_broker_host="meshrank.net", + community_mqtt_broker_port=8883, + community_mqtt_transport="tcp", + community_mqtt_use_tls=True, + community_mqtt_tls_verify=True, + community_mqtt_token_audience="meshrank.net", + community_mqtt_email="user@example.com", + community_mqtt_iata="SFO", + ) + + with ( + patch("app.keystore.get_private_key", return_value=private_key), + patch("app.keystore.get_public_key", return_value=public_key), + patch("app.radio.radio_manager") as mock_radio, + ): + mock_radio.meshcore = None + kwargs = pub._build_client_kwargs(settings) + + assert kwargs["hostname"] == "meshrank.net" + assert kwargs["port"] == 8883 + assert kwargs["transport"] == "tcp" + assert "websocket_path" not in kwargs + assert kwargs["tls_context"] is not None + + payload_b64 = kwargs["password"].split(".")[1] + import base64 + + padded = payload_b64 + "=" * (4 - len(payload_b64) % 4) + payload = json.loads(base64.urlsafe_b64decode(padded)) + assert payload["aud"] == "meshrank.net" + assert payload["email"] == "user@example.com" + + def test_build_client_kwargs_supports_none_auth(self): + pub = CommunityMqttPublisher() + _, public_key = _make_test_keys() + settings = _make_community_settings( + community_mqtt_broker_host="meshrank.net", + community_mqtt_broker_port=8883, + community_mqtt_transport="tcp", + community_mqtt_auth_mode="none", + ) + + with ( + patch("app.keystore.get_private_key", return_value=None), + patch("app.keystore.get_public_key", return_value=public_key), + patch("app.radio.radio_manager") as mock_radio, + ): + mock_radio.meshcore = None + kwargs = pub._build_client_kwargs(settings) + + assert kwargs["hostname"] == "meshrank.net" + assert kwargs["port"] == 8883 + assert kwargs["transport"] == "tcp" + assert "username" not in kwargs + assert "password" not in kwargs @pytest.mark.asyncio async def test_on_connected_async_publishes_online_status(self): @@ -579,6 +716,42 @@ class TestLwtAndStatusPublish: assert payload["origin"] == "MeshCore Device" +class TestCommunityPacketPublishTopic: + @pytest.mark.asyncio + async def test_publish_community_packet_uses_configured_topic_template(self): + publisher = MagicMock() + publisher.publish = AsyncMock() + publisher.connected = True + publisher._settings = object() + + data = { + "id": 1, + "observation_id": 1, + "timestamp": 1700000000, + "data": "0100", + "payload_type": "GROUP_TEXT", + "snr": None, + "rssi": None, + "decrypted": False, + "decrypted_info": None, + } + config = {"iata": "lax", "topic_template": "mesh2mqtt/{IATA}/node/{PUBLIC_KEY}"} + + mock_radio = MagicMock() + mock_radio.meshcore = MagicMock() + mock_radio.meshcore.self_info = {"name": "Node"} + + with ( + patch("app.keystore.get_public_key", return_value=bytes.fromhex("AA" * 32)), + patch("app.radio.radio_manager", mock_radio), + ): + await _publish_community_packet(publisher, config, data) + + publisher.publish.assert_awaited_once() + topic = publisher.publish.await_args.args[0] + assert topic == f"mesh2mqtt/LAX/node/{'AA' * 32}" + + def _mock_radio_operation(mc_mock): """Create a mock async context manager for radio_operation.""" diff --git a/tests/test_fanout_hitlist.py b/tests/test_fanout_hitlist.py index cce6b27..4cee09a 100644 --- a/tests/test_fanout_hitlist.py +++ b/tests/test_fanout_hitlist.py @@ -656,3 +656,67 @@ class TestCommunityMqttIataValidation: with pytest.raises(HTTPException): _validate_mqtt_community_config({"iata": "pdx1"}) + + def test_topic_template_defaults_when_missing(self): + from app.routers.fanout import _validate_mqtt_community_config + + config = {"iata": "PDX"} + _validate_mqtt_community_config(config) + assert config["broker_host"] == "mqtt-us-v1.letsmesh.net" + assert config["broker_port"] == 443 + assert config["transport"] == "websockets" + assert config["use_tls"] is True + assert config["tls_verify"] is True + assert config["auth_mode"] == "token" + assert config["username"] == "" + assert config["password"] == "" + assert config["token_audience"] == "" + assert config["topic_template"] == "meshcore/{IATA}/{PUBLIC_KEY}/packets" + + def test_topic_template_normalizes_placeholder_capitalization(self): + from app.routers.fanout import _validate_mqtt_community_config + + config = {"iata": "PDX", "topic_template": "mesh2mqtt/{iata}/node/{Public_Key}"} + _validate_mqtt_community_config(config) + assert config["topic_template"] == "mesh2mqtt/{IATA}/node/{PUBLIC_KEY}" + + def test_topic_template_rejects_unknown_placeholder(self): + from app.routers.fanout import _validate_mqtt_community_config + + with pytest.raises(HTTPException) as exc_info: + _validate_mqtt_community_config( + {"iata": "PDX", "topic_template": "meshcore/{foo}/{PUBLIC_KEY}/packets"} + ) + assert exc_info.value.status_code == 400 + assert "topic_template" in exc_info.value.detail + + def test_transport_rejects_unknown_value(self): + from app.routers.fanout import _validate_mqtt_community_config + + with pytest.raises(HTTPException) as exc_info: + _validate_mqtt_community_config({"iata": "PDX", "transport": "udp"}) + assert exc_info.value.status_code == 400 + assert "transport" in exc_info.value.detail + + def test_blank_token_audience_is_preserved(self): + from app.routers.fanout import _validate_mqtt_community_config + + config = {"iata": "PDX", "token_audience": " "} + _validate_mqtt_community_config(config) + assert config["token_audience"] == "" + + def test_auth_mode_rejects_unknown_value(self): + from app.routers.fanout import _validate_mqtt_community_config + + with pytest.raises(HTTPException) as exc_info: + _validate_mqtt_community_config({"iata": "PDX", "auth_mode": "jwt"}) + assert exc_info.value.status_code == 400 + assert "auth_mode" in exc_info.value.detail + + def test_password_auth_requires_credentials(self): + from app.routers.fanout import _validate_mqtt_community_config + + with pytest.raises(HTTPException) as exc_info: + _validate_mqtt_community_config({"iata": "PDX", "auth_mode": "password"}) + assert exc_info.value.status_code == 400 + assert "username and password" in exc_info.value.detail