Fanout hitlist fixes: bugs, quality, tests, webhook HMAC signing

This commit is contained in:
Jack Kingsman
2026-03-05 22:27:24 -08:00
parent 7534f0cc54
commit cb4333df4f
13 changed files with 840 additions and 54 deletions

View File

@@ -88,8 +88,8 @@ def _send_sync(urls_raw: str, body: str, *, preserve_identity: bool) -> bool:
class AppriseModule(FanoutModule):
"""Sends push notifications via Apprise for incoming messages."""
def __init__(self, config_id: str, config: dict) -> None:
super().__init__(config_id, config)
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
super().__init__(config_id, config, name=name)
self._last_error: str | None = None
async def on_message(self, data: dict) -> None:

View File

@@ -12,9 +12,10 @@ class FanoutModule:
Subclasses must override the ``status`` property.
"""
def __init__(self, config_id: str, config: dict) -> None:
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
self.config_id = config_id
self.config = config
self.name = name
async def start(self) -> None:
"""Start the module (e.g. connect to broker). Override for persistent connections."""

View File

@@ -20,8 +20,7 @@ class BotModule(FanoutModule):
"""
def __init__(self, config_id: str, config: dict, *, name: str = "Bot") -> None:
super().__init__(config_id, config)
self._name = name
super().__init__(config_id, config, name=name)
async def on_message(self, data: dict) -> None:
"""Kick off bot execution in a background task so we don't block dispatch."""
@@ -110,10 +109,10 @@ class BotModule(FanoutModule):
timeout=BOT_EXECUTION_TIMEOUT,
)
except asyncio.TimeoutError:
logger.warning("Bot '%s' execution timed out", self._name)
logger.warning("Bot '%s' execution timed out", self.name)
return
except Exception as e:
logger.warning("Bot '%s' execution error: %s", self._name, e)
logger.warning("Bot '%s' execution error: %s", self.name, e)
return
if response:

View File

@@ -108,10 +108,7 @@ class FanoutManager:
return
try:
if config_type == "bot":
module = cls(config_id, config_blob, name=cfg.get("name", "Bot"))
else:
module = cls(config_id, config_blob)
module = cls(config_id, config_blob, name=cfg.get("name", ""))
await module.start()
self._modules[config_id] = (module, scope)
logger.info(

View File

@@ -29,8 +29,8 @@ def _config_to_settings(config: dict) -> SimpleNamespace:
class MqttCommunityModule(FanoutModule):
"""Wraps a CommunityMqttPublisher for community packet sharing."""
def __init__(self, config_id: str, config: dict) -> None:
super().__init__(config_id, config)
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
super().__init__(config_id, config, name=name)
self._publisher = CommunityMqttPublisher()
async def start(self) -> None:

View File

@@ -29,8 +29,8 @@ def _config_to_settings(config: dict) -> SimpleNamespace:
class MqttPrivateModule(FanoutModule):
"""Wraps an MqttPublisher instance for private MQTT forwarding."""
def __init__(self, config_id: str, config: dict) -> None:
super().__init__(config_id, config)
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
super().__init__(config_id, config, name=name)
self._publisher = MqttPublisher()
async def start(self) -> None:

View File

@@ -2,6 +2,8 @@
from __future__ import annotations
import hashlib
import hmac
import logging
import httpx
@@ -14,8 +16,8 @@ logger = logging.getLogger(__name__)
class WebhookModule(FanoutModule):
"""Delivers message data to an HTTP endpoint via POST (or configurable method)."""
def __init__(self, config_id: str, config: dict) -> None:
super().__init__(config_id, config)
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
super().__init__(config_id, config, name=name)
self._client: httpx.AsyncClient | None = None
self._last_error: str | None = None
@@ -44,18 +46,25 @@ class WebhookModule(FanoutModule):
method = self.config.get("method", "POST").upper()
extra_headers = self.config.get("headers", {})
secret = self.config.get("secret", "")
hmac_secret = self.config.get("hmac_secret", "")
hmac_header = self.config.get("hmac_header", "X-Webhook-Signature")
headers = {
"Content-Type": "application/json",
"X-Webhook-Event": event_type,
**extra_headers,
}
if secret:
headers["X-Webhook-Secret"] = secret
import json as _json
body_bytes = _json.dumps(data, separators=(",", ":"), sort_keys=True).encode()
if hmac_secret:
sig = hmac.new(hmac_secret.encode(), body_bytes, hashlib.sha256).hexdigest()
headers[hmac_header or "X-Webhook-Signature"] = f"sha256={sig}"
try:
resp = await self._client.request(method, url, json=data, headers=headers)
resp = await self._client.request(method, url, content=body_bytes, headers=headers)
resp.raise_for_status()
self._last_error = None
except httpx.HTTPStatusError as exc:

View File

@@ -44,10 +44,10 @@ def _validate_mqtt_private_config(config: dict) -> None:
def _validate_mqtt_community_config(config: dict) -> None:
"""Validate mqtt_community config blob."""
iata = config.get("iata", "")
if iata and not _IATA_RE.fullmatch(iata.upper().strip()):
if not iata or not _IATA_RE.fullmatch(iata.upper().strip()):
raise HTTPException(
status_code=400,
detail="IATA code must be exactly 3 uppercase alphabetic characters",
detail="IATA code is required and must be exactly 3 uppercase alphabetic characters",
)

View File

@@ -777,19 +777,43 @@ function WebhookConfigEditor({
<option value="PATCH">PATCH</option>
</select>
</div>
</div>
<div className="space-y-2">
<Label htmlFor="fanout-webhook-secret">Secret (optional)</Label>
<Input
id="fanout-webhook-secret"
type="password"
placeholder="Sent as X-Webhook-Secret header"
value={(config.secret as string) || ''}
onChange={(e) => onChange({ ...config, secret: e.target.value })}
/>
<Separator />
<div className="space-y-3">
<Label>HMAC Signing</Label>
<p className="text-xs text-muted-foreground">
When a secret is set, each request includes an HMAC-SHA256 signature of the JSON body in
the specified header (e.g. <code className="bg-muted px-1 rounded">sha256=ab12cd...</code>
).
</p>
<div className="grid grid-cols-1 sm:grid-cols-2 gap-4">
<div className="space-y-2">
<Label htmlFor="fanout-webhook-hmac-secret">HMAC Secret</Label>
<Input
id="fanout-webhook-hmac-secret"
type="password"
placeholder="Leave empty to disable signing"
value={(config.hmac_secret as string) || ''}
onChange={(e) => onChange({ ...config, hmac_secret: e.target.value })}
/>
</div>
<div className="space-y-2">
<Label htmlFor="fanout-webhook-hmac-header">Signature Header Name</Label>
<Input
id="fanout-webhook-hmac-header"
type="text"
placeholder="X-Webhook-Signature"
value={(config.hmac_header as string) || ''}
onChange={(e) => onChange({ ...config, hmac_header: e.target.value })}
/>
</div>
</div>
</div>
<Separator />
<div className="space-y-2">
<Label htmlFor="fanout-webhook-headers">Extra Headers (JSON)</Label>
<textarea
@@ -915,7 +939,8 @@ export function SettingsFanoutSection({
url: '',
method: 'POST',
headers: {},
secret: '',
hmac_secret: '',
hmac_header: '',
},
apprise: {
urls: '',

View File

@@ -0,0 +1,156 @@
import { render, screen, waitFor, fireEvent } from '@testing-library/react';
import { describe, expect, it, vi, beforeEach } from 'vitest';
import { SettingsFanoutSection } from '../components/settings/SettingsFanoutSection';
import type { HealthStatus, FanoutConfig } from '../types';
// Mock the api module
vi.mock('../api', () => ({
api: {
getFanoutConfigs: vi.fn(),
createFanoutConfig: vi.fn(),
updateFanoutConfig: vi.fn(),
deleteFanoutConfig: vi.fn(),
getChannels: vi.fn(),
getContacts: vi.fn(),
},
}));
// Suppress BotCodeEditor lazy load in tests
vi.mock('../components/BotCodeEditor', () => ({
BotCodeEditor: () => <textarea data-testid="bot-code-editor" />,
}));
import { api } from '../api';
const mockedApi = vi.mocked(api);
const baseHealth: HealthStatus = {
status: 'connected',
radio_connected: true,
connection_info: 'Serial: /dev/ttyUSB0',
database_size_mb: 1.2,
oldest_undecrypted_timestamp: null,
fanout_statuses: {},
bots_disabled: false,
};
const webhookConfig: FanoutConfig = {
id: 'wh-1',
type: 'webhook',
name: 'Test Hook',
enabled: true,
config: { url: 'https://example.com/hook', method: 'POST', headers: {} },
scope: { messages: 'all', raw_packets: 'none' },
sort_order: 0,
created_at: 1000,
};
function renderSection(overrides?: { health?: HealthStatus }) {
return render(
<SettingsFanoutSection
health={overrides?.health ?? baseHealth}
onHealthRefresh={vi.fn(async () => {})}
/>
);
}
beforeEach(() => {
vi.clearAllMocks();
mockedApi.getFanoutConfigs.mockResolvedValue([]);
mockedApi.getChannels.mockResolvedValue([]);
mockedApi.getContacts.mockResolvedValue([]);
});
describe('SettingsFanoutSection', () => {
it('shows add buttons for all integration types', async () => {
renderSection();
await waitFor(() => {
expect(screen.getByRole('button', { name: 'Private MQTT' })).toBeInTheDocument();
expect(screen.getByRole('button', { name: 'Webhook' })).toBeInTheDocument();
expect(screen.getByRole('button', { name: 'Apprise' })).toBeInTheDocument();
expect(screen.getByRole('button', { name: 'Bot' })).toBeInTheDocument();
});
});
it('hides bot add button when bots_disabled', async () => {
renderSection({ health: { ...baseHealth, bots_disabled: true } });
await waitFor(() => {
expect(screen.queryByRole('button', { name: 'Bot' })).not.toBeInTheDocument();
});
});
it('shows bots disabled banner when bots_disabled', async () => {
renderSection({ health: { ...baseHealth, bots_disabled: true } });
await waitFor(() => {
expect(screen.getByText(/Bot system is disabled/)).toBeInTheDocument();
});
});
it('lists existing configs after load', async () => {
mockedApi.getFanoutConfigs.mockResolvedValue([webhookConfig]);
renderSection();
await waitFor(() => {
expect(screen.getByText('Test Hook')).toBeInTheDocument();
});
});
it('navigates to edit view when clicking edit', async () => {
mockedApi.getFanoutConfigs.mockResolvedValue([webhookConfig]);
renderSection();
await waitFor(() => {
expect(screen.getByText('Test Hook')).toBeInTheDocument();
});
const editBtn = screen.getByRole('button', { name: 'Edit' });
fireEvent.click(editBtn);
await waitFor(() => {
expect(screen.getByText('← Back to list')).toBeInTheDocument();
});
});
it('calls toggle enabled on checkbox click', async () => {
mockedApi.getFanoutConfigs.mockResolvedValue([webhookConfig]);
mockedApi.updateFanoutConfig.mockResolvedValue({ ...webhookConfig, enabled: false });
renderSection();
await waitFor(() => {
expect(screen.getByText('Test Hook')).toBeInTheDocument();
});
const checkbox = screen.getByRole('checkbox');
fireEvent.click(checkbox);
await waitFor(() => {
expect(mockedApi.updateFanoutConfig).toHaveBeenCalledWith('wh-1', { enabled: false });
});
});
it('navigates to create view when clicking add button', async () => {
const createdWebhook: FanoutConfig = {
id: 'wh-new',
type: 'webhook',
name: 'Webhook',
enabled: false,
config: { url: '', method: 'POST', headers: {} },
scope: { messages: 'all', raw_packets: 'none' },
sort_order: 0,
created_at: 2000,
};
mockedApi.createFanoutConfig.mockResolvedValue(createdWebhook);
// After creation, getFanoutConfigs returns the new config
mockedApi.getFanoutConfigs.mockResolvedValueOnce([]).mockResolvedValueOnce([createdWebhook]);
renderSection();
await waitFor(() => {
expect(screen.getByRole('button', { name: 'Webhook' })).toBeInTheDocument();
});
fireEvent.click(screen.getByRole('button', { name: 'Webhook' }));
await waitFor(() => {
expect(screen.getByText('← Back to list')).toBeInTheDocument();
// Should show the URL input for webhook type
expect(screen.getByLabelText(/URL/)).toBeInTheDocument();
});
});
});

View File

@@ -40,10 +40,6 @@ test.describe('Webhook integration settings', () => {
// Verify method defaults to POST
await expect(page.locator('#fanout-webhook-method')).toHaveValue('POST');
// Fill in a secret
const secretInput = page.locator('#fanout-webhook-secret');
await secretInput.fill('e2e-secret');
// Rename it
const nameInput = page.locator('#fanout-edit-name');
await nameInput.clear();
@@ -69,7 +65,7 @@ test.describe('Webhook integration settings', () => {
const webhook = await createFanoutConfig({
type: 'webhook',
name: 'API Webhook',
config: { url: 'https://example.com/hook', method: 'POST', headers: {}, secret: '' },
config: { url: 'https://example.com/hook', method: 'POST', headers: {} },
enabled: true,
});
createdWebhookId = webhook.id;
@@ -105,7 +101,7 @@ test.describe('Webhook integration settings', () => {
const webhook = await createFanoutConfig({
type: 'webhook',
name: 'Scope Webhook',
config: { url: 'https://example.com/hook', method: 'POST', headers: {}, secret: '' },
config: { url: 'https://example.com/hook', method: 'POST', headers: {} },
});
createdWebhookId = webhook.id;
@@ -140,7 +136,7 @@ test.describe('Webhook integration settings', () => {
const webhook = await createFanoutConfig({
type: 'webhook',
name: 'Delete Me Webhook',
config: { url: 'https://example.com/hook', method: 'POST', headers: {}, secret: '' },
config: { url: 'https://example.com/hook', method: 'POST', headers: {} },
});
createdWebhookId = webhook.id;

View File

@@ -0,0 +1,563 @@
"""Tests addressing fanout hitlist gaps: BotModule params, migrations 036-038,
disable_bots PATCH guard, and community MQTT IATA validation."""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import aiosqlite
import pytest
from fastapi import HTTPException
from app.migrations import set_version
# ---------------------------------------------------------------------------
# T1: BotModule parameter extraction
# ---------------------------------------------------------------------------
class TestBotModuleParameterExtraction:
"""Verify BotModule._run_for_message extracts params from broadcast data."""
@pytest.mark.asyncio
async def test_channel_is_outgoing_propagated(self):
"""Channel messages with outgoing=True pass is_outgoing=True to bot code."""
from app.fanout.bot import BotModule
captured = {}
def fake_execute(
code,
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing,
):
captured["is_outgoing"] = is_outgoing
captured["is_dm"] = is_dm
return None
mod = BotModule("test", {"code": "def bot(**k): pass"}, name="Test")
with (
patch("app.fanout.bot_exec.execute_bot_code", side_effect=fake_execute),
patch(
"app.fanout.bot_exec._bot_semaphore",
MagicMock(__aenter__=AsyncMock(), __aexit__=AsyncMock()),
),
patch("app.fanout.bot.asyncio.sleep", new_callable=AsyncMock),
patch("app.repository.ChannelRepository") as mock_chan,
):
mock_chan.get_by_key = AsyncMock(return_value=MagicMock(name="#test"))
await mod._run_for_message(
{
"type": "CHAN",
"conversation_key": "ch1",
"text": "Alice: hello",
"sender_name": "Alice",
"outgoing": True,
}
)
assert captured["is_outgoing"] is True
assert captured["is_dm"] is False
@pytest.mark.asyncio
async def test_channel_is_outgoing_false_by_default(self):
"""Channel messages without outgoing field default to is_outgoing=False."""
from app.fanout.bot import BotModule
captured = {}
def fake_execute(
code,
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing,
):
captured["is_outgoing"] = is_outgoing
return None
mod = BotModule("test", {"code": "def bot(**k): pass"}, name="Test")
with (
patch("app.fanout.bot_exec.execute_bot_code", side_effect=fake_execute),
patch(
"app.fanout.bot_exec._bot_semaphore",
MagicMock(__aenter__=AsyncMock(), __aexit__=AsyncMock()),
),
patch("app.fanout.bot.asyncio.sleep", new_callable=AsyncMock),
patch("app.repository.ChannelRepository") as mock_chan,
):
mock_chan.get_by_key = AsyncMock(return_value=MagicMock(name="#test"))
await mod._run_for_message(
{
"type": "CHAN",
"conversation_key": "ch1",
"text": "Bob: hi",
"sender_name": "Bob",
}
)
assert captured["is_outgoing"] is False
@pytest.mark.asyncio
async def test_path_extracted_from_paths_list(self):
"""Path is extracted from paths list-of-dicts format."""
from app.fanout.bot import BotModule
captured = {}
def fake_execute(
code,
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing,
):
captured["path"] = path
return None
mod = BotModule("test", {"code": "def bot(**k): pass"}, name="Test")
with (
patch("app.fanout.bot_exec.execute_bot_code", side_effect=fake_execute),
patch(
"app.fanout.bot_exec._bot_semaphore",
MagicMock(__aenter__=AsyncMock(), __aexit__=AsyncMock()),
),
patch("app.fanout.bot.asyncio.sleep", new_callable=AsyncMock),
patch("app.repository.ContactRepository") as mock_contact,
):
mock_contact.get_by_key = AsyncMock(return_value=MagicMock(name="Alice"))
await mod._run_for_message(
{
"type": "PRIV",
"conversation_key": "pk1",
"text": "hello",
"paths": [{"path": "aabb", "rssi": -50}],
}
)
assert captured["path"] == "aabb"
@pytest.mark.asyncio
async def test_channel_sender_prefix_stripped(self):
"""Channel message text has 'SenderName: ' prefix stripped."""
from app.fanout.bot import BotModule
captured = {}
def fake_execute(
code,
sender_name,
sender_key,
message_text,
is_dm,
channel_key,
channel_name,
sender_timestamp,
path,
is_outgoing,
):
captured["message_text"] = message_text
captured["sender_name"] = sender_name
return None
mod = BotModule("test", {"code": "def bot(**k): pass"}, name="Test")
with (
patch("app.fanout.bot_exec.execute_bot_code", side_effect=fake_execute),
patch(
"app.fanout.bot_exec._bot_semaphore",
MagicMock(__aenter__=AsyncMock(), __aexit__=AsyncMock()),
),
patch("app.fanout.bot.asyncio.sleep", new_callable=AsyncMock),
patch("app.repository.ChannelRepository") as mock_chan,
):
mock_chan.get_by_key = AsyncMock(return_value=MagicMock(name="#general"))
await mod._run_for_message(
{
"type": "CHAN",
"conversation_key": "ch1",
"text": "Alice: the actual message",
"sender_name": "Alice",
}
)
assert captured["message_text"] == "the actual message"
assert captured["sender_name"] == "Alice"
# ---------------------------------------------------------------------------
# T2: Migration 036, 037, 038 tests
# ---------------------------------------------------------------------------
# Helper to build an app_settings schema at version 35 (pre-fanout)
_APP_SETTINGS_V35 = """
CREATE TABLE app_settings (
id INTEGER PRIMARY KEY,
mqtt_broker_host TEXT DEFAULT '',
mqtt_broker_port INTEGER DEFAULT 1883,
mqtt_username TEXT DEFAULT '',
mqtt_password TEXT DEFAULT '',
mqtt_use_tls INTEGER DEFAULT 0,
mqtt_tls_insecure INTEGER DEFAULT 0,
mqtt_topic_prefix TEXT DEFAULT 'meshcore',
mqtt_publish_messages INTEGER DEFAULT 0,
mqtt_publish_raw_packets INTEGER DEFAULT 0,
community_mqtt_enabled INTEGER DEFAULT 0,
community_mqtt_iata TEXT DEFAULT '',
community_mqtt_broker_host TEXT DEFAULT 'mqtt-us-v1.letsmesh.net',
community_mqtt_broker_port INTEGER DEFAULT 443,
community_mqtt_email TEXT DEFAULT '',
bots TEXT DEFAULT '[]'
)
"""
class TestMigration036:
"""Test migration 036: create fanout_configs and migrate MQTT settings."""
@pytest.mark.asyncio
async def test_migrates_private_mqtt(self):
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
await set_version(conn, 35)
await conn.execute(_APP_SETTINGS_V35)
await conn.execute(
"""INSERT INTO app_settings (id, mqtt_broker_host, mqtt_broker_port,
mqtt_publish_messages, mqtt_publish_raw_packets)
VALUES (1, 'broker.test', 8883, 1, 0)"""
)
await conn.commit()
from app.migrations import _migrate_036_create_fanout_configs
await _migrate_036_create_fanout_configs(conn)
cursor = await conn.execute("SELECT * FROM fanout_configs WHERE type = 'mqtt_private'")
row = await cursor.fetchone()
assert row is not None
config = json.loads(row["config"])
assert config["broker_host"] == "broker.test"
assert config["broker_port"] == 8883
assert row["enabled"] == 1
scope = json.loads(row["scope"])
assert scope["messages"] == "all"
assert scope["raw_packets"] == "none"
finally:
await conn.close()
@pytest.mark.asyncio
async def test_migrates_enabled_community_mqtt(self):
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
await set_version(conn, 35)
await conn.execute(_APP_SETTINGS_V35)
await conn.execute(
"""INSERT INTO app_settings (id, community_mqtt_enabled,
community_mqtt_iata, community_mqtt_email)
VALUES (1, 1, 'PDX', 'user@test.com')"""
)
await conn.commit()
from app.migrations import _migrate_036_create_fanout_configs
await _migrate_036_create_fanout_configs(conn)
cursor = await conn.execute(
"SELECT * FROM fanout_configs WHERE type = 'mqtt_community'"
)
row = await cursor.fetchone()
assert row is not None
assert row["enabled"] == 1
config = json.loads(row["config"])
assert config["iata"] == "PDX"
assert config["email"] == "user@test.com"
finally:
await conn.close()
@pytest.mark.asyncio
async def test_preserves_disabled_but_configured_community_mqtt(self):
"""B4 fix: disabled community MQTT with populated fields is preserved."""
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
await set_version(conn, 35)
await conn.execute(_APP_SETTINGS_V35)
await conn.execute(
"""INSERT INTO app_settings (id, community_mqtt_enabled,
community_mqtt_iata, community_mqtt_email)
VALUES (1, 0, 'SEA', 'test@test.com')"""
)
await conn.commit()
from app.migrations import _migrate_036_create_fanout_configs
await _migrate_036_create_fanout_configs(conn)
cursor = await conn.execute(
"SELECT * FROM fanout_configs WHERE type = 'mqtt_community'"
)
row = await cursor.fetchone()
assert row is not None
assert row["enabled"] == 0 # Preserved as disabled
config = json.loads(row["config"])
assert config["iata"] == "SEA"
finally:
await conn.close()
@pytest.mark.asyncio
async def test_skips_empty_settings(self):
"""No fanout rows created when MQTT is unconfigured."""
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
await set_version(conn, 35)
await conn.execute(_APP_SETTINGS_V35)
await conn.execute("INSERT INTO app_settings (id) VALUES (1)")
await conn.commit()
from app.migrations import _migrate_036_create_fanout_configs
await _migrate_036_create_fanout_configs(conn)
cursor = await conn.execute("SELECT COUNT(*) FROM fanout_configs")
row = await cursor.fetchone()
assert row[0] == 0
finally:
await conn.close()
class TestMigration037:
"""Test migration 037: migrate bots to fanout_configs."""
@pytest.mark.asyncio
async def test_migrates_bots(self):
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
await set_version(conn, 36)
await conn.execute(_APP_SETTINGS_V35)
await conn.execute("""
CREATE TABLE IF NOT EXISTS fanout_configs (
id TEXT PRIMARY KEY, type TEXT NOT NULL, name TEXT NOT NULL,
enabled INTEGER DEFAULT 0, config TEXT NOT NULL DEFAULT '{}',
scope TEXT NOT NULL DEFAULT '{}', sort_order INTEGER DEFAULT 0,
created_at INTEGER NOT NULL DEFAULT 0
)
""")
bots = [
{"name": "Echo", "enabled": True, "code": "def bot(**k): return k['message_text']"},
{"name": "Silent", "enabled": False, "code": "def bot(**k): pass"},
]
await conn.execute(
"INSERT INTO app_settings (id, bots) VALUES (1, ?)",
(json.dumps(bots),),
)
await conn.commit()
from app.migrations import _migrate_037_bots_to_fanout
await _migrate_037_bots_to_fanout(conn)
cursor = await conn.execute(
"SELECT * FROM fanout_configs WHERE type = 'bot' ORDER BY sort_order"
)
rows = await cursor.fetchall()
assert len(rows) == 2
assert rows[0]["name"] == "Echo"
assert rows[0]["enabled"] == 1
assert rows[1]["name"] == "Silent"
assert rows[1]["enabled"] == 0
finally:
await conn.close()
@pytest.mark.asyncio
async def test_empty_bots_is_noop(self):
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
await set_version(conn, 36)
await conn.execute(_APP_SETTINGS_V35)
await conn.execute("""
CREATE TABLE IF NOT EXISTS fanout_configs (
id TEXT PRIMARY KEY, type TEXT NOT NULL, name TEXT NOT NULL,
enabled INTEGER DEFAULT 0, config TEXT NOT NULL DEFAULT '{}',
scope TEXT NOT NULL DEFAULT '{}', sort_order INTEGER DEFAULT 0,
created_at INTEGER NOT NULL DEFAULT 0
)
""")
await conn.execute("INSERT INTO app_settings (id, bots) VALUES (1, '[]')")
await conn.commit()
from app.migrations import _migrate_037_bots_to_fanout
await _migrate_037_bots_to_fanout(conn)
cursor = await conn.execute("SELECT COUNT(*) FROM fanout_configs")
row = await cursor.fetchone()
assert row[0] == 0
finally:
await conn.close()
class TestMigration038:
"""Test migration 038: drop legacy columns from app_settings."""
@pytest.mark.asyncio
async def test_drops_legacy_columns(self):
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
await set_version(conn, 37)
await conn.execute(_APP_SETTINGS_V35)
await conn.execute("INSERT INTO app_settings (id) VALUES (1)")
await conn.commit()
from app.migrations import _migrate_038_drop_legacy_columns
await _migrate_038_drop_legacy_columns(conn)
cursor = await conn.execute("PRAGMA table_info(app_settings)")
remaining = {row[1] for row in await cursor.fetchall()}
assert "mqtt_broker_host" not in remaining
assert "bots" not in remaining
assert "community_mqtt_enabled" not in remaining
# id should remain
assert "id" in remaining
finally:
await conn.close()
@pytest.mark.asyncio
async def test_handles_already_dropped_columns(self):
"""Migration handles columns already dropped (idempotent)."""
conn = await aiosqlite.connect(":memory:")
conn.row_factory = aiosqlite.Row
try:
await set_version(conn, 37)
# Minimal table with only id — all legacy columns already gone
await conn.execute("CREATE TABLE app_settings (id INTEGER PRIMARY KEY)")
await conn.commit()
from app.migrations import _migrate_038_drop_legacy_columns
# Should not raise
await _migrate_038_drop_legacy_columns(conn)
finally:
await conn.close()
# ---------------------------------------------------------------------------
# T3: PATCH /api/fanout/{id} disable_bots guard
# ---------------------------------------------------------------------------
class TestDisableBotsPatchGuard:
"""Verify PATCH /api/fanout/{id} returns 403 for bots when disabled."""
@pytest.mark.asyncio
async def test_bot_update_returns_403_when_disabled(self, test_db):
"""PATCH on an existing bot config returns 403 when bots are disabled."""
from app.repository.fanout import FanoutConfigRepository
from app.routers.fanout import FanoutConfigUpdate, update_fanout_config
# Create a bot config first (with bots enabled)
cfg = await FanoutConfigRepository.create(
config_type="bot",
name="Test Bot",
config={"code": "def bot(**k): pass"},
scope={"messages": "all", "raw_packets": "none"},
enabled=False,
)
# Now try to update with bots disabled
with patch("app.routers.fanout.server_settings", MagicMock(disable_bots=True)):
with pytest.raises(HTTPException) as exc_info:
await update_fanout_config(
cfg["id"],
FanoutConfigUpdate(enabled=True),
)
assert exc_info.value.status_code == 403
assert "disabled" in exc_info.value.detail.lower()
@pytest.mark.asyncio
async def test_mqtt_update_allowed_when_bots_disabled(self, test_db):
"""PATCH on a non-bot config is allowed even when bots are disabled."""
from app.repository.fanout import FanoutConfigRepository
from app.routers.fanout import FanoutConfigUpdate, update_fanout_config
cfg = await FanoutConfigRepository.create(
config_type="mqtt_private",
name="Test MQTT",
config={"broker_host": "localhost", "broker_port": 1883},
scope={"messages": "all", "raw_packets": "all"},
enabled=False,
)
with patch("app.routers.fanout.server_settings", MagicMock(disable_bots=True)):
with patch("app.fanout.manager.fanout_manager.reload_config", new_callable=AsyncMock):
result = await update_fanout_config(
cfg["id"],
FanoutConfigUpdate(name="Renamed"),
)
assert result["name"] == "Renamed"
# ---------------------------------------------------------------------------
# Q4: Community MQTT IATA validation
# ---------------------------------------------------------------------------
class TestCommunityMqttIataValidation:
"""Verify community MQTT requires valid IATA when enabled."""
def test_empty_iata_rejected(self):
from app.routers.fanout import _validate_mqtt_community_config
with pytest.raises(HTTPException) as exc_info:
_validate_mqtt_community_config({"iata": ""})
assert exc_info.value.status_code == 400
assert "IATA" in exc_info.value.detail
def test_missing_iata_rejected(self):
from app.routers.fanout import _validate_mqtt_community_config
with pytest.raises(HTTPException) as exc_info:
_validate_mqtt_community_config({})
assert exc_info.value.status_code == 400
def test_valid_iata_accepted(self):
from app.routers.fanout import _validate_mqtt_community_config
# Should not raise
_validate_mqtt_community_config({"iata": "PDX"})
def test_invalid_iata_format_rejected(self):
from app.routers.fanout import _validate_mqtt_community_config
with pytest.raises(HTTPException):
_validate_mqtt_community_config({"iata": "PD"})
with pytest.raises(HTTPException):
_validate_mqtt_community_config({"iata": "pdx1"})

View File

@@ -493,12 +493,11 @@ async def webhook_server():
await server.stop()
def _webhook_config(port: int, secret: str = "") -> dict:
def _webhook_config(port: int, extra_headers: dict | None = None) -> dict:
return {
"url": f"http://127.0.0.1:{port}/hook",
"method": "POST",
"headers": {},
"secret": secret,
"headers": extra_headers or {},
}
@@ -540,12 +539,12 @@ class TestFanoutWebhookIntegration:
assert results[0]["headers"].get("x-webhook-event") == "message"
@pytest.mark.asyncio
async def test_webhook_sends_secret_header(self, webhook_server, integration_db):
"""Webhook sends X-Webhook-Secret when configured."""
async def test_webhook_sends_custom_headers(self, webhook_server, integration_db):
"""Webhook forwards custom headers from config."""
cfg = await FanoutConfigRepository.create(
config_type="webhook",
name="Secret Hook",
config=_webhook_config(webhook_server.port, secret="my-secret-123"),
name="Custom Header Hook",
config=_webhook_config(webhook_server.port, extra_headers={"X-Custom": "my-value"}),
scope={"messages": "all", "raw_packets": "none"},
enabled=True,
)
@@ -556,7 +555,7 @@ class TestFanoutWebhookIntegration:
await _wait_connected(manager, cfg["id"])
await manager.broadcast_message(
{"type": "CHAN", "conversation_key": "ch1", "text": "secret test"}
{"type": "CHAN", "conversation_key": "ch1", "text": "header test"}
)
results = await webhook_server.wait_for(1)
@@ -564,7 +563,48 @@ class TestFanoutWebhookIntegration:
await manager.stop_all()
assert len(results) == 1
assert results[0]["headers"].get("x-webhook-secret") == "my-secret-123"
assert results[0]["headers"].get("x-custom") == "my-value"
@pytest.mark.asyncio
async def test_webhook_hmac_signature(self, webhook_server, integration_db):
"""Webhook sends HMAC-SHA256 signature when hmac_secret is configured."""
import hashlib
import hmac
import json
secret = "test-secret-key"
cfg = await FanoutConfigRepository.create(
config_type="webhook",
name="HMAC Hook",
config={
**_webhook_config(webhook_server.port),
"hmac_secret": secret,
"hmac_header": "X-My-Sig",
},
scope={"messages": "all", "raw_packets": "none"},
enabled=True,
)
manager = FanoutManager()
try:
await manager.load_from_db()
await _wait_connected(manager, cfg["id"])
msg = {"type": "CHAN", "conversation_key": "ch1", "text": "hmac test"}
await manager.broadcast_message(msg)
results = await webhook_server.wait_for(1)
finally:
await manager.stop_all()
assert len(results) == 1
sig_header = results[0]["headers"].get("x-my-sig", "")
assert sig_header.startswith("sha256=")
# Verify the signature matches
body_bytes = json.dumps(results[0]["body"], separators=(",", ":"), sort_keys=True).encode()
expected = hmac.new(secret.encode(), body_bytes, hashlib.sha256).hexdigest()
assert sig_header == f"sha256={expected}"
@pytest.mark.asyncio
async def test_webhook_disabled_no_delivery(self, webhook_server, integration_db):
@@ -725,14 +765,14 @@ class TestFanoutWebhookIntegration:
cfg_a = await FanoutConfigRepository.create(
config_type="webhook",
name="Hook A",
config=_webhook_config(webhook_server.port, secret="a"),
config=_webhook_config(webhook_server.port, extra_headers={"X-Hook-Id": "a"}),
scope={"messages": "all", "raw_packets": "none"},
enabled=True,
)
cfg_b = await FanoutConfigRepository.create(
config_type="webhook",
name="Hook B",
config=_webhook_config(webhook_server.port, secret="b"),
config=_webhook_config(webhook_server.port, extra_headers={"X-Hook-Id": "b"}),
scope={"messages": "all", "raw_packets": "none"},
enabled=True,
)
@@ -752,9 +792,9 @@ class TestFanoutWebhookIntegration:
await manager.stop_all()
assert len(results) == 2
secrets = {r["headers"].get("x-webhook-secret") for r in results}
assert "a" in secrets
assert "b" in secrets
hook_ids = {r["headers"].get("x-hook-id") for r in results}
assert "a" in hook_ids
assert "b" in hook_ids
@pytest.mark.asyncio
async def test_webhook_disable_stops_delivery(self, webhook_server, integration_db):