Add SQS fanout

This commit is contained in:
Jack Kingsman
2026-03-11 14:17:08 -07:00
parent 472b4a5ed2
commit e7d1f28076
12 changed files with 574 additions and 12 deletions

View File

@@ -21,7 +21,7 @@ A web interface for MeshCore mesh radio networks. The backend connects to a Mesh
- `frontend/AGENTS.md` - Frontend (React, state management, WebSocket, components)
Ancillary AGENTS.md files which should generally not be reviewed unless specific work is being performed on those features include:
- `app/fanout/AGENTS_fanout.md` - Fanout bus architecture (MQTT, bots, webhooks, Apprise)
- `app/fanout/AGENTS_fanout.md` - Fanout bus architecture (MQTT, bots, webhooks, Apprise, SQS)
- `frontend/src/components/AGENTS_packet_visualizer.md` - Packet visualizer (force-directed graph, advert-path identity, layout engine)
## Architecture Overview
@@ -77,7 +77,7 @@ Ancillary AGENTS.md files which should generally not be reviewed unless specific
- Raw packet feed — a debug/observation tool ("radio aquarium"); interesting to watch or copy packets from, but not critical infrastructure
- Map view — visual display of node locations from advertisements
- Network visualizer — force-directed graph of mesh topology
- Fanout integrations (MQTT, bots, webhooks, Apprise) — see `app/fanout/AGENTS_fanout.md`
- Fanout integrations (MQTT, bots, webhooks, Apprise, SQS) — see `app/fanout/AGENTS_fanout.md`
- Read state tracking / mark-all-read — convenience feature for unread badges; no need for transactional atomicity or race-condition hardening
## Error Handling Philosophy
@@ -181,7 +181,7 @@ This message-layer echo/path handling is independent of raw-packet storage dedup
│ ├── event_handlers.py # Radio events
│ ├── decoder.py # Packet decryption
│ ├── websocket.py # Real-time broadcasts
│ └── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise (see fanout/AGENTS_fanout.md)
│ └── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise, SQS (see fanout/AGENTS_fanout.md)
├── frontend/ # React frontend
│ ├── AGENTS.md # Frontend documentation
│ ├── src/
@@ -393,7 +393,7 @@ Read state (`last_read_at`) is tracked **server-side** for consistency across de
**Note:** These are NOT the same as `Message.conversation_key` (the database field).
### Fanout Bus (MQTT, Bots, Webhooks, Apprise)
### Fanout Bus (MQTT, Bots, Webhooks, Apprise, SQS)
All external integrations are managed through the fanout bus (`app/fanout/`). Each integration is a `FanoutModule` with scope-based event filtering, stored in the `fanout_configs` table and managed via `GET/POST/PATCH/DELETE /api/fanout`.
@@ -446,7 +446,7 @@ mc.subscribe(EventType.ACK, handler)
| `MESHCORE_BASIC_AUTH_USERNAME` | *(none)* | Optional app-wide HTTP Basic auth username; must be set together with `MESHCORE_BASIC_AUTH_PASSWORD` |
| `MESHCORE_BASIC_AUTH_PASSWORD` | *(none)* | Optional app-wide HTTP Basic auth password; must be set together with `MESHCORE_BASIC_AUTH_USERNAME` |
**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `flood_scope`, `blocked_keys`, and `blocked_names`. `max_radio_contacts` is the configured radio contact capacity baseline used by background maintenance: favorites reload first, non-favorite fill targets about 80% of that value, and full offload/reload triggers around 95% occupancy. They are configured via `GET/PATCH /api/settings`. MQTT, bot, webhook, and Apprise configs are stored in the `fanout_configs` table, managed via `/api/fanout`.
**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `flood_scope`, `blocked_keys`, and `blocked_names`. `max_radio_contacts` is the configured radio contact capacity baseline used by background maintenance: favorites reload first, non-favorite fill targets about 80% of that value, and full offload/reload triggers around 95% occupancy. They are configured via `GET/PATCH /api/settings`. MQTT, bot, webhook, Apprise, and SQS configs are stored in the `fanout_configs` table, managed via `/api/fanout`.
Byte-perfect channel retries are user-triggered via `POST /api/messages/channel/{message_id}/resend` and are allowed for 30 seconds after the original send.

View File

@@ -45,7 +45,7 @@ app/
├── events.py # Typed WS event payload serialization
├── websocket.py # WS manager + broadcast helpers
├── security.py # Optional app-wide HTTP Basic auth middleware for HTTP + WS
├── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise (see fanout/AGENTS_fanout.md)
├── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise, SQS (see fanout/AGENTS_fanout.md)
├── dependencies.py # Shared FastAPI dependency providers
├── path_utils.py # Path hex rendering and hop-width helpers
├── keystore.py # Ephemeral private/public key storage for DM decryption
@@ -132,7 +132,7 @@ app/
### Fanout bus
- All external integrations (MQTT, bots, webhooks, Apprise) are managed through the fanout bus (`app/fanout/`).
- All external integrations (MQTT, bots, webhooks, Apprise, SQS) are managed through the fanout bus (`app/fanout/`).
- Configs stored in `fanout_configs` table, managed via `GET/POST/PATCH/DELETE /api/fanout`.
- `broadcast_event()` in `websocket.py` dispatches to the fanout manager for `message` and `raw_packet` events.
- Each integration is a `FanoutModule` with scope-based filtering.

View File

@@ -79,6 +79,14 @@ Push notifications via Apprise library. Config blob:
- `preserve_identity` — suppress Discord webhook name/avatar override
- `include_path` — include routing path in notification body
### sqs (sqs.py)
Amazon SQS delivery. Config blob:
- `queue_url` — target queue URL
- `region_name` (optional), `endpoint_url` (optional)
- `access_key_id`, `secret_access_key`, `session_token` (all optional; blank uses the normal AWS credential chain)
- Publishes a JSON envelope of the form `{"event_type":"message"|"raw_packet","data":...}`
- Supports both decoded messages and raw packets via normal scope selection
## Adding a New Integration Type
### Step-by-step checklist
@@ -132,7 +140,7 @@ Three changes needed:
**a)** Add to `_VALID_TYPES` set:
```python
_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "my_type"}
_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "sqs", "my_type"}
```
**b)** Add a validation function:
@@ -280,6 +288,7 @@ Migrations:
- `app/fanout/bot_exec.py` — Bot code execution, response processing, rate limiting
- `app/fanout/webhook.py` — Webhook fanout module
- `app/fanout/apprise_mod.py` — Apprise fanout module
- `app/fanout/sqs.py` — Amazon SQS fanout module
- `app/repository/fanout.py` — Database CRUD
- `app/routers/fanout.py` — REST API
- `app/websocket.py``broadcast_event()` dispatches to fanout

View File

@@ -23,6 +23,7 @@ def _register_module_types() -> None:
from app.fanout.bot import BotModule
from app.fanout.mqtt_community import MqttCommunityModule
from app.fanout.mqtt_private import MqttPrivateModule
from app.fanout.sqs import SqsModule
from app.fanout.webhook import WebhookModule
_MODULE_TYPES["mqtt_private"] = MqttPrivateModule
@@ -30,6 +31,7 @@ def _register_module_types() -> None:
_MODULE_TYPES["bot"] = BotModule
_MODULE_TYPES["webhook"] = WebhookModule
_MODULE_TYPES["apprise"] = AppriseModule
_MODULE_TYPES["sqs"] = SqsModule
def _matches_filter(filter_value: Any, key: str) -> bool:

142
app/fanout/sqs.py Normal file
View File

@@ -0,0 +1,142 @@
"""Fanout module for Amazon SQS delivery."""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
from functools import partial
import boto3
from botocore.exceptions import BotoCoreError, ClientError
from app.fanout.base import FanoutModule
logger = logging.getLogger(__name__)
def _build_payload(data: dict, *, event_type: str) -> str:
"""Serialize a fanout event into a stable JSON envelope."""
return json.dumps(
{
"event_type": event_type,
"data": data,
},
separators=(",", ":"),
sort_keys=True,
)
def _is_fifo_queue(queue_url: str) -> bool:
"""Return True when the configured queue URL points at an SQS FIFO queue."""
return queue_url.rstrip("/").endswith(".fifo")
def _build_message_group_id(data: dict, *, event_type: str) -> str:
"""Choose a stable FIFO group ID from the event identity."""
if event_type == "message":
conversation_key = str(data.get("conversation_key", "")).strip()
if conversation_key:
return f"message-{conversation_key}"
return "message-default"
return "raw-packets"
def _build_message_deduplication_id(data: dict, *, event_type: str, body: str) -> str:
"""Choose a deterministic deduplication ID for FIFO queues."""
if event_type == "message":
message_id = data.get("id")
if isinstance(message_id, int):
return f"message-{message_id}"
else:
observation_id = data.get("observation_id")
if isinstance(observation_id, str) and observation_id.strip():
return f"raw-{observation_id}"
packet_id = data.get("id")
if isinstance(packet_id, int):
return f"raw-{packet_id}"
return hashlib.sha256(body.encode()).hexdigest()
class SqsModule(FanoutModule):
"""Delivers message and raw-packet events to an Amazon SQS queue."""
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
super().__init__(config_id, config, name=name)
self._client = None
self._last_error: str | None = None
async def start(self) -> None:
kwargs: dict[str, str] = {}
region_name = str(self.config.get("region_name", "")).strip()
endpoint_url = str(self.config.get("endpoint_url", "")).strip()
access_key_id = str(self.config.get("access_key_id", "")).strip()
secret_access_key = str(self.config.get("secret_access_key", "")).strip()
session_token = str(self.config.get("session_token", "")).strip()
if region_name:
kwargs["region_name"] = region_name
if endpoint_url:
kwargs["endpoint_url"] = endpoint_url
if access_key_id and secret_access_key:
kwargs["aws_access_key_id"] = access_key_id
kwargs["aws_secret_access_key"] = secret_access_key
if session_token:
kwargs["aws_session_token"] = session_token
self._client = boto3.client("sqs", **kwargs)
self._last_error = None
async def stop(self) -> None:
self._client = None
async def on_message(self, data: dict) -> None:
await self._send(data, event_type="message")
async def on_raw(self, data: dict) -> None:
await self._send(data, event_type="raw_packet")
async def _send(self, data: dict, *, event_type: str) -> None:
if self._client is None:
return
queue_url = str(self.config.get("queue_url", "")).strip()
if not queue_url:
return
body = _build_payload(data, event_type=event_type)
request_kwargs: dict[str, object] = {
"QueueUrl": queue_url,
"MessageBody": body,
"MessageAttributes": {
"event_type": {
"DataType": "String",
"StringValue": event_type,
}
},
}
if _is_fifo_queue(queue_url):
request_kwargs["MessageGroupId"] = _build_message_group_id(data, event_type=event_type)
request_kwargs["MessageDeduplicationId"] = _build_message_deduplication_id(
data, event_type=event_type, body=body
)
try:
await asyncio.to_thread(partial(self._client.send_message, **request_kwargs))
self._last_error = None
except (ClientError, BotoCoreError) as exc:
self._last_error = str(exc)
logger.warning("SQS %s send error: %s", self.config_id, exc)
except Exception as exc:
self._last_error = str(exc)
logger.exception("Unexpected SQS send error for %s", self.config_id)
@property
def status(self) -> str:
if not str(self.config.get("queue_url", "")).strip():
return "disconnected"
if self._last_error:
return "error"
return "connected"

View File

@@ -565,7 +565,7 @@ class FanoutConfig(BaseModel):
"""Configuration for a single fanout integration."""
id: str
type: str # 'mqtt_private' | 'mqtt_community' | 'bot' | 'webhook' | 'apprise'
type: str # 'mqtt_private' | 'mqtt_community' | 'bot' | 'webhook' | 'apprise' | 'sqs'
name: str
enabled: bool
config: dict

View File

@@ -13,7 +13,7 @@ from app.repository.fanout import FanoutConfigRepository
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/fanout", tags=["fanout"])
_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise"}
_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "sqs"}
_IATA_RE = re.compile(r"^[A-Z]{3}$")
_DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE = "meshcore/{IATA}/{PUBLIC_KEY}/packets"
@@ -179,6 +179,39 @@ def _validate_webhook_config(config: dict) -> None:
raise HTTPException(status_code=400, detail="headers must be a JSON object")
def _validate_sqs_config(config: dict) -> None:
"""Validate sqs config blob."""
queue_url = str(config.get("queue_url", "")).strip()
if not queue_url:
raise HTTPException(status_code=400, detail="queue_url is required for sqs")
if not queue_url.startswith(("https://", "http://")):
raise HTTPException(status_code=400, detail="queue_url must start with http:// or https://")
endpoint_url = str(config.get("endpoint_url", "")).strip()
if endpoint_url and not endpoint_url.startswith(("https://", "http://")):
raise HTTPException(
status_code=400,
detail="endpoint_url must start with http:// or https://",
)
access_key_id = str(config.get("access_key_id", "")).strip()
secret_access_key = str(config.get("secret_access_key", "")).strip()
session_token = str(config.get("session_token", "")).strip()
has_static_keypair = bool(access_key_id) and bool(secret_access_key)
has_partial_keypair = bool(access_key_id) != bool(secret_access_key)
if has_partial_keypair:
raise HTTPException(
status_code=400,
detail="access_key_id and secret_access_key must be set together for sqs",
)
if session_token and not has_static_keypair:
raise HTTPException(
status_code=400,
detail="session_token requires access_key_id and secret_access_key for sqs",
)
def _enforce_scope(config_type: str, scope: dict) -> dict:
"""Enforce type-specific scope constraints. Returns normalized scope."""
if config_type == "mqtt_community":
@@ -193,7 +226,7 @@ def _enforce_scope(config_type: str, scope: dict) -> dict:
detail="scope.messages must be 'all', 'none', or a filter object",
)
return {"messages": messages, "raw_packets": "none"}
# For mqtt_private, validate scope values
# For mqtt_private and sqs, validate scope values
messages = scope.get("messages", "all")
if messages not in ("all", "none") and not isinstance(messages, dict):
raise HTTPException(
@@ -240,6 +273,8 @@ async def create_fanout_config(body: FanoutConfigCreate) -> dict:
_validate_webhook_config(body.config)
elif body.type == "apprise":
_validate_apprise_config(body.config)
elif body.type == "sqs":
_validate_sqs_config(body.config)
scope = _enforce_scope(body.type, body.scope)
@@ -295,6 +330,8 @@ async def update_fanout_config(config_id: str, body: FanoutConfigUpdate) -> dict
_validate_webhook_config(config_to_validate)
elif existing["type"] == "apprise":
_validate_apprise_config(config_to_validate)
elif existing["type"] == "sqs":
_validate_sqs_config(config_to_validate)
updated = await FanoutConfigRepository.update(config_id, **kwargs)
if updated is None:

View File

@@ -18,6 +18,7 @@ const TYPE_LABELS: Record<string, string> = {
bot: 'Bot',
webhook: 'Webhook',
apprise: 'Apprise',
sqs: 'Amazon SQS',
};
const TYPE_OPTIONS = [
@@ -26,6 +27,7 @@ const TYPE_OPTIONS = [
{ value: 'bot', label: 'Bot' },
{ value: 'webhook', label: 'Webhook' },
{ value: 'apprise', label: 'Apprise' },
{ value: 'sqs', label: 'Amazon SQS' },
];
const DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE = 'meshcore/{IATA}/{PUBLIC_KEY}/packets';
@@ -60,6 +62,12 @@ function formatAppriseTargets(urls: string | undefined, maxLength = 80) {
return `${joined.slice(0, maxLength - 3)}...`;
}
function formatSqsQueueSummary(config: Record<string, unknown>) {
const queueUrl = ((config.queue_url as string) || '').trim();
if (!queueUrl) return 'No queue configured';
return queueUrl;
}
function getDefaultIntegrationName(type: string, configs: FanoutConfig[]) {
const label = TYPE_LABELS[type] || type;
const nextIndex = configs.filter((cfg) => cfg.type === type).length + 1;
@@ -998,6 +1006,111 @@ function WebhookConfigEditor({
);
}
function SqsConfigEditor({
config,
scope,
onChange,
onScopeChange,
}: {
config: Record<string, unknown>;
scope: Record<string, unknown>;
onChange: (config: Record<string, unknown>) => void;
onScopeChange: (scope: Record<string, unknown>) => void;
}) {
return (
<div className="space-y-3">
<p className="text-xs text-muted-foreground">
Send matched mesh events to an Amazon SQS queue for durable processing by workers, Lambdas,
or downstream automation.
</p>
<div className="rounded-md border border-warning/50 bg-warning/10 px-3 py-2 text-xs text-warning">
Outgoing messages and any selected raw packets will be delivered exactly as forwarded by the
fanout scope, including decrypted/plaintext message content.
</div>
<div className="space-y-2">
<Label htmlFor="fanout-sqs-queue-url">Queue URL</Label>
<Input
id="fanout-sqs-queue-url"
type="url"
placeholder="https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events"
value={(config.queue_url as string) || ''}
onChange={(e) => onChange({ ...config, queue_url: e.target.value })}
/>
</div>
<div className="grid grid-cols-1 sm:grid-cols-2 gap-4">
<div className="space-y-2">
<Label htmlFor="fanout-sqs-region">Region (optional)</Label>
<Input
id="fanout-sqs-region"
type="text"
placeholder="us-east-1"
value={(config.region_name as string) || ''}
onChange={(e) => onChange({ ...config, region_name: e.target.value })}
/>
</div>
<div className="space-y-2">
<Label htmlFor="fanout-sqs-endpoint">Endpoint URL (optional)</Label>
<Input
id="fanout-sqs-endpoint"
type="url"
placeholder="http://localhost:4566"
value={(config.endpoint_url as string) || ''}
onChange={(e) => onChange({ ...config, endpoint_url: e.target.value })}
/>
<p className="text-xs text-muted-foreground">Useful for LocalStack or custom endpoints</p>
</div>
</div>
<Separator />
<div className="space-y-2">
<Label>Static Credentials (optional)</Label>
<p className="text-xs text-muted-foreground">
Leave blank to use the server&apos;s normal AWS credential chain.
</p>
</div>
<div className="grid grid-cols-1 sm:grid-cols-2 gap-4">
<div className="space-y-2">
<Label htmlFor="fanout-sqs-access-key">Access Key ID</Label>
<Input
id="fanout-sqs-access-key"
type="text"
value={(config.access_key_id as string) || ''}
onChange={(e) => onChange({ ...config, access_key_id: e.target.value })}
/>
</div>
<div className="space-y-2">
<Label htmlFor="fanout-sqs-secret-key">Secret Access Key</Label>
<Input
id="fanout-sqs-secret-key"
type="password"
value={(config.secret_access_key as string) || ''}
onChange={(e) => onChange({ ...config, secret_access_key: e.target.value })}
/>
</div>
</div>
<div className="space-y-2">
<Label htmlFor="fanout-sqs-session-token">Session Token (optional)</Label>
<Input
id="fanout-sqs-session-token"
type="password"
value={(config.session_token as string) || ''}
onChange={(e) => onChange({ ...config, session_token: e.target.value })}
/>
</div>
<Separator />
<ScopeSelector scope={scope} onChange={onScopeChange} showRawPackets />
</div>
);
}
export function SettingsFanoutSection({
health,
onHealthRefresh,
@@ -1208,6 +1321,14 @@ export function SettingsFanoutSection({
preserve_identity: true,
include_path: true,
},
sqs: {
queue_url: '',
region_name: '',
endpoint_url: '',
access_key_id: '',
secret_access_key: '',
session_token: '',
},
};
const defaultScopes: Record<string, Record<string, unknown>> = {
mqtt_private: { messages: 'all', raw_packets: 'all' },
@@ -1215,6 +1336,7 @@ export function SettingsFanoutSection({
bot: { messages: 'all', raw_packets: 'none' },
webhook: { messages: 'all', raw_packets: 'none' },
apprise: { messages: 'all', raw_packets: 'none' },
sqs: { messages: 'all', raw_packets: 'none' },
};
setAddMenuOpen(false);
setEditingId(null);
@@ -1296,6 +1418,15 @@ export function SettingsFanoutSection({
/>
)}
{detailType === 'sqs' && (
<SqsConfigEditor
config={editConfig}
scope={editScope}
onChange={setEditConfig}
onScopeChange={setEditScope}
/>
)}
<Separator />
<div className="flex gap-2">
@@ -1520,6 +1651,17 @@ export function SettingsFanoutSection({
</div>
</div>
)}
{cfg.type === 'sqs' && (
<div className="space-y-1 border-t border-input px-3 py-2 text-xs text-muted-foreground">
<div className="break-all">
Queue:{' '}
<code>
{formatSqsQueueSummary(cfg.config as Record<string, unknown>)}
</code>
</div>
</div>
)}
</div>
);
})}

View File

@@ -90,6 +90,7 @@ describe('SettingsFanoutSection', () => {
).toBeInTheDocument();
expect(screen.getByRole('menuitem', { name: 'Webhook' })).toBeInTheDocument();
expect(screen.getByRole('menuitem', { name: 'Apprise' })).toBeInTheDocument();
expect(screen.getByRole('menuitem', { name: 'Amazon SQS' })).toBeInTheDocument();
expect(screen.getByRole('menuitem', { name: 'Bot' })).toBeInTheDocument();
});
@@ -309,6 +310,23 @@ describe('SettingsFanoutSection', () => {
expect(mockedApi.createFanoutConfig).not.toHaveBeenCalled();
});
it('new SQS draft shows queue url fields and sensible defaults', async () => {
renderSection();
await waitFor(() => {
expect(screen.getByRole('button', { name: 'Add Integration' })).toBeInTheDocument();
});
fireEvent.click(screen.getByRole('button', { name: 'Add Integration' }));
fireEvent.click(screen.getByRole('menuitem', { name: 'Amazon SQS' }));
await waitFor(() => {
expect(screen.getByText('← Back to list')).toBeInTheDocument();
expect(screen.getByLabelText('Name')).toHaveValue('Amazon SQS #1');
expect(screen.getByLabelText('Queue URL')).toBeInTheDocument();
expect(screen.getByText('Forward raw packets')).toBeInTheDocument();
});
});
it('backing out of a new draft does not create an integration', async () => {
renderSection();
await waitFor(() => {
@@ -672,6 +690,30 @@ describe('SettingsFanoutSection', () => {
);
});
it('sqs list shows queue url summary', async () => {
const config: FanoutConfig = {
id: 'sqs-1',
type: 'sqs',
name: 'Queue Feed',
enabled: true,
config: {
queue_url: 'https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events',
region_name: 'us-east-1',
},
scope: { messages: 'all', raw_packets: 'none' },
sort_order: 0,
created_at: 1000,
};
mockedApi.getFanoutConfigs.mockResolvedValue([config]);
renderSection();
await waitFor(() =>
expect(
screen.getByText('https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events')
).toBeInTheDocument()
);
});
it('groups integrations by type and sorts entries alphabetically within each group', async () => {
mockedApi.getFanoutConfigs.mockResolvedValue([
{

View File

@@ -15,6 +15,7 @@ dependencies = [
"meshcore==2.2.29",
"aiomqtt>=2.0",
"apprise>=1.9.7",
"boto3>=1.38.0",
]
[project.optional-dependencies]

View File

@@ -1,7 +1,7 @@
"""Tests for fanout bus: manager, scope matching, repository, and modules."""
import asyncio
from unittest.mock import AsyncMock, patch
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@@ -570,6 +570,121 @@ class TestWebhookValidation:
assert scope["messages"] == {"channels": ["ch1"], "contacts": "none"}
# ---------------------------------------------------------------------------
# SQS module unit tests
# ---------------------------------------------------------------------------
class TestSqsModule:
@pytest.mark.asyncio
async def test_status_disconnected_when_no_queue_url(self):
from app.fanout.sqs import SqsModule
with patch("app.fanout.sqs.boto3.client", return_value=MagicMock()):
mod = SqsModule("test", {"queue_url": ""})
await mod.start()
assert mod.status == "disconnected"
await mod.stop()
@pytest.mark.asyncio
async def test_status_connected_with_queue_url(self):
from app.fanout.sqs import SqsModule
with patch("app.fanout.sqs.boto3.client", return_value=MagicMock()):
mod = SqsModule(
"test",
{"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events"},
)
await mod.start()
assert mod.status == "connected"
await mod.stop()
@pytest.mark.asyncio
async def test_sends_message_payload(self):
from app.fanout.sqs import SqsModule
mock_client = MagicMock()
with patch("app.fanout.sqs.boto3.client", return_value=mock_client):
mod = SqsModule(
"test",
{"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events"},
)
await mod.start()
await mod.on_message(
{"id": 42, "type": "PRIV", "conversation_key": "pk1", "text": "hi"}
)
mock_client.send_message.assert_called_once()
kwargs = mock_client.send_message.call_args.kwargs
assert kwargs["QueueUrl"].endswith("/mesh-events")
assert kwargs["MessageAttributes"]["event_type"]["StringValue"] == "message"
assert '"event_type":"message"' in kwargs["MessageBody"]
assert '"text":"hi"' in kwargs["MessageBody"]
@pytest.mark.asyncio
async def test_fifo_queue_adds_group_and_dedup_ids(self):
from app.fanout.sqs import SqsModule
mock_client = MagicMock()
with patch("app.fanout.sqs.boto3.client", return_value=mock_client):
mod = SqsModule(
"test",
{"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events.fifo"},
)
await mod.start()
await mod.on_raw({"observation_id": "obs-123", "id": 7, "raw": "abcd"})
kwargs = mock_client.send_message.call_args.kwargs
assert kwargs["MessageGroupId"] == "raw-packets"
assert kwargs["MessageDeduplicationId"] == "raw-obs-123"
# ---------------------------------------------------------------------------
# SQS router validation tests
# ---------------------------------------------------------------------------
class TestSqsValidation:
def test_validate_sqs_config_requires_queue_url(self):
from fastapi import HTTPException
from app.routers.fanout import _validate_sqs_config
with pytest.raises(HTTPException) as exc_info:
_validate_sqs_config({"queue_url": ""})
assert exc_info.value.status_code == 400
assert "queue_url is required" in exc_info.value.detail
def test_validate_sqs_config_requires_static_keypair_together(self):
from fastapi import HTTPException
from app.routers.fanout import _validate_sqs_config
with pytest.raises(HTTPException) as exc_info:
_validate_sqs_config(
{
"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events",
"access_key_id": "AKIA...",
}
)
assert exc_info.value.status_code == 400
assert "must be set together" in exc_info.value.detail
def test_validate_sqs_config_accepts_minimal_valid_config(self):
from app.routers.fanout import _validate_sqs_config
_validate_sqs_config(
{"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events"}
)
def test_enforce_scope_sqs_preserves_raw_packets_setting(self):
from app.routers.fanout import _enforce_scope
scope = _enforce_scope("sqs", {"messages": "all", "raw_packets": "all"})
assert scope["messages"] == "all"
assert scope["raw_packets"] == "all"
# ---------------------------------------------------------------------------
# Apprise module unit tests
# ---------------------------------------------------------------------------

72
uv.lock generated
View File

@@ -118,6 +118,34 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/99/fe/22aec895f040c1e457d6e6fcc79286fbb17d54602600ab2a58837bec7be1/bleak-2.1.1-py3-none-any.whl", hash = "sha256:61ac1925073b580c896a92a8c404088c5e5ec9dc3c5bd6fc17554a15779d83de", size = 141258 },
]
[[package]]
name = "boto3"
version = "1.42.66"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "botocore" },
{ name = "jmespath" },
{ name = "s3transfer" },
]
sdist = { url = "https://files.pythonhosted.org/packages/0a/2e/67206daa5acb6053157ae5241421713a84ed6015d33d0781985bd5558898/boto3-1.42.66.tar.gz", hash = "sha256:3bec5300fb2429c3be8e8961fdb1f11e85195922c8a980022332c20af05616d5", size = 112805 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4c/09/83224363c3f5e468e298e48beb577ffe8cb51f18c2116bc1ecf404796e60/boto3-1.42.66-py3-none-any.whl", hash = "sha256:7c6c60dc5500e8a2967a306372a5fdb4c7f9a5b8adc5eb9aa2ebb5081c51ff47", size = 140557 },
]
[[package]]
name = "botocore"
version = "1.42.66"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jmespath" },
{ name = "python-dateutil" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/77/ef/1c8f89da69b0c3742120e19a6ea72ec46ac0596294466924fdd4cf0f36bb/botocore-1.42.66.tar.gz", hash = "sha256:39756a21142b646de552d798dde2105759b0b8fa0d881a34c26d15bd4c9448fa", size = 14977446 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/13/6f/7b45ed2ca300c1ad38ecfc82c1368546d4a90512d9dff589ebbd182a7317/botocore-1.42.66-py3-none-any.whl", hash = "sha256:ac48af1ab527dfa08c4617c387413ca56a7f87780d7bfc1da34ef847a59219a5", size = 14653886 },
]
[[package]]
name = "certifi"
version = "2026.1.4"
@@ -486,6 +514,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484 },
]
[[package]]
name = "jmespath"
version = "1.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d3/59/322338183ecda247fb5d1763a6cbe46eff7222eaeebafd9fa65d4bf5cb11/jmespath-1.1.0.tar.gz", hash = "sha256:472c87d80f36026ae83c6ddd0f1d05d4e510134ed462851fd5f754c8c3cbb88d", size = 27377 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/14/2f/967ba146e6d58cf6a652da73885f52fc68001525b4197effc174321d70b4/jmespath-1.1.0-py3-none-any.whl", hash = "sha256:a5663118de4908c91729bea0acadca56526eb2698e83de10cd116ae0f4e97c64", size = 20419 },
]
[[package]]
name = "markdown"
version = "3.10.2"
@@ -974,6 +1011,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ca/31/d4e37e9e550c2b92a9cbc2e4d0b7420a27224968580b5a447f420847c975/pytest_xdist-3.8.0-py3-none-any.whl", hash = "sha256:202ca578cfeb7370784a8c33d6d05bc6e13b4f25b5053c30a152269fd10f0b88", size = 46396 },
]
[[package]]
name = "python-dateutil"
version = "2.9.0.post0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "six" },
]
sdist = { url = "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3", size = 342432 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892 },
]
[[package]]
name = "python-dotenv"
version = "1.2.1"
@@ -1055,6 +1104,7 @@ dependencies = [
{ name = "aiomqtt" },
{ name = "aiosqlite" },
{ name = "apprise" },
{ name = "boto3" },
{ name = "fastapi" },
{ name = "httpx" },
{ name = "meshcore" },
@@ -1088,6 +1138,7 @@ requires-dist = [
{ name = "aiomqtt", specifier = ">=2.0" },
{ name = "aiosqlite", specifier = ">=0.19.0" },
{ name = "apprise", specifier = ">=1.9.7" },
{ name = "boto3", specifier = ">=1.38.0" },
{ name = "fastapi", specifier = ">=0.115.0" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "httpx", marker = "extra == 'test'", specifier = ">=0.27.0" },
@@ -1167,6 +1218,27 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c4/1c/1dbe51782c0e1e9cfce1d1004752672d2d4629ea46945d19d731ad772b3b/ruff-0.14.11-py3-none-win_arm64.whl", hash = "sha256:649fb6c9edd7f751db276ef42df1f3df41c38d67d199570ae2a7bd6cbc3590f0", size = 12938644 },
]
[[package]]
name = "s3transfer"
version = "0.16.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "botocore" },
]
sdist = { url = "https://files.pythonhosted.org/packages/05/04/74127fc843314818edfa81b5540e26dd537353b123a4edc563109d8f17dd/s3transfer-0.16.0.tar.gz", hash = "sha256:8e990f13268025792229cd52fa10cb7163744bf56e719e0b9cb925ab79abf920", size = 153827 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fc/51/727abb13f44c1fcf6d145979e1535a35794db0f6e450a0cb46aa24732fe2/s3transfer-0.16.0-py3-none-any.whl", hash = "sha256:18e25d66fed509e3868dc1572b3f427ff947dd2c56f844a5bf09481ad3f3b2fe", size = 86830 },
]
[[package]]
name = "six"
version = "1.17.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050 },
]
[[package]]
name = "starlette"
version = "0.50.0"