Add better coverage for alternative community MQTTs. Closes #39

This commit is contained in:
Jack Kingsman
2026-03-06 18:14:28 -08:00
parent b5e2a4c269
commit f60c656566
7 changed files with 837 additions and 42 deletions

View File

@@ -52,8 +52,15 @@ class CommunityMqttSettings(Protocol):
community_mqtt_enabled: bool
community_mqtt_broker_host: str
community_mqtt_broker_port: int
community_mqtt_transport: str
community_mqtt_use_tls: bool
community_mqtt_tls_verify: bool
community_mqtt_auth_mode: str
community_mqtt_username: str
community_mqtt_password: str
community_mqtt_iata: str
community_mqtt_email: str
community_mqtt_token_audience: str
def _base64url_encode(data: bytes) -> str:
@@ -327,11 +334,18 @@ class CommunityMqttPublisher(BaseMqttPublisher):
await super().start(settings)
def _on_not_configured(self) -> None:
from app.keystore import has_private_key
from app.keystore import get_public_key, has_private_key
from app.websocket import broadcast_error
s: CommunityMqttSettings | None = self._settings
if s and not has_private_key() and not self._key_unavailable_warned:
auth_mode = getattr(s, "community_mqtt_auth_mode", "token") if s else "token"
if (
s
and auth_mode == "token"
and get_public_key() is not None
and not has_private_key()
and not self._key_unavailable_warned
):
broadcast_error(
"Community MQTT unavailable",
"Radio firmware does not support private key export.",
@@ -340,10 +354,17 @@ class CommunityMqttPublisher(BaseMqttPublisher):
def _is_configured(self) -> bool:
"""Check if community MQTT is enabled and keys are available."""
from app.keystore import has_private_key
from app.keystore import get_public_key, has_private_key
s: CommunityMqttSettings | None = self._settings
return bool(s and s.community_mqtt_enabled and has_private_key())
if not s or not s.community_mqtt_enabled:
return False
if get_public_key() is None:
return False
auth_mode = getattr(s, "community_mqtt_auth_mode", "token")
if auth_mode == "token":
return has_private_key()
return True
def _build_client_kwargs(self, settings: object) -> dict[str, Any]:
s: CommunityMqttSettings = settings # type: ignore[assignment]
@@ -352,19 +373,23 @@ class CommunityMqttPublisher(BaseMqttPublisher):
private_key = get_private_key()
public_key = get_public_key()
assert private_key is not None and public_key is not None # guaranteed by _pre_connect
assert public_key is not None # guaranteed by _pre_connect
pubkey_hex = public_key.hex().upper()
broker_host = s.community_mqtt_broker_host or _DEFAULT_BROKER
broker_port = s.community_mqtt_broker_port or _DEFAULT_PORT
jwt_token = _generate_jwt_token(
private_key,
public_key,
audience=broker_host,
email=s.community_mqtt_email or "",
)
transport = s.community_mqtt_transport or "websockets"
use_tls = bool(s.community_mqtt_use_tls)
tls_verify = bool(s.community_mqtt_tls_verify)
auth_mode = s.community_mqtt_auth_mode or "token"
secure_connection = use_tls and tls_verify
tls_context = ssl.create_default_context()
tls_context: ssl.SSLContext | None = None
if use_tls:
tls_context = ssl.create_default_context()
if not tls_verify:
tls_context.check_hostname = False
tls_context.verify_mode = ssl.CERT_NONE
device_name = ""
if radio_manager.meshcore and radio_manager.meshcore.self_info:
@@ -380,16 +405,30 @@ class CommunityMqttPublisher(BaseMqttPublisher):
}
)
return {
kwargs: dict[str, Any] = {
"hostname": broker_host,
"port": broker_port,
"transport": "websockets",
"transport": transport,
"tls_context": tls_context,
"websocket_path": "/",
"username": f"v1_{pubkey_hex}",
"password": jwt_token,
"will": aiomqtt.Will(status_topic, offline_payload, retain=True),
}
if auth_mode == "token":
assert private_key is not None
token_audience = (s.community_mqtt_token_audience or "").strip() or broker_host
jwt_token = _generate_jwt_token(
private_key,
public_key,
audience=token_audience,
email=(s.community_mqtt_email or "") if secure_connection else "",
)
kwargs["username"] = f"v1_{pubkey_hex}"
kwargs["password"] = jwt_token
elif auth_mode == "password":
kwargs["username"] = s.community_mqtt_username or None
kwargs["password"] = s.community_mqtt_password or None
if transport == "websockets":
kwargs["websocket_path"] = "/"
return kwargs
def _on_connected(self, settings: object) -> tuple[str, str]:
s: CommunityMqttSettings = settings # type: ignore[assignment]
@@ -543,7 +582,9 @@ class CommunityMqttPublisher(BaseMqttPublisher):
if not self.connected:
logger.info("Community MQTT publish failure detected, reconnecting")
return True
if elapsed >= _TOKEN_RENEWAL_THRESHOLD:
s: CommunityMqttSettings | None = self._settings
auth_mode = getattr(s, "community_mqtt_auth_mode", "token") if s else "token"
if auth_mode == "token" and elapsed >= _TOKEN_RENEWAL_THRESHOLD:
logger.info("Community MQTT JWT nearing expiry, reconnecting")
return True
return False
@@ -551,9 +592,11 @@ class CommunityMqttPublisher(BaseMqttPublisher):
async def _pre_connect(self, settings: object) -> bool:
from app.keystore import get_private_key, get_public_key
s: CommunityMqttSettings = settings # type: ignore[assignment]
auth_mode = s.community_mqtt_auth_mode or "token"
private_key = get_private_key()
public_key = get_public_key()
if private_key is None or public_key is None:
if public_key is None or (auth_mode == "token" and private_key is None):
# Keys not available yet, wait for settings change or key export
self.connected = False
self._version_event.clear()

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import logging
import re
import string
from types import SimpleNamespace
from typing import Any
@@ -13,6 +14,37 @@ from app.fanout.community_mqtt import CommunityMqttPublisher, _format_raw_packet
logger = logging.getLogger(__name__)
_IATA_RE = re.compile(r"^[A-Z]{3}$")
_DEFAULT_PACKET_TOPIC_TEMPLATE = "meshcore/{IATA}/{PUBLIC_KEY}/packets"
_TOPIC_TEMPLATE_FIELD_CANONICAL = {
"iata": "IATA",
"public_key": "PUBLIC_KEY",
}
def _normalize_topic_template(topic_template: str) -> str:
"""Normalize packet topic template fields to canonical uppercase placeholders."""
template = topic_template.strip() or _DEFAULT_PACKET_TOPIC_TEMPLATE
parts: list[str] = []
try:
parsed = string.Formatter().parse(template)
for literal_text, field_name, format_spec, conversion in parsed:
parts.append(literal_text)
if field_name is None:
continue
normalized_field = _TOPIC_TEMPLATE_FIELD_CANONICAL.get(field_name.lower())
if normalized_field is None:
raise ValueError(f"Unsupported topic template field(s): {field_name}")
replacement = ["{", normalized_field]
if conversion:
replacement.extend(["!", conversion])
if format_spec:
replacement.extend([":", format_spec])
replacement.append("}")
parts.append("".join(replacement))
except ValueError:
raise
return "".join(parts)
def _config_to_settings(config: dict) -> SimpleNamespace:
@@ -21,11 +53,24 @@ def _config_to_settings(config: dict) -> SimpleNamespace:
community_mqtt_enabled=True,
community_mqtt_broker_host=config.get("broker_host", "mqtt-us-v1.letsmesh.net"),
community_mqtt_broker_port=config.get("broker_port", 443),
community_mqtt_transport=config.get("transport", "websockets"),
community_mqtt_use_tls=config.get("use_tls", True),
community_mqtt_tls_verify=config.get("tls_verify", True),
community_mqtt_auth_mode=config.get("auth_mode", "token"),
community_mqtt_username=config.get("username", ""),
community_mqtt_password=config.get("password", ""),
community_mqtt_iata=config.get("iata", ""),
community_mqtt_email=config.get("email", ""),
community_mqtt_token_audience=config.get("token_audience", ""),
)
def _render_packet_topic(topic_template: str, *, iata: str, public_key: str) -> str:
"""Render the configured raw-packet publish topic."""
template = _normalize_topic_template(topic_template)
return template.format(IATA=iata, PUBLIC_KEY=public_key)
class MqttCommunityModule(FanoutModule):
"""Wraps a CommunityMqttPublisher for community packet sharing."""
@@ -81,7 +126,11 @@ async def _publish_community_packet(
if not _IATA_RE.fullmatch(iata):
logger.debug("Community MQTT: skipping publish — no valid IATA code configured")
return
topic = f"meshcore/{iata}/{pubkey_hex}/packets"
topic = _render_packet_topic(
str(config.get("topic_template", _DEFAULT_PACKET_TOPIC_TEMPLATE)),
iata=iata,
public_key=pubkey_hex,
)
await publisher.publish(topic, packet)

View File

@@ -2,6 +2,7 @@
import logging
import re
import string
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
@@ -15,6 +16,48 @@ router = APIRouter(prefix="/fanout", tags=["fanout"])
_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise"}
_IATA_RE = re.compile(r"^[A-Z]{3}$")
_DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE = "meshcore/{IATA}/{PUBLIC_KEY}/packets"
_DEFAULT_COMMUNITY_MQTT_BROKER_HOST = "mqtt-us-v1.letsmesh.net"
_DEFAULT_COMMUNITY_MQTT_BROKER_PORT = 443
_DEFAULT_COMMUNITY_MQTT_TRANSPORT = "websockets"
_DEFAULT_COMMUNITY_MQTT_AUTH_MODE = "token"
_COMMUNITY_MQTT_TEMPLATE_FIELD_CANONICAL = {
"iata": "IATA",
"public_key": "PUBLIC_KEY",
}
_ALLOWED_COMMUNITY_MQTT_TRANSPORTS = {"tcp", "websockets"}
_ALLOWED_COMMUNITY_MQTT_AUTH_MODES = {"token", "password", "none"}
def _normalize_community_topic_template(topic_template: str) -> str:
"""Normalize Community MQTT topic template placeholders to canonical uppercase form."""
template = topic_template.strip() or _DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE
parts: list[str] = []
try:
parsed = string.Formatter().parse(template)
for literal_text, field_name, format_spec, conversion in parsed:
parts.append(literal_text)
if field_name is None:
continue
normalized_field = _COMMUNITY_MQTT_TEMPLATE_FIELD_CANONICAL.get(field_name.lower())
if normalized_field is None:
raise HTTPException(
status_code=400,
detail=(
f"topic_template may only use {{IATA}} and {{PUBLIC_KEY}}; got {field_name}"
),
)
replacement = ["{", normalized_field]
if conversion:
replacement.extend(["!", conversion])
if format_spec:
replacement.extend([":", format_spec])
replacement.append("}")
parts.append("".join(replacement))
except ValueError as exc:
raise HTTPException(status_code=400, detail=f"Invalid topic_template: {exc}") from None
return "".join(parts)
class FanoutConfigCreate(BaseModel):
@@ -43,6 +86,46 @@ def _validate_mqtt_private_config(config: dict) -> None:
def _validate_mqtt_community_config(config: dict) -> None:
"""Validate mqtt_community config blob. Normalizes IATA to uppercase."""
broker_host = str(config.get("broker_host", _DEFAULT_COMMUNITY_MQTT_BROKER_HOST)).strip()
if not broker_host:
broker_host = _DEFAULT_COMMUNITY_MQTT_BROKER_HOST
config["broker_host"] = broker_host
port = config.get("broker_port", _DEFAULT_COMMUNITY_MQTT_BROKER_PORT)
if not isinstance(port, int) or port < 1 or port > 65535:
raise HTTPException(status_code=400, detail="broker_port must be between 1 and 65535")
config["broker_port"] = port
transport = str(config.get("transport", _DEFAULT_COMMUNITY_MQTT_TRANSPORT)).strip().lower()
if transport not in _ALLOWED_COMMUNITY_MQTT_TRANSPORTS:
raise HTTPException(
status_code=400,
detail="transport must be 'websockets' or 'tcp'",
)
config["transport"] = transport
config["use_tls"] = bool(config.get("use_tls", True))
config["tls_verify"] = bool(config.get("tls_verify", True))
auth_mode = str(config.get("auth_mode", _DEFAULT_COMMUNITY_MQTT_AUTH_MODE)).strip().lower()
if auth_mode not in _ALLOWED_COMMUNITY_MQTT_AUTH_MODES:
raise HTTPException(
status_code=400,
detail="auth_mode must be 'token', 'password', or 'none'",
)
config["auth_mode"] = auth_mode
username = str(config.get("username", "")).strip()
password = str(config.get("password", "")).strip()
if auth_mode == "password" and (not username or not password):
raise HTTPException(
status_code=400,
detail="username and password are required when auth_mode is 'password'",
)
config["username"] = username
config["password"] = password
token_audience = str(config.get("token_audience", "")).strip()
config["token_audience"] = token_audience
iata = config.get("iata", "").upper().strip()
if not iata or not _IATA_RE.fullmatch(iata):
raise HTTPException(
@@ -51,6 +134,14 @@ def _validate_mqtt_community_config(config: dict) -> None:
)
config["iata"] = iata
topic_template = str(
config.get("topic_template", _DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE)
).strip()
if not topic_template:
topic_template = _DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE
config["topic_template"] = _normalize_community_topic_template(topic_template)
def _validate_bot_config(config: dict) -> None:
"""Validate bot config blob (syntax-check the code)."""

View File

@@ -14,7 +14,7 @@ const BotCodeEditor = lazy(() =>
const TYPE_LABELS: Record<string, string> = {
mqtt_private: 'Private MQTT',
mqtt_community: 'Community MQTT',
mqtt_community: 'Community MQTT/mesh2mqtt',
bot: 'Bot',
webhook: 'Webhook',
apprise: 'Apprise',
@@ -22,12 +22,32 @@ const TYPE_LABELS: Record<string, string> = {
const TYPE_OPTIONS = [
{ value: 'mqtt_private', label: 'Private MQTT' },
{ value: 'mqtt_community', label: 'Community MQTT' },
{ value: 'mqtt_community', label: 'Community MQTT/mesh2mqtt' },
{ value: 'bot', label: 'Bot' },
{ value: 'webhook', label: 'Webhook' },
{ value: 'apprise', label: 'Apprise' },
];
const DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE = 'meshcore/{IATA}/{PUBLIC_KEY}/packets';
const DEFAULT_COMMUNITY_BROKER_HOST = 'mqtt-us-v1.letsmesh.net';
const DEFAULT_COMMUNITY_BROKER_PORT = 443;
const DEFAULT_COMMUNITY_TRANSPORT = 'websockets';
const DEFAULT_COMMUNITY_AUTH_MODE = 'token';
function formatBrokerSummary(
config: Record<string, unknown>,
defaults: { host: string; port: number }
) {
const host = (config.broker_host as string) || defaults.host;
const port = typeof config.broker_port === 'number' ? config.broker_port : defaults.port;
return `${host}:${port}`;
}
function formatPrivateTopicSummary(config: Record<string, unknown>) {
const prefix = (config.topic_prefix as string) || 'meshcore';
return `${prefix}/dm:<pubkey>, ${prefix}/gm:<channel>, ${prefix}/raw/...`;
}
const DEFAULT_BOT_CODE = `def bot(
sender_name: str | None,
sender_key: str | None,
@@ -75,10 +95,11 @@ function getStatusLabel(status: string | undefined, type?: string) {
}
function getStatusColor(status: string | undefined, enabled?: boolean) {
if (enabled === false) return 'bg-warning shadow-[0_0_6px_hsl(var(--warning)/0.5)]';
if (enabled === false) return 'bg-muted-foreground';
if (status === 'connected')
return 'bg-status-connected shadow-[0_0_6px_hsl(var(--status-connected)/0.5)]';
if (status === 'error') return 'bg-destructive shadow-[0_0_6px_hsl(var(--destructive)/0.5)]';
if (status === 'error' || status === 'disconnected')
return 'bg-destructive shadow-[0_0_6px_hsl(var(--destructive)/0.5)]';
return 'bg-muted-foreground';
}
@@ -201,6 +222,8 @@ function MqttCommunityConfigEditor({
config: Record<string, unknown>;
onChange: (config: Record<string, unknown>) => void;
}) {
const authMode = (config.auth_mode as string) || DEFAULT_COMMUNITY_AUTH_MODE;
return (
<div className="space-y-3">
<p className="text-xs text-muted-foreground">
@@ -214,8 +237,8 @@ function MqttCommunityConfigEditor({
<Input
id="fanout-comm-host"
type="text"
placeholder="mqtt-us-v1.letsmesh.net"
value={(config.broker_host as string) || 'mqtt-us-v1.letsmesh.net'}
placeholder={DEFAULT_COMMUNITY_BROKER_HOST}
value={(config.broker_host as string) || DEFAULT_COMMUNITY_BROKER_HOST}
onChange={(e) => onChange({ ...config, broker_host: e.target.value })}
/>
</div>
@@ -226,14 +249,124 @@ function MqttCommunityConfigEditor({
type="number"
min="1"
max="65535"
value={(config.broker_port as number) || 443}
value={(config.broker_port as number) || DEFAULT_COMMUNITY_BROKER_PORT}
onChange={(e) =>
onChange({ ...config, broker_port: parseInt(e.target.value, 10) || 443 })
onChange({
...config,
broker_port: parseInt(e.target.value, 10) || DEFAULT_COMMUNITY_BROKER_PORT,
})
}
/>
</div>
</div>
<div className="grid grid-cols-1 sm:grid-cols-2 gap-4">
<div className="space-y-2">
<Label htmlFor="fanout-comm-transport">Transport</Label>
<select
id="fanout-comm-transport"
value={(config.transport as string) || DEFAULT_COMMUNITY_TRANSPORT}
onChange={(e) => onChange({ ...config, transport: e.target.value })}
className="w-full rounded-md border border-input bg-background px-3 py-2 text-sm"
>
<option value="websockets">WebSockets</option>
<option value="tcp">TCP</option>
</select>
</div>
<div className="space-y-2">
<Label htmlFor="fanout-comm-auth-mode">Authentication</Label>
<select
id="fanout-comm-auth-mode"
value={authMode}
onChange={(e) => onChange({ ...config, auth_mode: e.target.value })}
className="w-full rounded-md border border-input bg-background px-3 py-2 text-sm"
>
<option value="token">Token</option>
<option value="none">None</option>
<option value="password">Username / Password</option>
</select>
</div>
</div>
<p className="text-xs text-muted-foreground">
LetsMesh uses <code>token</code> auth. MeshRank uses <code>none</code>.
</p>
{authMode === 'token' && (
<div className="grid grid-cols-1 sm:grid-cols-2 gap-4">
<div className="space-y-2">
<Label htmlFor="fanout-comm-token-audience">Token Audience</Label>
<Input
id="fanout-comm-token-audience"
type="text"
placeholder={(config.broker_host as string) || DEFAULT_COMMUNITY_BROKER_HOST}
value={(config.token_audience as string | undefined) ?? ''}
onChange={(e) => onChange({ ...config, token_audience: e.target.value })}
/>
<p className="text-xs text-muted-foreground">Defaults to the broker host when blank</p>
</div>
<div className="space-y-2">
<Label htmlFor="fanout-comm-email">Owner Email (optional)</Label>
<Input
id="fanout-comm-email"
type="email"
placeholder="you@example.com"
value={(config.email as string) || ''}
onChange={(e) => onChange({ ...config, email: e.target.value })}
/>
<p className="text-xs text-muted-foreground">
Used to claim your node on the community aggregator
</p>
</div>
</div>
)}
{authMode === 'password' && (
<div className="grid grid-cols-1 sm:grid-cols-2 gap-4">
<div className="space-y-2">
<Label htmlFor="fanout-comm-username">Username</Label>
<Input
id="fanout-comm-username"
type="text"
value={(config.username as string) || ''}
onChange={(e) => onChange({ ...config, username: e.target.value })}
/>
</div>
<div className="space-y-2">
<Label htmlFor="fanout-comm-password">Password</Label>
<Input
id="fanout-comm-password"
type="password"
value={(config.password as string) || ''}
onChange={(e) => onChange({ ...config, password: e.target.value })}
/>
</div>
</div>
)}
<div className="space-y-2">
<label className="flex items-center gap-3 cursor-pointer">
<input
type="checkbox"
checked={config.use_tls === undefined ? true : !!config.use_tls}
onChange={(e) => onChange({ ...config, use_tls: e.target.checked })}
className="h-4 w-4 rounded border-border"
/>
<span className="text-sm">Use TLS</span>
</label>
<label className="flex items-center gap-3 cursor-pointer ml-7">
<input
type="checkbox"
checked={config.tls_verify === undefined ? true : !!config.tls_verify}
onChange={(e) => onChange({ ...config, tls_verify: e.target.checked })}
className="h-4 w-4 rounded border-border"
disabled={config.use_tls === undefined ? false : !config.use_tls}
/>
<span className="text-sm">Verify TLS certificates</span>
</label>
</div>
<div className="space-y-2">
<Label htmlFor="fanout-comm-iata">Region Code (IATA)</Label>
<Input
@@ -251,16 +384,16 @@ function MqttCommunityConfigEditor({
</div>
<div className="space-y-2">
<Label htmlFor="fanout-comm-email">Owner Email (optional)</Label>
<Label htmlFor="fanout-comm-topic-template">Packet Topic Template</Label>
<Input
id="fanout-comm-email"
type="email"
placeholder="you@example.com"
value={(config.email as string) || ''}
onChange={(e) => onChange({ ...config, email: e.target.value })}
id="fanout-comm-topic-template"
type="text"
value={(config.topic_template as string) || DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE}
onChange={(e) => onChange({ ...config, topic_template: e.target.value })}
/>
<p className="text-xs text-muted-foreground">
Used to claim your node on the community aggregator
Use <code>{'{IATA}'}</code> and <code>{'{PUBLIC_KEY}'}</code>. Default:{' '}
<code>{DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE}</code>
</p>
</div>
</div>
@@ -943,9 +1076,17 @@ export function SettingsFanoutSection({
},
mqtt_community: {
broker_host: 'mqtt-us-v1.letsmesh.net',
broker_port: 443,
broker_port: DEFAULT_COMMUNITY_BROKER_PORT,
transport: DEFAULT_COMMUNITY_TRANSPORT,
use_tls: true,
tls_verify: true,
auth_mode: DEFAULT_COMMUNITY_AUTH_MODE,
username: '',
password: '',
iata: '',
email: '',
token_audience: '',
topic_template: DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE,
},
bot: {
code: DEFAULT_BOT_CODE,
@@ -1093,7 +1234,7 @@ export function SettingsFanoutSection({
)}
<div className="flex flex-wrap items-center gap-2">
<span className="text-sm text-muted-foreground">Add:</span>
<span className="text-sm text-muted-foreground">Add a new entry:</span>
{TYPE_OPTIONS.filter((opt) => opt.value !== 'bot' || !health?.bots_disabled).map((opt) => (
<Button
key={opt.value}
@@ -1111,6 +1252,7 @@ export function SettingsFanoutSection({
{configs.map((cfg) => {
const statusEntry = health?.fanout_statuses?.[cfg.id];
const status = cfg.enabled ? statusEntry?.status : undefined;
const communityConfig = cfg.config as Record<string, unknown>;
return (
<div key={cfg.id} className="border border-input rounded-md overflow-hidden">
<div className="flex items-center gap-2 px-3 py-2 bg-muted/50">
@@ -1155,6 +1297,43 @@ export function SettingsFanoutSection({
Edit
</Button>
</div>
{cfg.type === 'mqtt_community' && (
<div className="space-y-1 border-t border-input px-3 py-2 text-xs text-muted-foreground">
<div>
Broker:{' '}
{formatBrokerSummary(communityConfig, {
host: DEFAULT_COMMUNITY_BROKER_HOST,
port: DEFAULT_COMMUNITY_BROKER_PORT,
})}
</div>
<div className="break-all">
Topic:{' '}
<code>
{(communityConfig.topic_template as string) ||
DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE}
</code>
</div>
</div>
)}
{cfg.type === 'mqtt_private' && (
<div className="space-y-1 border-t border-input px-3 py-2 text-xs text-muted-foreground">
<div>
Broker:{' '}
{formatBrokerSummary(cfg.config as Record<string, unknown>, {
host: '',
port: 1883,
})}
</div>
<div className="break-all">
Topics:{' '}
<code>
{formatPrivateTopicSummary(cfg.config as Record<string, unknown>)}
</code>
</div>
</div>
)}
</div>
);
})}

View File

@@ -66,12 +66,20 @@ describe('SettingsFanoutSection', () => {
renderSection();
await waitFor(() => {
expect(screen.getByRole('button', { name: 'Private MQTT' })).toBeInTheDocument();
expect(screen.getByRole('button', { name: 'Community MQTT/mesh2mqtt' })).toBeInTheDocument();
expect(screen.getByRole('button', { name: 'Webhook' })).toBeInTheDocument();
expect(screen.getByRole('button', { name: 'Apprise' })).toBeInTheDocument();
expect(screen.getByRole('button', { name: 'Bot' })).toBeInTheDocument();
});
});
it('shows updated add label phrasing', async () => {
renderSection();
await waitFor(() => {
expect(screen.getByText('Add a new entry:')).toBeInTheDocument();
});
});
it('hides bot add button when bots_disabled', async () => {
renderSection({ health: { ...baseHealth, bots_disabled: true } });
await waitFor(() => {
@@ -265,4 +273,192 @@ describe('SettingsFanoutSection', () => {
expect(screen.getByLabelText(/URL/)).toBeInTheDocument();
});
});
it('community MQTT editor exposes packet topic template', async () => {
const communityConfig: FanoutConfig = {
id: 'comm-1',
type: 'mqtt_community',
name: 'Community MQTT/mesh2mqtt',
enabled: false,
config: {
broker_host: 'mqtt-us-v1.letsmesh.net',
broker_port: 443,
transport: 'tcp',
use_tls: true,
tls_verify: true,
auth_mode: 'token',
iata: 'LAX',
email: '',
token_audience: 'meshrank.net',
topic_template: 'mesh2mqtt/{IATA}/node/{PUBLIC_KEY}',
},
scope: { messages: 'none', raw_packets: 'all' },
sort_order: 0,
created_at: 1000,
};
mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]);
renderSection();
await waitFor(() => expect(screen.getByText('Community MQTT/mesh2mqtt')).toBeInTheDocument());
fireEvent.click(screen.getByRole('button', { name: 'Edit' }));
await waitFor(() => expect(screen.getByText('← Back to list')).toBeInTheDocument());
expect(screen.getByLabelText('Packet Topic Template')).toHaveValue(
'mesh2mqtt/{IATA}/node/{PUBLIC_KEY}'
);
expect(screen.getByLabelText('Transport')).toHaveValue('tcp');
expect(screen.getByLabelText('Authentication')).toHaveValue('token');
expect(screen.getByLabelText('Token Audience')).toHaveValue('meshrank.net');
expect(screen.getByText(/LetsMesh uses/)).toBeInTheDocument();
});
it('existing community MQTT config without auth_mode defaults to token in the editor', async () => {
const communityConfig: FanoutConfig = {
id: 'comm-legacy',
type: 'mqtt_community',
name: 'Legacy Community MQTT',
enabled: false,
config: {
broker_host: 'mqtt-us-v1.letsmesh.net',
broker_port: 443,
transport: 'websockets',
use_tls: true,
tls_verify: true,
iata: 'LAX',
email: 'user@example.com',
token_audience: '',
topic_template: 'meshcore/{IATA}/{PUBLIC_KEY}/packets',
},
scope: { messages: 'none', raw_packets: 'all' },
sort_order: 0,
created_at: 1000,
};
mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]);
renderSection();
await waitFor(() => expect(screen.getByText('Legacy Community MQTT')).toBeInTheDocument());
fireEvent.click(screen.getByRole('button', { name: 'Edit' }));
await waitFor(() => expect(screen.getByText('← Back to list')).toBeInTheDocument());
expect(screen.getByLabelText('Authentication')).toHaveValue('token');
expect(screen.getByLabelText('Token Audience')).toBeInTheDocument();
});
it('community MQTT token audience can be cleared back to blank', async () => {
const communityConfig: FanoutConfig = {
id: 'comm-1',
type: 'mqtt_community',
name: 'Community MQTT/mesh2mqtt',
enabled: false,
config: {
broker_host: 'mqtt-us-v1.letsmesh.net',
broker_port: 443,
transport: 'websockets',
use_tls: true,
tls_verify: true,
auth_mode: 'token',
iata: 'LAX',
email: '',
token_audience: 'meshrank.net',
topic_template: 'meshcore/{IATA}/{PUBLIC_KEY}/packets',
},
scope: { messages: 'none', raw_packets: 'all' },
sort_order: 0,
created_at: 1000,
};
mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]);
renderSection();
await waitFor(() => expect(screen.getByText('Community MQTT/mesh2mqtt')).toBeInTheDocument());
fireEvent.click(screen.getByRole('button', { name: 'Edit' }));
await waitFor(() => expect(screen.getByText('← Back to list')).toBeInTheDocument());
const audienceInput = screen.getByLabelText('Token Audience');
fireEvent.change(audienceInput, { target: { value: '' } });
expect(audienceInput).toHaveValue('');
});
it('community MQTT can be configured for no auth', async () => {
const communityConfig: FanoutConfig = {
id: 'comm-1',
type: 'mqtt_community',
name: 'Community MQTT/mesh2mqtt',
enabled: false,
config: {
broker_host: 'meshrank.net',
broker_port: 8883,
transport: 'tcp',
use_tls: true,
tls_verify: true,
auth_mode: 'none',
iata: 'LAX',
topic_template: 'meshrank/uplink/ROOM/{PUBLIC_KEY}/packets',
},
scope: { messages: 'none', raw_packets: 'all' },
sort_order: 0,
created_at: 1000,
};
mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]);
renderSection();
await waitFor(() => expect(screen.getByText('Community MQTT/mesh2mqtt')).toBeInTheDocument());
fireEvent.click(screen.getByRole('button', { name: 'Edit' }));
await waitFor(() => expect(screen.getByText('← Back to list')).toBeInTheDocument());
expect(screen.getByLabelText('Authentication')).toHaveValue('none');
expect(screen.queryByLabelText('Token Audience')).not.toBeInTheDocument();
});
it('community MQTT list shows configured packet topic', async () => {
const communityConfig: FanoutConfig = {
id: 'comm-1',
type: 'mqtt_community',
name: 'Community MQTT/mesh2mqtt',
enabled: false,
config: {
broker_host: 'mqtt-us-v1.letsmesh.net',
broker_port: 443,
transport: 'websockets',
use_tls: true,
tls_verify: true,
auth_mode: 'token',
iata: 'LAX',
email: '',
token_audience: 'mqtt-us-v1.letsmesh.net',
topic_template: 'mesh2mqtt/{IATA}/node/{PUBLIC_KEY}',
},
scope: { messages: 'none', raw_packets: 'all' },
sort_order: 0,
created_at: 1000,
};
mockedApi.getFanoutConfigs.mockResolvedValue([communityConfig]);
renderSection();
await waitFor(() =>
expect(screen.getByText('Broker: mqtt-us-v1.letsmesh.net:443')).toBeInTheDocument()
);
expect(screen.getByText('mesh2mqtt/{IATA}/node/{PUBLIC_KEY}')).toBeInTheDocument();
expect(screen.queryByText('Region: LAX')).not.toBeInTheDocument();
});
it('private MQTT list shows broker and topic summary', async () => {
const privateConfig: FanoutConfig = {
id: 'mqtt-1',
type: 'mqtt_private',
name: 'Private MQTT',
enabled: true,
config: { broker_host: 'broker.local', broker_port: 1883, topic_prefix: 'meshcore' },
scope: { messages: 'all', raw_packets: 'all' },
sort_order: 0,
created_at: 1000,
};
mockedApi.getFanoutConfigs.mockResolvedValue([privateConfig]);
renderSection();
await waitFor(() => expect(screen.getByText('Broker: broker.local:1883')).toBeInTheDocument());
expect(
screen.getByText('meshcore/dm:<pubkey>, meshcore/gm:<channel>, meshcore/raw/...')
).toBeInTheDocument();
});
});

View File

@@ -23,6 +23,11 @@ from app.fanout.community_mqtt import (
_generate_jwt_token,
_get_client_version,
)
from app.fanout.mqtt_community import (
_config_to_settings,
_publish_community_packet,
_render_packet_topic,
)
def _make_test_keys() -> tuple[bytes, bytes]:
@@ -55,8 +60,15 @@ def _make_community_settings(**overrides) -> SimpleNamespace:
"community_mqtt_enabled": True,
"community_mqtt_broker_host": "mqtt-us-v1.letsmesh.net",
"community_mqtt_broker_port": 443,
"community_mqtt_transport": "websockets",
"community_mqtt_use_tls": True,
"community_mqtt_tls_verify": True,
"community_mqtt_auth_mode": "token",
"community_mqtt_username": "",
"community_mqtt_password": "",
"community_mqtt_iata": "",
"community_mqtt_email": "",
"community_mqtt_token_audience": "mqtt-us-v1.letsmesh.net",
}
defaults.update(overrides)
return SimpleNamespace(**defaults)
@@ -390,19 +402,37 @@ class TestCommunityMqttPublisher:
def test_is_configured_false_when_disabled(self):
pub = CommunityMqttPublisher()
pub._settings = SimpleNamespace(community_mqtt_enabled=False)
with patch("app.keystore.has_private_key", return_value=True):
with patch("app.keystore.get_public_key", return_value=b"x" * 32):
assert pub._is_configured() is False
def test_is_configured_false_when_no_private_key(self):
pub = CommunityMqttPublisher()
pub._settings = SimpleNamespace(community_mqtt_enabled=True)
with patch("app.keystore.has_private_key", return_value=False):
pub._settings = SimpleNamespace(
community_mqtt_enabled=True, community_mqtt_auth_mode="token"
)
with (
patch("app.keystore.get_public_key", return_value=b"x" * 32),
patch("app.keystore.has_private_key", return_value=False),
):
assert pub._is_configured() is False
def test_is_configured_true_when_enabled_with_key(self):
pub = CommunityMqttPublisher()
pub._settings = SimpleNamespace(community_mqtt_enabled=True)
with patch("app.keystore.has_private_key", return_value=True):
pub._settings = SimpleNamespace(
community_mqtt_enabled=True, community_mqtt_auth_mode="token"
)
with (
patch("app.keystore.get_public_key", return_value=b"x" * 32),
patch("app.keystore.has_private_key", return_value=True),
):
assert pub._is_configured() is True
def test_is_configured_true_for_none_auth_without_private_key(self):
pub = CommunityMqttPublisher()
pub._settings = SimpleNamespace(
community_mqtt_enabled=True, community_mqtt_auth_mode="none"
)
with patch("app.keystore.get_public_key", return_value=b"x" * 32):
assert pub._is_configured() is True
@@ -431,6 +461,49 @@ class TestBuildStatusTopic:
assert topic == "meshcore/LAX/PUBKEY/status"
class TestRenderPacketTopic:
def test_uses_default_template(self):
topic = _render_packet_topic(
"meshcore/{IATA}/{PUBLIC_KEY}/packets",
iata="LAX",
public_key="AABB1122",
)
assert topic == "meshcore/LAX/AABB1122/packets"
def test_supports_custom_template(self):
topic = _render_packet_topic(
"mesh2mqtt/{IATA}/node/{PUBLIC_KEY}",
iata="PDX",
public_key="PUBKEY",
)
assert topic == "mesh2mqtt/PDX/node/PUBKEY"
def test_accepts_mixed_case_placeholders(self):
topic = _render_packet_topic(
"mesh2mqtt/{iata}/node/{Public_Key}",
iata="SEA",
public_key="PUBKEY",
)
assert topic == "mesh2mqtt/SEA/node/PUBKEY"
class TestCommunityConfigDefaults:
def test_existing_config_without_new_fields_gets_letsmesh_defaults(self):
settings = _config_to_settings({"iata": "LAX", "email": "user@example.com"})
assert settings.community_mqtt_broker_host == "mqtt-us-v1.letsmesh.net"
assert settings.community_mqtt_broker_port == 443
assert settings.community_mqtt_transport == "websockets"
assert settings.community_mqtt_use_tls is True
assert settings.community_mqtt_tls_verify is True
assert settings.community_mqtt_auth_mode == "token"
assert settings.community_mqtt_username == ""
assert settings.community_mqtt_password == ""
assert settings.community_mqtt_token_audience == ""
assert settings.community_mqtt_iata == "LAX"
assert settings.community_mqtt_email == "user@example.com"
class TestLwtAndStatusPublish:
def test_build_client_kwargs_includes_will(self):
"""_build_client_kwargs should return a will with offline status."""
@@ -460,6 +533,70 @@ class TestLwtAndStatusPublish:
assert payload["origin_id"] == pubkey_hex
assert "timestamp" in payload
assert "client" not in payload
assert kwargs["transport"] == "websockets"
assert kwargs["websocket_path"] == "/"
assert kwargs["tls_context"] is not None
assert kwargs["username"] == f"v1_{pubkey_hex}"
def test_build_client_kwargs_supports_tcp_transport_and_custom_audience(self):
pub = CommunityMqttPublisher()
private_key, public_key = _make_test_keys()
settings = _make_community_settings(
community_mqtt_broker_host="meshrank.net",
community_mqtt_broker_port=8883,
community_mqtt_transport="tcp",
community_mqtt_use_tls=True,
community_mqtt_tls_verify=True,
community_mqtt_token_audience="meshrank.net",
community_mqtt_email="user@example.com",
community_mqtt_iata="SFO",
)
with (
patch("app.keystore.get_private_key", return_value=private_key),
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager") as mock_radio,
):
mock_radio.meshcore = None
kwargs = pub._build_client_kwargs(settings)
assert kwargs["hostname"] == "meshrank.net"
assert kwargs["port"] == 8883
assert kwargs["transport"] == "tcp"
assert "websocket_path" not in kwargs
assert kwargs["tls_context"] is not None
payload_b64 = kwargs["password"].split(".")[1]
import base64
padded = payload_b64 + "=" * (4 - len(payload_b64) % 4)
payload = json.loads(base64.urlsafe_b64decode(padded))
assert payload["aud"] == "meshrank.net"
assert payload["email"] == "user@example.com"
def test_build_client_kwargs_supports_none_auth(self):
pub = CommunityMqttPublisher()
_, public_key = _make_test_keys()
settings = _make_community_settings(
community_mqtt_broker_host="meshrank.net",
community_mqtt_broker_port=8883,
community_mqtt_transport="tcp",
community_mqtt_auth_mode="none",
)
with (
patch("app.keystore.get_private_key", return_value=None),
patch("app.keystore.get_public_key", return_value=public_key),
patch("app.radio.radio_manager") as mock_radio,
):
mock_radio.meshcore = None
kwargs = pub._build_client_kwargs(settings)
assert kwargs["hostname"] == "meshrank.net"
assert kwargs["port"] == 8883
assert kwargs["transport"] == "tcp"
assert "username" not in kwargs
assert "password" not in kwargs
@pytest.mark.asyncio
async def test_on_connected_async_publishes_online_status(self):
@@ -579,6 +716,42 @@ class TestLwtAndStatusPublish:
assert payload["origin"] == "MeshCore Device"
class TestCommunityPacketPublishTopic:
@pytest.mark.asyncio
async def test_publish_community_packet_uses_configured_topic_template(self):
publisher = MagicMock()
publisher.publish = AsyncMock()
publisher.connected = True
publisher._settings = object()
data = {
"id": 1,
"observation_id": 1,
"timestamp": 1700000000,
"data": "0100",
"payload_type": "GROUP_TEXT",
"snr": None,
"rssi": None,
"decrypted": False,
"decrypted_info": None,
}
config = {"iata": "lax", "topic_template": "mesh2mqtt/{IATA}/node/{PUBLIC_KEY}"}
mock_radio = MagicMock()
mock_radio.meshcore = MagicMock()
mock_radio.meshcore.self_info = {"name": "Node"}
with (
patch("app.keystore.get_public_key", return_value=bytes.fromhex("AA" * 32)),
patch("app.radio.radio_manager", mock_radio),
):
await _publish_community_packet(publisher, config, data)
publisher.publish.assert_awaited_once()
topic = publisher.publish.await_args.args[0]
assert topic == f"mesh2mqtt/LAX/node/{'AA' * 32}"
def _mock_radio_operation(mc_mock):
"""Create a mock async context manager for radio_operation."""

View File

@@ -656,3 +656,67 @@ class TestCommunityMqttIataValidation:
with pytest.raises(HTTPException):
_validate_mqtt_community_config({"iata": "pdx1"})
def test_topic_template_defaults_when_missing(self):
from app.routers.fanout import _validate_mqtt_community_config
config = {"iata": "PDX"}
_validate_mqtt_community_config(config)
assert config["broker_host"] == "mqtt-us-v1.letsmesh.net"
assert config["broker_port"] == 443
assert config["transport"] == "websockets"
assert config["use_tls"] is True
assert config["tls_verify"] is True
assert config["auth_mode"] == "token"
assert config["username"] == ""
assert config["password"] == ""
assert config["token_audience"] == ""
assert config["topic_template"] == "meshcore/{IATA}/{PUBLIC_KEY}/packets"
def test_topic_template_normalizes_placeholder_capitalization(self):
from app.routers.fanout import _validate_mqtt_community_config
config = {"iata": "PDX", "topic_template": "mesh2mqtt/{iata}/node/{Public_Key}"}
_validate_mqtt_community_config(config)
assert config["topic_template"] == "mesh2mqtt/{IATA}/node/{PUBLIC_KEY}"
def test_topic_template_rejects_unknown_placeholder(self):
from app.routers.fanout import _validate_mqtt_community_config
with pytest.raises(HTTPException) as exc_info:
_validate_mqtt_community_config(
{"iata": "PDX", "topic_template": "meshcore/{foo}/{PUBLIC_KEY}/packets"}
)
assert exc_info.value.status_code == 400
assert "topic_template" in exc_info.value.detail
def test_transport_rejects_unknown_value(self):
from app.routers.fanout import _validate_mqtt_community_config
with pytest.raises(HTTPException) as exc_info:
_validate_mqtt_community_config({"iata": "PDX", "transport": "udp"})
assert exc_info.value.status_code == 400
assert "transport" in exc_info.value.detail
def test_blank_token_audience_is_preserved(self):
from app.routers.fanout import _validate_mqtt_community_config
config = {"iata": "PDX", "token_audience": " "}
_validate_mqtt_community_config(config)
assert config["token_audience"] == ""
def test_auth_mode_rejects_unknown_value(self):
from app.routers.fanout import _validate_mqtt_community_config
with pytest.raises(HTTPException) as exc_info:
_validate_mqtt_community_config({"iata": "PDX", "auth_mode": "jwt"})
assert exc_info.value.status_code == 400
assert "auth_mode" in exc_info.value.detail
def test_password_auth_requires_credentials(self):
from app.routers.fanout import _validate_mqtt_community_config
with pytest.raises(HTTPException) as exc_info:
_validate_mqtt_community_config({"iata": "PDX", "auth_mode": "password"})
assert exc_info.value.status_code == 400
assert "username and password" in exc_info.value.detail