Add some brutal tests for webhooks

This commit is contained in:
Jack Kingsman
2026-03-06 16:06:13 -08:00
parent dd13768a44
commit 8ffae50b87

View File

@@ -882,6 +882,88 @@ class TestFanoutWebhookIntegration:
assert "a" in hook_ids
assert "b" in hook_ids
@pytest.mark.asyncio
async def test_two_webhooks_with_different_channel_scopes_receive_only_matches(
self, webhook_server, integration_db
):
"""Scoped webhooks run in parallel and each receive only their selected room."""
cfg_a = await FanoutConfigRepository.create(
config_type="webhook",
name="Hook A",
config=_webhook_config(webhook_server.port, extra_headers={"X-Hook-Id": "a"}),
scope={"messages": {"channels": ["ch-a"], "contacts": "none"}, "raw_packets": "none"},
enabled=True,
)
cfg_b = await FanoutConfigRepository.create(
config_type="webhook",
name="Hook B",
config=_webhook_config(webhook_server.port, extra_headers={"X-Hook-Id": "b"}),
scope={"messages": {"channels": ["ch-b"], "contacts": "none"}, "raw_packets": "none"},
enabled=True,
)
manager = FanoutManager()
try:
await manager.load_from_db()
await _wait_connected(manager, cfg_a["id"])
await _wait_connected(manager, cfg_b["id"])
await manager.broadcast_message(
{"type": "CHAN", "conversation_key": "ch-a", "text": "room a"}
)
await manager.broadcast_message(
{"type": "CHAN", "conversation_key": "ch-b", "text": "room b"}
)
await manager.broadcast_message(
{"type": "CHAN", "conversation_key": "ch-c", "text": "room c"}
)
results = await webhook_server.wait_for(2)
await asyncio.sleep(0.3)
finally:
await manager.stop_all()
assert len(results) == 2
seen = {(r["headers"].get("x-hook-id"), r["body"]["conversation_key"]) for r in results}
assert ("a", "ch-a") in seen
assert ("b", "ch-b") in seen
@pytest.mark.asyncio
async def test_fifty_webhooks_same_target_all_deliver(self, webhook_server, integration_db):
"""A large number of webhook modules targeting one endpoint all deliver."""
config_ids: list[str] = []
webhook_count = 50
for i in range(webhook_count):
cfg = await FanoutConfigRepository.create(
config_type="webhook",
name=f"Hook {i}",
config=_webhook_config(webhook_server.port, extra_headers={"X-Hook-Id": str(i)}),
scope={"messages": "all", "raw_packets": "none"},
enabled=True,
)
config_ids.append(cfg["id"])
manager = FanoutManager()
try:
await manager.load_from_db()
for config_id in config_ids:
await _wait_connected(manager, config_id)
await manager.broadcast_message(
{"type": "PRIV", "conversation_key": "pk1", "text": "fanout storm"}
)
results = await webhook_server.wait_for(webhook_count, timeout=10.0)
await asyncio.sleep(0.5)
finally:
await manager.stop_all()
assert len(results) == webhook_count
hook_ids = {r["headers"].get("x-hook-id") for r in results}
assert hook_ids == {str(i) for i in range(webhook_count)}
assert all(r["body"]["text"] == "fanout storm" for r in results)
@pytest.mark.asyncio
async def test_webhook_disable_stops_delivery(self, webhook_server, integration_db):
"""Disabling a webhook stops delivery immediately."""
@@ -1284,6 +1366,179 @@ class TestFanoutAppriseIntegration:
body_text = str(apprise_capture_server.received[0])
assert "included" in body_text
@pytest.mark.asyncio
async def test_two_apprise_modules_with_different_channel_scopes_receive_only_matches(
self, integration_db
):
"""Scoped Apprise modules run in parallel and each receive only their selected room."""
server_a = AppriseJsonCaptureServer()
server_b = AppriseJsonCaptureServer()
await server_a.start()
await server_b.start()
try:
cfg_a = await FanoutConfigRepository.create(
config_type="apprise",
name="Apprise A",
config={
"urls": f"json://127.0.0.1:{server_a.port}",
"include_path": False,
},
scope={
"messages": {"channels": ["ch-a"], "contacts": "none"},
"raw_packets": "none",
},
enabled=True,
)
cfg_b = await FanoutConfigRepository.create(
config_type="apprise",
name="Apprise B",
config={
"urls": f"json://127.0.0.1:{server_b.port}",
"include_path": False,
},
scope={
"messages": {"channels": ["ch-b"], "contacts": "none"},
"raw_packets": "none",
},
enabled=True,
)
manager = FanoutManager()
try:
await manager.load_from_db()
assert cfg_a["id"] in manager._modules
assert cfg_b["id"] in manager._modules
await manager.broadcast_message(
{
"type": "CHAN",
"conversation_key": "ch-a",
"channel_name": "#a",
"text": "room a",
"sender_name": "Alice",
}
)
await manager.broadcast_message(
{
"type": "CHAN",
"conversation_key": "ch-b",
"channel_name": "#b",
"text": "room b",
"sender_name": "Bob",
}
)
await manager.broadcast_message(
{
"type": "CHAN",
"conversation_key": "ch-c",
"channel_name": "#c",
"text": "room c",
"sender_name": "Carol",
}
)
results_a = await server_a.wait_for(1)
results_b = await server_b.wait_for(1)
await asyncio.sleep(1.0)
finally:
await manager.stop_all()
assert len(results_a) == 1
assert len(results_b) == 1
assert "#a" in str(results_a[0])
assert "room a" in str(results_a[0])
assert "#b" in str(results_b[0])
assert "room b" in str(results_b[0])
finally:
await server_a.stop()
await server_b.stop()
@pytest.mark.asyncio
async def test_webhook_and_apprise_with_different_channel_scopes_receive_only_matches(
self, integration_db
):
"""Webhook and Apprise dispatch in parallel and each honor their own room scope."""
webhook = WebhookCaptureServer()
apprise = AppriseJsonCaptureServer()
await webhook.start()
await apprise.start()
try:
webhook_cfg = await FanoutConfigRepository.create(
config_type="webhook",
name="Room A Hook",
config=_webhook_config(webhook.port),
scope={
"messages": {"channels": ["ch-a"], "contacts": "none"},
"raw_packets": "none",
},
enabled=True,
)
apprise_cfg = await FanoutConfigRepository.create(
config_type="apprise",
name="Room B Apprise",
config={
"urls": f"json://127.0.0.1:{apprise.port}",
"include_path": False,
},
scope={
"messages": {"channels": ["ch-b"], "contacts": "none"},
"raw_packets": "none",
},
enabled=True,
)
manager = FanoutManager()
try:
await manager.load_from_db()
await _wait_connected(manager, webhook_cfg["id"])
assert apprise_cfg["id"] in manager._modules
await manager.broadcast_message(
{
"type": "CHAN",
"conversation_key": "ch-a",
"channel_name": "#a",
"text": "room a",
"sender_name": "Alice",
}
)
await manager.broadcast_message(
{
"type": "CHAN",
"conversation_key": "ch-b",
"channel_name": "#b",
"text": "room b",
"sender_name": "Bob",
}
)
await manager.broadcast_message(
{
"type": "CHAN",
"conversation_key": "ch-c",
"channel_name": "#c",
"text": "room c",
"sender_name": "Carol",
}
)
webhook_results = await webhook.wait_for(1)
apprise_results = await apprise.wait_for(1)
await asyncio.sleep(1.0)
finally:
await manager.stop_all()
assert len(webhook_results) == 1
assert webhook_results[0]["body"]["conversation_key"] == "ch-a"
assert webhook_results[0]["body"]["text"] == "room a"
assert len(apprise_results) == 1
apprise_body = str(apprise_results[0])
assert "#b" in apprise_body
assert "room b" in apprise_body
finally:
await webhook.stop()
await apprise.stop()
@pytest.mark.asyncio
async def test_apprise_includes_routing_path(self, apprise_capture_server, integration_db):
"""Apprise with include_path=True shows routing hops in the body."""