diff --git a/frontend/src/components/settings/SettingsFanoutSection.tsx b/frontend/src/components/settings/SettingsFanoutSection.tsx index 1a30ebc..5d2d57a 100644 --- a/frontend/src/components/settings/SettingsFanoutSection.tsx +++ b/frontend/src/components/settings/SettingsFanoutSection.tsx @@ -75,8 +75,7 @@ function getStatusLabel(status: string | undefined, type?: string) { } function getStatusColor(status: string | undefined, enabled?: boolean) { - if (enabled === false) - return 'bg-warning shadow-[0_0_6px_hsl(var(--warning)/0.5)]'; + if (enabled === false) return 'bg-warning shadow-[0_0_6px_hsl(var(--warning)/0.5)]'; if (status === 'connected') return 'bg-status-connected shadow-[0_0_6px_hsl(var(--status-connected)/0.5)]'; if (status === 'error') return 'bg-destructive shadow-[0_0_6px_hsl(var(--destructive)/0.5)]'; @@ -1088,7 +1087,10 @@ export function SettingsFanoutSection({
diff --git a/tests/e2e/specs/apprise.spec.ts b/tests/e2e/specs/apprise.spec.ts new file mode 100644 index 0000000..c9dc4e7 --- /dev/null +++ b/tests/e2e/specs/apprise.spec.ts @@ -0,0 +1,205 @@ +import { test, expect } from '@playwright/test'; +import { + createFanoutConfig, + deleteFanoutConfig, + getFanoutConfigs, +} from '../helpers/api'; + +test.describe('Apprise integration settings', () => { + let createdAppriseId: string | null = null; + + test.afterEach(async () => { + if (createdAppriseId) { + try { + await deleteFanoutConfig(createdAppriseId); + } catch { + console.warn('Failed to delete test apprise config'); + } + createdAppriseId = null; + } + }); + + test('create apprise via UI, configure URLs, save as enabled', async ({ page }) => { + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); + + // Open settings and navigate to MQTT & Forwarding + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); + + // Click the Apprise add button + await page.getByRole('button', { name: 'Apprise' }).click(); + + // Should navigate to the detail/edit view with default name + await expect(page.getByDisplayValue('Apprise')).toBeVisible(); + + // Fill in notification URL + const urlsTextarea = page.locator('#fanout-apprise-urls'); + await urlsTextarea.fill('json://localhost:9999'); + + // Verify preserve identity checkbox is checked by default + const preserveIdentity = page.getByText('Preserve identity on Discord'); + await expect(preserveIdentity).toBeVisible(); + + // Verify include routing path checkbox is checked by default + const includePath = page.getByText('Include routing path in notifications'); + await expect(includePath).toBeVisible(); + + // Rename it + const nameInput = page.locator('#fanout-edit-name'); + await nameInput.clear(); + await nameInput.fill('E2E Apprise'); + + // Save as enabled + await page.getByRole('button', { name: /Save as Enabled/i }).click(); + await expect(page.getByText('Integration saved and enabled')).toBeVisible(); + + // Should be back on list view with our apprise config visible + await expect(page.getByText('E2E Apprise')).toBeVisible(); + + // Clean up via API + const configs = await getFanoutConfigs(); + const apprise = configs.find((c) => c.name === 'E2E Apprise'); + if (apprise) { + createdAppriseId = apprise.id; + } + }); + + test('create apprise via API, verify options persist after edit', async ({ page }) => { + const apprise = await createFanoutConfig({ + type: 'apprise', + name: 'API Apprise', + config: { + urls: 'json://localhost:9999\nslack://token_a/token_b/token_c', + preserve_identity: false, + include_path: false, + }, + enabled: true, + }); + createdAppriseId = apprise.id; + + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); + + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); + + // Click Edit on our apprise config + const row = page.getByText('API Apprise').locator('..'); + await row.getByRole('button', { name: 'Edit' }).click(); + + // Verify the URLs textarea has our content + const urlsTextarea = page.locator('#fanout-apprise-urls'); + await expect(urlsTextarea).toHaveValue(/json:\/\/localhost:9999/); + await expect(urlsTextarea).toHaveValue(/slack:\/\/token_a/); + + // Verify checkboxes reflect our config (both unchecked) + const preserveCheckbox = page + .getByText('Preserve identity on Discord') + .locator('xpath=ancestor::label[1]') + .locator('input[type="checkbox"]'); + await expect(preserveCheckbox).not.toBeChecked(); + + const pathCheckbox = page + .getByText('Include routing path in notifications') + .locator('xpath=ancestor::label[1]') + .locator('input[type="checkbox"]'); + await expect(pathCheckbox).not.toBeChecked(); + + // Go back + await page.getByText('← Back to list').click(); + }); + + test('apprise shows scope selector', async ({ page }) => { + const apprise = await createFanoutConfig({ + type: 'apprise', + name: 'Scope Apprise', + config: { urls: 'json://localhost:9999' }, + }); + createdAppriseId = apprise.id; + + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); + + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); + + const row = page.getByText('Scope Apprise').locator('..'); + await row.getByRole('button', { name: 'Edit' }).click(); + + // Verify scope selector is present + await expect(page.getByText('Message Scope')).toBeVisible(); + await expect(page.getByText('All messages')).toBeVisible(); + + // Select "All except listed" mode + await page.getByText('All except listed channels/contacts').click(); + + // Should show channel and contact lists with exclude label + await expect(page.getByText('(exclude)')).toBeVisible(); + + // Go back + await page.getByText('← Back to list').click(); + }); + + test('apprise disabled config shows amber dot and can be enabled via save button', async ({ + page, + }) => { + const apprise = await createFanoutConfig({ + type: 'apprise', + name: 'Disabled Apprise', + config: { urls: 'json://localhost:9999' }, + enabled: false, + }); + createdAppriseId = apprise.id; + + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); + + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); + + // Should show "Disabled" text + const row = page.getByText('Disabled Apprise').locator('..'); + await expect(row.getByText('Disabled')).toBeVisible(); + + // Edit it + await row.getByRole('button', { name: 'Edit' }).click(); + + // Save as enabled + await page.getByRole('button', { name: /Save as Enabled/i }).click(); + await expect(page.getByText('Integration saved and enabled')).toBeVisible(); + + // Verify it's now enabled via API + const configs = await getFanoutConfigs(); + const updated = configs.find((c) => c.id === apprise.id); + expect(updated?.enabled).toBe(true); + }); + + test('delete apprise via UI', async ({ page }) => { + const apprise = await createFanoutConfig({ + type: 'apprise', + name: 'Delete Me Apprise', + config: { urls: 'json://localhost:9999' }, + }); + createdAppriseId = apprise.id; + + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); + + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); + + const row = page.getByText('Delete Me Apprise').locator('..'); + await row.getByRole('button', { name: 'Edit' }).click(); + + // Accept the confirmation dialog + page.on('dialog', (dialog) => dialog.accept()); + + await page.getByRole('button', { name: 'Delete' }).click(); + await expect(page.getByText('Integration deleted')).toBeVisible(); + + // Should be back on list, apprise gone + await expect(page.getByText('Delete Me Apprise')).not.toBeVisible(); + createdAppriseId = null; + }); +}); diff --git a/tests/e2e/specs/bot.spec.ts b/tests/e2e/specs/bot.spec.ts index 42b155c..ebaa6b3 100644 --- a/tests/e2e/specs/bot.spec.ts +++ b/tests/e2e/specs/bot.spec.ts @@ -1,12 +1,9 @@ import { test, expect } from '@playwright/test'; import { ensureFlightlessChannel, - getFanoutConfigs, createFanoutConfig, deleteFanoutConfig, - updateFanoutConfig, } from '../helpers/api'; -import type { FanoutConfig } from '../helpers/api'; const BOT_CODE = `def bot(sender_name, sender_key, message_text, is_dm, channel_key, channel_name, sender_timestamp, path): if channel_name == "#flightless" and "!e2etest" in message_text.lower(): @@ -48,7 +45,7 @@ test.describe('Bot functionality', () => { await expect(page.getByText('Connected')).toBeVisible(); await page.getByText('Settings').click(); - await page.getByRole('button', { name: /Fanout/ }).click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); // The bot name should be visible in the integration list await expect(page.getByText('E2E Test Bot')).toBeVisible(); diff --git a/tests/e2e/specs/webhook.spec.ts b/tests/e2e/specs/webhook.spec.ts new file mode 100644 index 0000000..1f9f3c9 --- /dev/null +++ b/tests/e2e/specs/webhook.spec.ts @@ -0,0 +1,172 @@ +import { test, expect } from '@playwright/test'; +import { + createFanoutConfig, + deleteFanoutConfig, + getFanoutConfigs, +} from '../helpers/api'; + +test.describe('Webhook integration settings', () => { + let createdWebhookId: string | null = null; + + test.afterEach(async () => { + if (createdWebhookId) { + try { + await deleteFanoutConfig(createdWebhookId); + } catch { + console.warn('Failed to delete test webhook'); + } + createdWebhookId = null; + } + }); + + test('create webhook via UI, configure, save as enabled, verify in list', async ({ page }) => { + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); + + // Open settings and navigate to MQTT & Forwarding + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); + + // Click the Webhook add button + await page.getByRole('button', { name: 'Webhook' }).click(); + + // Should navigate to the detail/edit view with default name + await expect(page.getByDisplayValue('Webhook')).toBeVisible(); + + // Fill in webhook URL + const urlInput = page.locator('#fanout-webhook-url'); + await urlInput.fill('https://example.com/e2e-test-hook'); + + // Verify method defaults to POST + await expect(page.locator('#fanout-webhook-method')).toHaveValue('POST'); + + // Fill in a secret + const secretInput = page.locator('#fanout-webhook-secret'); + await secretInput.fill('e2e-secret'); + + // Rename it + const nameInput = page.locator('#fanout-edit-name'); + await nameInput.clear(); + await nameInput.fill('E2E Webhook'); + + // Save as enabled + await page.getByRole('button', { name: /Save as Enabled/i }).click(); + await expect(page.getByText('Integration saved and enabled')).toBeVisible(); + + // Should be back on list view with our webhook visible + await expect(page.getByText('E2E Webhook')).toBeVisible(); + + // Clean up via API + const configs = await getFanoutConfigs(); + const webhook = configs.find((c) => c.name === 'E2E Webhook'); + if (webhook) { + createdWebhookId = webhook.id; + } + }); + + test('create webhook via API, edit in UI, save as disabled', async ({ page }) => { + // Create via API + const webhook = await createFanoutConfig({ + type: 'webhook', + name: 'API Webhook', + config: { url: 'https://example.com/hook', method: 'POST', headers: {}, secret: '' }, + enabled: true, + }); + createdWebhookId = webhook.id; + + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); + + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); + + // Click Edit on our webhook + const row = page.getByText('API Webhook').locator('..'); + await row.getByRole('button', { name: 'Edit' }).click(); + + // Should be in edit view + await expect(page.getByDisplayValue('API Webhook')).toBeVisible(); + + // Change method to PUT + await page.locator('#fanout-webhook-method').selectOption('PUT'); + + // Save as disabled + await page.getByRole('button', { name: /Save as Disabled/i }).click(); + await expect(page.getByText('Integration saved')).toBeVisible(); + + // Verify it's now disabled in the list + const configs = await getFanoutConfigs(); + const updated = configs.find((c) => c.id === webhook.id); + expect(updated?.enabled).toBe(false); + expect(updated?.config.method).toBe('PUT'); + }); + + test('webhook shows scope selector with channel/contact options', async ({ page }) => { + const webhook = await createFanoutConfig({ + type: 'webhook', + name: 'Scope Webhook', + config: { url: 'https://example.com/hook', method: 'POST', headers: {}, secret: '' }, + }); + createdWebhookId = webhook.id; + + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); + + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); + + // Click Edit + const row = page.getByText('Scope Webhook').locator('..'); + await row.getByRole('button', { name: 'Edit' }).click(); + + // Verify scope selector is visible with all four modes + await expect(page.getByText('Message Scope')).toBeVisible(); + await expect(page.getByText('All messages')).toBeVisible(); + await expect(page.getByText('No messages')).toBeVisible(); + await expect(page.getByText('Only listed channels/contacts')).toBeVisible(); + await expect(page.getByText('All except listed channels/contacts')).toBeVisible(); + + // Select "Only listed" to see channel/contact checkboxes + await page.getByText('Only listed channels/contacts').click(); + + // Should show Channels and Contacts sections + await expect(page.getByText('Channels')).toBeVisible(); + await expect(page.getByText('Contacts')).toBeVisible(); + + // Go back without saving + await page.getByText('← Back to list').click(); + }); + + test('delete webhook via UI', async ({ page }) => { + const webhook = await createFanoutConfig({ + type: 'webhook', + name: 'Delete Me Webhook', + config: { url: 'https://example.com/hook', method: 'POST', headers: {}, secret: '' }, + }); + createdWebhookId = webhook.id; + + await page.goto('/'); + await expect(page.getByText('Connected')).toBeVisible(); + + await page.getByText('Settings').click(); + await page.getByRole('button', { name: /MQTT.*Forwarding/ }).click(); + + // Click Edit + const row = page.getByText('Delete Me Webhook').locator('..'); + await row.getByRole('button', { name: 'Edit' }).click(); + + // Accept the confirmation dialog + page.on('dialog', (dialog) => dialog.accept()); + + // Click Delete + await page.getByRole('button', { name: 'Delete' }).click(); + + await expect(page.getByText('Integration deleted')).toBeVisible(); + + // Should be back on list, webhook gone + await expect(page.getByText('Delete Me Webhook')).not.toBeVisible(); + + // Already deleted, clear the cleanup reference + createdWebhookId = null; + }); +}); diff --git a/tests/e2e/specs/radio-settings.spec.ts b/tests/e2e/specs/zz-radio-settings.spec.ts similarity index 100% rename from tests/e2e/specs/radio-settings.spec.ts rename to tests/e2e/specs/zz-radio-settings.spec.ts diff --git a/tests/test_fanout.py b/tests/test_fanout.py index 1407141..f69f9ce 100644 --- a/tests/test_fanout.py +++ b/tests/test_fanout.py @@ -672,6 +672,25 @@ class TestWebhookModule: assert mod.status == "connected" await mod.stop() + @pytest.mark.asyncio + async def test_does_not_skip_outgoing_messages(self): + """Webhook should forward outgoing messages (unlike Apprise).""" + from app.fanout.webhook import WebhookModule + + mod = WebhookModule("test", {"url": "http://localhost:9999/hook"}) + await mod.start() + # Mock the client to capture the request + sent_data: list[dict] = [] + + async def capture_send(data: dict, *, event_type: str) -> None: + sent_data.append(data) + + mod._send = capture_send + await mod.on_message({"type": "PRIV", "text": "outgoing", "outgoing": True}) + assert len(sent_data) == 1 + assert sent_data[0]["outgoing"] is True + await mod.stop() + @pytest.mark.asyncio async def test_dispatch_with_matching_scope(self): """WebhookModule dispatches through FanoutManager scope matching.""" @@ -883,3 +902,142 @@ class TestAppriseValidation: scope = _enforce_scope("apprise", {"messages": "all", "raw_packets": "all"}) assert scope["raw_packets"] == "none" assert scope["messages"] == "all" + + +# --------------------------------------------------------------------------- +# Comprehensive scope/filter selection logic tests +# --------------------------------------------------------------------------- + + +class TestMatchesFilter: + """Test _matches_filter directly for all filter shapes.""" + + def test_all_matches_any_key(self): + from app.fanout.manager import _matches_filter + + assert _matches_filter("all", "anything") + assert _matches_filter("all", "") + assert _matches_filter("all", "special-chars-!@#") + + def test_none_matches_nothing(self): + from app.fanout.manager import _matches_filter + + assert not _matches_filter("none", "anything") + assert not _matches_filter("none", "") + + def test_list_matches_present_key(self): + from app.fanout.manager import _matches_filter + + assert _matches_filter(["a", "b", "c"], "b") + + def test_list_no_match_absent_key(self): + from app.fanout.manager import _matches_filter + + assert not _matches_filter(["a", "b"], "c") + + def test_list_empty_matches_nothing(self): + from app.fanout.manager import _matches_filter + + assert not _matches_filter([], "anything") + + def test_except_excludes_listed(self): + from app.fanout.manager import _matches_filter + + assert not _matches_filter({"except": ["blocked"]}, "blocked") + + def test_except_includes_unlisted(self): + from app.fanout.manager import _matches_filter + + assert _matches_filter({"except": ["blocked"]}, "allowed") + + def test_except_empty_matches_everything(self): + from app.fanout.manager import _matches_filter + + assert _matches_filter({"except": []}, "anything") + assert _matches_filter({"except": []}, "") + + def test_except_multiple_excludes(self): + from app.fanout.manager import _matches_filter + + filt = {"except": ["x", "y", "z"]} + assert not _matches_filter(filt, "x") + assert not _matches_filter(filt, "y") + assert not _matches_filter(filt, "z") + assert _matches_filter(filt, "a") + + def test_unrecognized_shape_returns_false(self): + from app.fanout.manager import _matches_filter + + assert not _matches_filter(42, "key") + assert not _matches_filter({"other": "thing"}, "key") + assert not _matches_filter(True, "key") + + +class TestScopeMatchesMessageCombinations: + """Test _scope_matches_message with complex combinations.""" + + def test_channel_with_only_channels_listed(self): + scope = {"messages": {"channels": ["ch1", "ch2"], "contacts": "all"}} + assert _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + assert _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch2"}) + assert not _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch3"}) + + def test_contact_with_only_contacts_listed(self): + scope = {"messages": {"channels": "all", "contacts": ["pk1"]}} + assert _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk1"}) + assert not _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk2"}) + + def test_mixed_channels_all_contacts_except(self): + scope = {"messages": {"channels": "all", "contacts": {"except": ["pk-blocked"]}}} + assert _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + assert _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk-ok"}) + assert not _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk-blocked"}) + + def test_channels_except_contacts_only(self): + scope = { + "messages": { + "channels": {"except": ["ch-muted"]}, + "contacts": ["pk-friend"], + } + } + assert _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch-ok"}) + assert not _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch-muted"}) + assert _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk-friend"}) + assert not _scope_matches_message( + scope, {"type": "PRIV", "conversation_key": "pk-stranger"} + ) + + def test_both_channels_and_contacts_none(self): + scope = {"messages": {"channels": "none", "contacts": "none"}} + assert not _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + assert not _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk1"}) + + def test_both_channels_and_contacts_all(self): + scope = {"messages": {"channels": "all", "contacts": "all"}} + assert _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + assert _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk1"}) + + def test_missing_contacts_key_defaults_false(self): + scope = {"messages": {"channels": "all"}} + assert _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + # Missing contacts -> defaults to "none" -> no match for DMs + assert not _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk1"}) + + def test_missing_channels_key_defaults_false(self): + scope = {"messages": {"contacts": "all"}} + assert not _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + assert _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk1"}) + + def test_unknown_message_type_no_match(self): + scope = {"messages": {"channels": "all", "contacts": "all"}} + assert not _scope_matches_message(scope, {"type": "UNKNOWN", "conversation_key": "x"}) + + def test_both_except_empty_matches_everything(self): + scope = { + "messages": { + "channels": {"except": []}, + "contacts": {"except": []}, + } + } + assert _scope_matches_message(scope, {"type": "CHAN", "conversation_key": "ch1"}) + assert _scope_matches_message(scope, {"type": "PRIV", "conversation_key": "pk1"}) diff --git a/tests/test_fanout_integration.py b/tests/test_fanout_integration.py index fe2ad6c..ea591b4 100644 --- a/tests/test_fanout_integration.py +++ b/tests/test_fanout_integration.py @@ -4,6 +4,8 @@ Spins up a minimal in-process MQTT 3.1.1 broker on a random port, creates fanout configs in an in-memory DB, starts real MqttPrivateModule instances via the FanoutManager, and verifies that PUBLISH packets arrive (or don't) based on enabled/disabled state and scope settings. + +Also covers webhook and Apprise modules with real HTTP capture servers. """ import asyncio @@ -835,3 +837,361 @@ class TestFanoutWebhookIntegration: assert "yes" in texts assert "dm yes" in texts assert "nope" not in texts + + @pytest.mark.asyncio + async def test_webhook_delivers_outgoing_messages(self, webhook_server, integration_db): + """Webhooks should deliver outgoing messages (unlike Apprise which skips them).""" + cfg = await FanoutConfigRepository.create( + config_type="webhook", + name="Outgoing Hook", + config=_webhook_config(webhook_server.port), + scope={"messages": "all", "raw_packets": "none"}, + enabled=True, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + await _wait_connected(manager, cfg["id"]) + + await manager.broadcast_message( + { + "type": "PRIV", + "conversation_key": "pk1", + "text": "outgoing msg", + "outgoing": True, + } + ) + + results = await webhook_server.wait_for(1) + finally: + await manager.stop_all() + + assert len(results) == 1 + assert results[0]["body"]["text"] == "outgoing msg" + assert results[0]["body"]["outgoing"] is True + + +# --------------------------------------------------------------------------- +# Apprise integration tests (real HTTP capture server + real AppriseModule) +# --------------------------------------------------------------------------- + + +class AppriseJsonCaptureServer: + """Minimal HTTP server that captures JSON POSTs from Apprise's json:// plugin. + + Apprise json:// sends POST with JSON body containing title, body, type fields. + """ + + def __init__(self): + self.received: list[dict] = [] + self._server: asyncio.Server | None = None + self.port: int = 0 + + async def start(self) -> int: + self._server = await asyncio.start_server(self._handle, "127.0.0.1", 0) + self.port = self._server.sockets[0].getsockname()[1] + return self.port + + async def stop(self): + if self._server: + self._server.close() + await self._server.wait_closed() + + async def wait_for(self, count: int, timeout: float = 10.0) -> list[dict]: + deadline = asyncio.get_event_loop().time() + timeout + while len(self.received) < count: + if asyncio.get_event_loop().time() >= deadline: + break + await asyncio.sleep(0.05) + return list(self.received) + + async def _handle(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + try: + request_line = await reader.readline() + if not request_line: + return + + headers: dict[str, str] = {} + while True: + line = await reader.readline() + if line in (b"\r\n", b"\n", b""): + break + decoded = line.decode("utf-8", errors="replace").strip() + if ":" in decoded: + key, val = decoded.split(":", 1) + headers[key.strip().lower()] = val.strip() + + content_length = int(headers.get("content-length", "0")) + body = b"" + if content_length > 0: + body = await reader.readexactly(content_length) + + if body: + try: + payload = json.loads(body) + except Exception: + payload = {"_raw": body.decode("utf-8", errors="replace")} + self.received.append(payload) + + response = b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK" + writer.write(response) + await writer.drain() + except (asyncio.IncompleteReadError, ConnectionError, OSError): + pass + finally: + writer.close() + + +@pytest.fixture +async def apprise_capture_server(): + server = AppriseJsonCaptureServer() + await server.start() + yield server + await server.stop() + + +class TestFanoutAppriseIntegration: + """End-to-end: real HTTP capture server <-> real AppriseModule via json:// URL.""" + + @pytest.mark.asyncio + async def test_apprise_delivers_incoming_dm(self, apprise_capture_server, integration_db): + """Apprise module delivers incoming DMs via json:// to a real HTTP server.""" + cfg = await FanoutConfigRepository.create( + config_type="apprise", + name="Test Apprise", + config={ + "urls": f"json://127.0.0.1:{apprise_capture_server.port}", + "preserve_identity": True, + "include_path": False, + }, + scope={"messages": "all", "raw_packets": "none"}, + enabled=True, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + assert cfg["id"] in manager._modules + + await manager.broadcast_message( + { + "type": "PRIV", + "conversation_key": "pk1", + "text": "hello from mesh", + "sender_name": "Alice", + "outgoing": False, + } + ) + + results = await apprise_capture_server.wait_for(1) + finally: + await manager.stop_all() + + assert len(results) >= 1 + # Apprise json:// sends body field with the formatted message + body_text = str(results[0]) + assert "Alice" in body_text + assert "hello from mesh" in body_text + + @pytest.mark.asyncio + async def test_apprise_delivers_incoming_channel_msg( + self, apprise_capture_server, integration_db + ): + """Apprise module delivers incoming channel messages.""" + cfg = await FanoutConfigRepository.create( + config_type="apprise", + name="Channel Apprise", + config={ + "urls": f"json://127.0.0.1:{apprise_capture_server.port}", + "include_path": False, + }, + scope={"messages": "all", "raw_packets": "none"}, + enabled=True, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + assert cfg["id"] in manager._modules + + await manager.broadcast_message( + { + "type": "CHAN", + "conversation_key": "ch1", + "channel_name": "#general", + "text": "channel hello", + "sender_name": "Bob", + "outgoing": False, + } + ) + + results = await apprise_capture_server.wait_for(1) + finally: + await manager.stop_all() + + assert len(results) >= 1 + body_text = str(results[0]) + assert "Bob" in body_text + assert "channel hello" in body_text + assert "#general" in body_text + + @pytest.mark.asyncio + async def test_apprise_skips_outgoing(self, apprise_capture_server, integration_db): + """Apprise should NOT deliver outgoing messages.""" + cfg = await FanoutConfigRepository.create( + config_type="apprise", + name="No Outgoing", + config={ + "urls": f"json://127.0.0.1:{apprise_capture_server.port}", + }, + scope={"messages": "all", "raw_packets": "none"}, + enabled=True, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + assert cfg["id"] in manager._modules + + await manager.broadcast_message( + { + "type": "PRIV", + "conversation_key": "pk1", + "text": "my outgoing", + "sender_name": "Me", + "outgoing": True, + } + ) + + await asyncio.sleep(1.0) + finally: + await manager.stop_all() + + assert len(apprise_capture_server.received) == 0 + + @pytest.mark.asyncio + async def test_apprise_disabled_no_delivery(self, apprise_capture_server, integration_db): + """Disabled Apprise module should not deliver anything.""" + await FanoutConfigRepository.create( + config_type="apprise", + name="Disabled Apprise", + config={ + "urls": f"json://127.0.0.1:{apprise_capture_server.port}", + }, + scope={"messages": "all", "raw_packets": "none"}, + enabled=False, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + assert len(manager._modules) == 0 + + await manager.broadcast_message( + {"type": "PRIV", "conversation_key": "pk1", "text": "nope"} + ) + await asyncio.sleep(0.5) + finally: + await manager.stop_all() + + assert len(apprise_capture_server.received) == 0 + + @pytest.mark.asyncio + async def test_apprise_scope_selective_channels(self, apprise_capture_server, integration_db): + """Apprise with selective channel scope only delivers matching channels.""" + cfg = await FanoutConfigRepository.create( + config_type="apprise", + name="Selective Apprise", + config={ + "urls": f"json://127.0.0.1:{apprise_capture_server.port}", + "include_path": False, + }, + scope={ + "messages": {"channels": ["ch-yes"], "contacts": "none"}, + "raw_packets": "none", + }, + enabled=True, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + assert cfg["id"] in manager._modules + + # Matching channel + await manager.broadcast_message( + { + "type": "CHAN", + "conversation_key": "ch-yes", + "channel_name": "#yes", + "text": "included", + "sender_name": "A", + } + ) + # Non-matching channel + await manager.broadcast_message( + { + "type": "CHAN", + "conversation_key": "ch-no", + "channel_name": "#no", + "text": "excluded", + "sender_name": "B", + } + ) + # DM — contacts is "none" + await manager.broadcast_message( + { + "type": "PRIV", + "conversation_key": "pk1", + "text": "dm excluded", + "sender_name": "C", + } + ) + + await apprise_capture_server.wait_for(1) + await asyncio.sleep(1.0) + finally: + await manager.stop_all() + + assert len(apprise_capture_server.received) == 1 + body_text = str(apprise_capture_server.received[0]) + assert "included" in body_text + + @pytest.mark.asyncio + async def test_apprise_includes_routing_path(self, apprise_capture_server, integration_db): + """Apprise with include_path=True shows routing hops in the body.""" + cfg = await FanoutConfigRepository.create( + config_type="apprise", + name="Path Apprise", + config={ + "urls": f"json://127.0.0.1:{apprise_capture_server.port}", + "include_path": True, + }, + scope={"messages": "all", "raw_packets": "none"}, + enabled=True, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + assert cfg["id"] in manager._modules + + await manager.broadcast_message( + { + "type": "PRIV", + "conversation_key": "pk1", + "text": "routed msg", + "sender_name": "Eve", + "paths": [{"path": "2a3b"}], + } + ) + + results = await apprise_capture_server.wait_for(1) + finally: + await manager.stop_all() + + assert len(results) >= 1 + body_text = str(results[0]) + assert "Eve" in body_text + assert "routed msg" in body_text