diff --git a/tests/test_fanout_integration.py b/tests/test_fanout_integration.py index e6254d1..81f98fe 100644 --- a/tests/test_fanout_integration.py +++ b/tests/test_fanout_integration.py @@ -882,6 +882,88 @@ class TestFanoutWebhookIntegration: assert "a" in hook_ids assert "b" in hook_ids + @pytest.mark.asyncio + async def test_two_webhooks_with_different_channel_scopes_receive_only_matches( + self, webhook_server, integration_db + ): + """Scoped webhooks run in parallel and each receive only their selected room.""" + cfg_a = await FanoutConfigRepository.create( + config_type="webhook", + name="Hook A", + config=_webhook_config(webhook_server.port, extra_headers={"X-Hook-Id": "a"}), + scope={"messages": {"channels": ["ch-a"], "contacts": "none"}, "raw_packets": "none"}, + enabled=True, + ) + cfg_b = await FanoutConfigRepository.create( + config_type="webhook", + name="Hook B", + config=_webhook_config(webhook_server.port, extra_headers={"X-Hook-Id": "b"}), + scope={"messages": {"channels": ["ch-b"], "contacts": "none"}, "raw_packets": "none"}, + enabled=True, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + await _wait_connected(manager, cfg_a["id"]) + await _wait_connected(manager, cfg_b["id"]) + + await manager.broadcast_message( + {"type": "CHAN", "conversation_key": "ch-a", "text": "room a"} + ) + await manager.broadcast_message( + {"type": "CHAN", "conversation_key": "ch-b", "text": "room b"} + ) + await manager.broadcast_message( + {"type": "CHAN", "conversation_key": "ch-c", "text": "room c"} + ) + + results = await webhook_server.wait_for(2) + await asyncio.sleep(0.3) + finally: + await manager.stop_all() + + assert len(results) == 2 + seen = {(r["headers"].get("x-hook-id"), r["body"]["conversation_key"]) for r in results} + assert ("a", "ch-a") in seen + assert ("b", "ch-b") in seen + + @pytest.mark.asyncio + async def test_fifty_webhooks_same_target_all_deliver(self, webhook_server, integration_db): + """A large number of webhook modules targeting one endpoint all deliver.""" + config_ids: list[str] = [] + webhook_count = 50 + + for i in range(webhook_count): + cfg = await FanoutConfigRepository.create( + config_type="webhook", + name=f"Hook {i}", + config=_webhook_config(webhook_server.port, extra_headers={"X-Hook-Id": str(i)}), + scope={"messages": "all", "raw_packets": "none"}, + enabled=True, + ) + config_ids.append(cfg["id"]) + + manager = FanoutManager() + try: + await manager.load_from_db() + for config_id in config_ids: + await _wait_connected(manager, config_id) + + await manager.broadcast_message( + {"type": "PRIV", "conversation_key": "pk1", "text": "fanout storm"} + ) + + results = await webhook_server.wait_for(webhook_count, timeout=10.0) + await asyncio.sleep(0.5) + finally: + await manager.stop_all() + + assert len(results) == webhook_count + hook_ids = {r["headers"].get("x-hook-id") for r in results} + assert hook_ids == {str(i) for i in range(webhook_count)} + assert all(r["body"]["text"] == "fanout storm" for r in results) + @pytest.mark.asyncio async def test_webhook_disable_stops_delivery(self, webhook_server, integration_db): """Disabling a webhook stops delivery immediately.""" @@ -1284,6 +1366,179 @@ class TestFanoutAppriseIntegration: body_text = str(apprise_capture_server.received[0]) assert "included" in body_text + @pytest.mark.asyncio + async def test_two_apprise_modules_with_different_channel_scopes_receive_only_matches( + self, integration_db + ): + """Scoped Apprise modules run in parallel and each receive only their selected room.""" + server_a = AppriseJsonCaptureServer() + server_b = AppriseJsonCaptureServer() + await server_a.start() + await server_b.start() + try: + cfg_a = await FanoutConfigRepository.create( + config_type="apprise", + name="Apprise A", + config={ + "urls": f"json://127.0.0.1:{server_a.port}", + "include_path": False, + }, + scope={ + "messages": {"channels": ["ch-a"], "contacts": "none"}, + "raw_packets": "none", + }, + enabled=True, + ) + cfg_b = await FanoutConfigRepository.create( + config_type="apprise", + name="Apprise B", + config={ + "urls": f"json://127.0.0.1:{server_b.port}", + "include_path": False, + }, + scope={ + "messages": {"channels": ["ch-b"], "contacts": "none"}, + "raw_packets": "none", + }, + enabled=True, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + assert cfg_a["id"] in manager._modules + assert cfg_b["id"] in manager._modules + + await manager.broadcast_message( + { + "type": "CHAN", + "conversation_key": "ch-a", + "channel_name": "#a", + "text": "room a", + "sender_name": "Alice", + } + ) + await manager.broadcast_message( + { + "type": "CHAN", + "conversation_key": "ch-b", + "channel_name": "#b", + "text": "room b", + "sender_name": "Bob", + } + ) + await manager.broadcast_message( + { + "type": "CHAN", + "conversation_key": "ch-c", + "channel_name": "#c", + "text": "room c", + "sender_name": "Carol", + } + ) + + results_a = await server_a.wait_for(1) + results_b = await server_b.wait_for(1) + await asyncio.sleep(1.0) + finally: + await manager.stop_all() + + assert len(results_a) == 1 + assert len(results_b) == 1 + assert "#a" in str(results_a[0]) + assert "room a" in str(results_a[0]) + assert "#b" in str(results_b[0]) + assert "room b" in str(results_b[0]) + finally: + await server_a.stop() + await server_b.stop() + + @pytest.mark.asyncio + async def test_webhook_and_apprise_with_different_channel_scopes_receive_only_matches( + self, integration_db + ): + """Webhook and Apprise dispatch in parallel and each honor their own room scope.""" + webhook = WebhookCaptureServer() + apprise = AppriseJsonCaptureServer() + await webhook.start() + await apprise.start() + try: + webhook_cfg = await FanoutConfigRepository.create( + config_type="webhook", + name="Room A Hook", + config=_webhook_config(webhook.port), + scope={ + "messages": {"channels": ["ch-a"], "contacts": "none"}, + "raw_packets": "none", + }, + enabled=True, + ) + apprise_cfg = await FanoutConfigRepository.create( + config_type="apprise", + name="Room B Apprise", + config={ + "urls": f"json://127.0.0.1:{apprise.port}", + "include_path": False, + }, + scope={ + "messages": {"channels": ["ch-b"], "contacts": "none"}, + "raw_packets": "none", + }, + enabled=True, + ) + + manager = FanoutManager() + try: + await manager.load_from_db() + await _wait_connected(manager, webhook_cfg["id"]) + assert apprise_cfg["id"] in manager._modules + + await manager.broadcast_message( + { + "type": "CHAN", + "conversation_key": "ch-a", + "channel_name": "#a", + "text": "room a", + "sender_name": "Alice", + } + ) + await manager.broadcast_message( + { + "type": "CHAN", + "conversation_key": "ch-b", + "channel_name": "#b", + "text": "room b", + "sender_name": "Bob", + } + ) + await manager.broadcast_message( + { + "type": "CHAN", + "conversation_key": "ch-c", + "channel_name": "#c", + "text": "room c", + "sender_name": "Carol", + } + ) + + webhook_results = await webhook.wait_for(1) + apprise_results = await apprise.wait_for(1) + await asyncio.sleep(1.0) + finally: + await manager.stop_all() + + assert len(webhook_results) == 1 + assert webhook_results[0]["body"]["conversation_key"] == "ch-a" + assert webhook_results[0]["body"]["text"] == "room a" + + assert len(apprise_results) == 1 + apprise_body = str(apprise_results[0]) + assert "#b" in apprise_body + assert "room b" in apprise_body + finally: + await webhook.stop() + await apprise.stop() + @pytest.mark.asyncio async def test_apprise_includes_routing_path(self, apprise_capture_server, integration_db): """Apprise with include_path=True shows routing hops in the body."""