diff --git a/AGENTS.md b/AGENTS.md index a72a1fd..90be2d7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -21,7 +21,7 @@ A web interface for MeshCore mesh radio networks. The backend connects to a Mesh - `frontend/AGENTS.md` - Frontend (React, state management, WebSocket, components) Ancillary AGENTS.md files which should generally not be reviewed unless specific work is being performed on those features include: -- `app/fanout/AGENTS_fanout.md` - Fanout bus architecture (MQTT, bots, webhooks, Apprise) +- `app/fanout/AGENTS_fanout.md` - Fanout bus architecture (MQTT, bots, webhooks, Apprise, SQS) - `frontend/src/components/AGENTS_packet_visualizer.md` - Packet visualizer (force-directed graph, advert-path identity, layout engine) ## Architecture Overview @@ -77,7 +77,7 @@ Ancillary AGENTS.md files which should generally not be reviewed unless specific - Raw packet feed — a debug/observation tool ("radio aquarium"); interesting to watch or copy packets from, but not critical infrastructure - Map view — visual display of node locations from advertisements - Network visualizer — force-directed graph of mesh topology -- Fanout integrations (MQTT, bots, webhooks, Apprise) — see `app/fanout/AGENTS_fanout.md` +- Fanout integrations (MQTT, bots, webhooks, Apprise, SQS) — see `app/fanout/AGENTS_fanout.md` - Read state tracking / mark-all-read — convenience feature for unread badges; no need for transactional atomicity or race-condition hardening ## Error Handling Philosophy @@ -181,7 +181,7 @@ This message-layer echo/path handling is independent of raw-packet storage dedup │ ├── event_handlers.py # Radio events │ ├── decoder.py # Packet decryption │ ├── websocket.py # Real-time broadcasts -│ └── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise (see fanout/AGENTS_fanout.md) +│ └── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise, SQS (see fanout/AGENTS_fanout.md) ├── frontend/ # React frontend │ ├── AGENTS.md # Frontend documentation │ ├── src/ @@ -393,7 +393,7 @@ Read state (`last_read_at`) is tracked **server-side** for consistency across de **Note:** These are NOT the same as `Message.conversation_key` (the database field). -### Fanout Bus (MQTT, Bots, Webhooks, Apprise) +### Fanout Bus (MQTT, Bots, Webhooks, Apprise, SQS) All external integrations are managed through the fanout bus (`app/fanout/`). Each integration is a `FanoutModule` with scope-based event filtering, stored in the `fanout_configs` table and managed via `GET/POST/PATCH/DELETE /api/fanout`. @@ -446,7 +446,7 @@ mc.subscribe(EventType.ACK, handler) | `MESHCORE_BASIC_AUTH_USERNAME` | *(none)* | Optional app-wide HTTP Basic auth username; must be set together with `MESHCORE_BASIC_AUTH_PASSWORD` | | `MESHCORE_BASIC_AUTH_PASSWORD` | *(none)* | Optional app-wide HTTP Basic auth password; must be set together with `MESHCORE_BASIC_AUTH_USERNAME` | -**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `flood_scope`, `blocked_keys`, and `blocked_names`. `max_radio_contacts` is the configured radio contact capacity baseline used by background maintenance: favorites reload first, non-favorite fill targets about 80% of that value, and full offload/reload triggers around 95% occupancy. They are configured via `GET/PATCH /api/settings`. MQTT, bot, webhook, and Apprise configs are stored in the `fanout_configs` table, managed via `/api/fanout`. +**Note:** Runtime app settings are stored in the database (`app_settings` table), not environment variables. These include `max_radio_contacts`, `auto_decrypt_dm_on_advert`, `sidebar_sort_order`, `advert_interval`, `last_advert_time`, `favorites`, `last_message_times`, `flood_scope`, `blocked_keys`, and `blocked_names`. `max_radio_contacts` is the configured radio contact capacity baseline used by background maintenance: favorites reload first, non-favorite fill targets about 80% of that value, and full offload/reload triggers around 95% occupancy. They are configured via `GET/PATCH /api/settings`. MQTT, bot, webhook, Apprise, and SQS configs are stored in the `fanout_configs` table, managed via `/api/fanout`. Byte-perfect channel retries are user-triggered via `POST /api/messages/channel/{message_id}/resend` and are allowed for 30 seconds after the original send. diff --git a/app/AGENTS.md b/app/AGENTS.md index ebd9310..57d50a0 100644 --- a/app/AGENTS.md +++ b/app/AGENTS.md @@ -45,7 +45,7 @@ app/ ├── events.py # Typed WS event payload serialization ├── websocket.py # WS manager + broadcast helpers ├── security.py # Optional app-wide HTTP Basic auth middleware for HTTP + WS -├── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise (see fanout/AGENTS_fanout.md) +├── fanout/ # Fanout bus: MQTT, bots, webhooks, Apprise, SQS (see fanout/AGENTS_fanout.md) ├── dependencies.py # Shared FastAPI dependency providers ├── path_utils.py # Path hex rendering and hop-width helpers ├── keystore.py # Ephemeral private/public key storage for DM decryption @@ -132,7 +132,7 @@ app/ ### Fanout bus -- All external integrations (MQTT, bots, webhooks, Apprise) are managed through the fanout bus (`app/fanout/`). +- All external integrations (MQTT, bots, webhooks, Apprise, SQS) are managed through the fanout bus (`app/fanout/`). - Configs stored in `fanout_configs` table, managed via `GET/POST/PATCH/DELETE /api/fanout`. - `broadcast_event()` in `websocket.py` dispatches to the fanout manager for `message` and `raw_packet` events. - Each integration is a `FanoutModule` with scope-based filtering. diff --git a/app/fanout/AGENTS_fanout.md b/app/fanout/AGENTS_fanout.md index 2567c75..2a1df05 100644 --- a/app/fanout/AGENTS_fanout.md +++ b/app/fanout/AGENTS_fanout.md @@ -79,6 +79,14 @@ Push notifications via Apprise library. Config blob: - `preserve_identity` — suppress Discord webhook name/avatar override - `include_path` — include routing path in notification body +### sqs (sqs.py) +Amazon SQS delivery. Config blob: +- `queue_url` — target queue URL +- `region_name` (optional), `endpoint_url` (optional) +- `access_key_id`, `secret_access_key`, `session_token` (all optional; blank uses the normal AWS credential chain) +- Publishes a JSON envelope of the form `{"event_type":"message"|"raw_packet","data":...}` +- Supports both decoded messages and raw packets via normal scope selection + ## Adding a New Integration Type ### Step-by-step checklist @@ -132,7 +140,7 @@ Three changes needed: **a)** Add to `_VALID_TYPES` set: ```python -_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "my_type"} +_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "sqs", "my_type"} ``` **b)** Add a validation function: @@ -280,6 +288,7 @@ Migrations: - `app/fanout/bot_exec.py` — Bot code execution, response processing, rate limiting - `app/fanout/webhook.py` — Webhook fanout module - `app/fanout/apprise_mod.py` — Apprise fanout module +- `app/fanout/sqs.py` — Amazon SQS fanout module - `app/repository/fanout.py` — Database CRUD - `app/routers/fanout.py` — REST API - `app/websocket.py` — `broadcast_event()` dispatches to fanout diff --git a/app/fanout/manager.py b/app/fanout/manager.py index 665179a..b393a89 100644 --- a/app/fanout/manager.py +++ b/app/fanout/manager.py @@ -23,6 +23,7 @@ def _register_module_types() -> None: from app.fanout.bot import BotModule from app.fanout.mqtt_community import MqttCommunityModule from app.fanout.mqtt_private import MqttPrivateModule + from app.fanout.sqs import SqsModule from app.fanout.webhook import WebhookModule _MODULE_TYPES["mqtt_private"] = MqttPrivateModule @@ -30,6 +31,7 @@ def _register_module_types() -> None: _MODULE_TYPES["bot"] = BotModule _MODULE_TYPES["webhook"] = WebhookModule _MODULE_TYPES["apprise"] = AppriseModule + _MODULE_TYPES["sqs"] = SqsModule def _matches_filter(filter_value: Any, key: str) -> bool: diff --git a/app/fanout/sqs.py b/app/fanout/sqs.py new file mode 100644 index 0000000..fbf8a65 --- /dev/null +++ b/app/fanout/sqs.py @@ -0,0 +1,142 @@ +"""Fanout module for Amazon SQS delivery.""" + +from __future__ import annotations + +import asyncio +import hashlib +import json +import logging +from functools import partial + +import boto3 +from botocore.exceptions import BotoCoreError, ClientError + +from app.fanout.base import FanoutModule + +logger = logging.getLogger(__name__) + + +def _build_payload(data: dict, *, event_type: str) -> str: + """Serialize a fanout event into a stable JSON envelope.""" + return json.dumps( + { + "event_type": event_type, + "data": data, + }, + separators=(",", ":"), + sort_keys=True, + ) + + +def _is_fifo_queue(queue_url: str) -> bool: + """Return True when the configured queue URL points at an SQS FIFO queue.""" + return queue_url.rstrip("/").endswith(".fifo") + + +def _build_message_group_id(data: dict, *, event_type: str) -> str: + """Choose a stable FIFO group ID from the event identity.""" + if event_type == "message": + conversation_key = str(data.get("conversation_key", "")).strip() + if conversation_key: + return f"message-{conversation_key}" + return "message-default" + return "raw-packets" + + +def _build_message_deduplication_id(data: dict, *, event_type: str, body: str) -> str: + """Choose a deterministic deduplication ID for FIFO queues.""" + if event_type == "message": + message_id = data.get("id") + if isinstance(message_id, int): + return f"message-{message_id}" + else: + observation_id = data.get("observation_id") + if isinstance(observation_id, str) and observation_id.strip(): + return f"raw-{observation_id}" + packet_id = data.get("id") + if isinstance(packet_id, int): + return f"raw-{packet_id}" + return hashlib.sha256(body.encode()).hexdigest() + + +class SqsModule(FanoutModule): + """Delivers message and raw-packet events to an Amazon SQS queue.""" + + def __init__(self, config_id: str, config: dict, *, name: str = "") -> None: + super().__init__(config_id, config, name=name) + self._client = None + self._last_error: str | None = None + + async def start(self) -> None: + kwargs: dict[str, str] = {} + region_name = str(self.config.get("region_name", "")).strip() + endpoint_url = str(self.config.get("endpoint_url", "")).strip() + access_key_id = str(self.config.get("access_key_id", "")).strip() + secret_access_key = str(self.config.get("secret_access_key", "")).strip() + session_token = str(self.config.get("session_token", "")).strip() + + if region_name: + kwargs["region_name"] = region_name + if endpoint_url: + kwargs["endpoint_url"] = endpoint_url + if access_key_id and secret_access_key: + kwargs["aws_access_key_id"] = access_key_id + kwargs["aws_secret_access_key"] = secret_access_key + if session_token: + kwargs["aws_session_token"] = session_token + + self._client = boto3.client("sqs", **kwargs) + self._last_error = None + + async def stop(self) -> None: + self._client = None + + async def on_message(self, data: dict) -> None: + await self._send(data, event_type="message") + + async def on_raw(self, data: dict) -> None: + await self._send(data, event_type="raw_packet") + + async def _send(self, data: dict, *, event_type: str) -> None: + if self._client is None: + return + + queue_url = str(self.config.get("queue_url", "")).strip() + if not queue_url: + return + + body = _build_payload(data, event_type=event_type) + request_kwargs: dict[str, object] = { + "QueueUrl": queue_url, + "MessageBody": body, + "MessageAttributes": { + "event_type": { + "DataType": "String", + "StringValue": event_type, + } + }, + } + + if _is_fifo_queue(queue_url): + request_kwargs["MessageGroupId"] = _build_message_group_id(data, event_type=event_type) + request_kwargs["MessageDeduplicationId"] = _build_message_deduplication_id( + data, event_type=event_type, body=body + ) + + try: + await asyncio.to_thread(partial(self._client.send_message, **request_kwargs)) + self._last_error = None + except (ClientError, BotoCoreError) as exc: + self._last_error = str(exc) + logger.warning("SQS %s send error: %s", self.config_id, exc) + except Exception as exc: + self._last_error = str(exc) + logger.exception("Unexpected SQS send error for %s", self.config_id) + + @property + def status(self) -> str: + if not str(self.config.get("queue_url", "")).strip(): + return "disconnected" + if self._last_error: + return "error" + return "connected" diff --git a/app/models.py b/app/models.py index b4a052a..f1145ef 100644 --- a/app/models.py +++ b/app/models.py @@ -565,7 +565,7 @@ class FanoutConfig(BaseModel): """Configuration for a single fanout integration.""" id: str - type: str # 'mqtt_private' | 'mqtt_community' | 'bot' | 'webhook' | 'apprise' + type: str # 'mqtt_private' | 'mqtt_community' | 'bot' | 'webhook' | 'apprise' | 'sqs' name: str enabled: bool config: dict diff --git a/app/routers/fanout.py b/app/routers/fanout.py index 0c1784e..76349e9 100644 --- a/app/routers/fanout.py +++ b/app/routers/fanout.py @@ -13,7 +13,7 @@ from app.repository.fanout import FanoutConfigRepository logger = logging.getLogger(__name__) router = APIRouter(prefix="/fanout", tags=["fanout"]) -_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise"} +_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook", "apprise", "sqs"} _IATA_RE = re.compile(r"^[A-Z]{3}$") _DEFAULT_COMMUNITY_MQTT_TOPIC_TEMPLATE = "meshcore/{IATA}/{PUBLIC_KEY}/packets" @@ -179,6 +179,39 @@ def _validate_webhook_config(config: dict) -> None: raise HTTPException(status_code=400, detail="headers must be a JSON object") +def _validate_sqs_config(config: dict) -> None: + """Validate sqs config blob.""" + queue_url = str(config.get("queue_url", "")).strip() + if not queue_url: + raise HTTPException(status_code=400, detail="queue_url is required for sqs") + if not queue_url.startswith(("https://", "http://")): + raise HTTPException(status_code=400, detail="queue_url must start with http:// or https://") + + endpoint_url = str(config.get("endpoint_url", "")).strip() + if endpoint_url and not endpoint_url.startswith(("https://", "http://")): + raise HTTPException( + status_code=400, + detail="endpoint_url must start with http:// or https://", + ) + + access_key_id = str(config.get("access_key_id", "")).strip() + secret_access_key = str(config.get("secret_access_key", "")).strip() + session_token = str(config.get("session_token", "")).strip() + has_static_keypair = bool(access_key_id) and bool(secret_access_key) + has_partial_keypair = bool(access_key_id) != bool(secret_access_key) + + if has_partial_keypair: + raise HTTPException( + status_code=400, + detail="access_key_id and secret_access_key must be set together for sqs", + ) + if session_token and not has_static_keypair: + raise HTTPException( + status_code=400, + detail="session_token requires access_key_id and secret_access_key for sqs", + ) + + def _enforce_scope(config_type: str, scope: dict) -> dict: """Enforce type-specific scope constraints. Returns normalized scope.""" if config_type == "mqtt_community": @@ -193,7 +226,7 @@ def _enforce_scope(config_type: str, scope: dict) -> dict: detail="scope.messages must be 'all', 'none', or a filter object", ) return {"messages": messages, "raw_packets": "none"} - # For mqtt_private, validate scope values + # For mqtt_private and sqs, validate scope values messages = scope.get("messages", "all") if messages not in ("all", "none") and not isinstance(messages, dict): raise HTTPException( @@ -240,6 +273,8 @@ async def create_fanout_config(body: FanoutConfigCreate) -> dict: _validate_webhook_config(body.config) elif body.type == "apprise": _validate_apprise_config(body.config) + elif body.type == "sqs": + _validate_sqs_config(body.config) scope = _enforce_scope(body.type, body.scope) @@ -295,6 +330,8 @@ async def update_fanout_config(config_id: str, body: FanoutConfigUpdate) -> dict _validate_webhook_config(config_to_validate) elif existing["type"] == "apprise": _validate_apprise_config(config_to_validate) + elif existing["type"] == "sqs": + _validate_sqs_config(config_to_validate) updated = await FanoutConfigRepository.update(config_id, **kwargs) if updated is None: diff --git a/frontend/src/components/settings/SettingsFanoutSection.tsx b/frontend/src/components/settings/SettingsFanoutSection.tsx index d64af08..3106ccd 100644 --- a/frontend/src/components/settings/SettingsFanoutSection.tsx +++ b/frontend/src/components/settings/SettingsFanoutSection.tsx @@ -18,6 +18,7 @@ const TYPE_LABELS: Record = { bot: 'Bot', webhook: 'Webhook', apprise: 'Apprise', + sqs: 'Amazon SQS', }; const TYPE_OPTIONS = [ @@ -26,6 +27,7 @@ const TYPE_OPTIONS = [ { value: 'bot', label: 'Bot' }, { value: 'webhook', label: 'Webhook' }, { value: 'apprise', label: 'Apprise' }, + { value: 'sqs', label: 'Amazon SQS' }, ]; const DEFAULT_COMMUNITY_PACKET_TOPIC_TEMPLATE = 'meshcore/{IATA}/{PUBLIC_KEY}/packets'; @@ -60,6 +62,12 @@ function formatAppriseTargets(urls: string | undefined, maxLength = 80) { return `${joined.slice(0, maxLength - 3)}...`; } +function formatSqsQueueSummary(config: Record) { + const queueUrl = ((config.queue_url as string) || '').trim(); + if (!queueUrl) return 'No queue configured'; + return queueUrl; +} + function getDefaultIntegrationName(type: string, configs: FanoutConfig[]) { const label = TYPE_LABELS[type] || type; const nextIndex = configs.filter((cfg) => cfg.type === type).length + 1; @@ -998,6 +1006,111 @@ function WebhookConfigEditor({ ); } +function SqsConfigEditor({ + config, + scope, + onChange, + onScopeChange, +}: { + config: Record; + scope: Record; + onChange: (config: Record) => void; + onScopeChange: (scope: Record) => void; +}) { + return ( +
+

+ Send matched mesh events to an Amazon SQS queue for durable processing by workers, Lambdas, + or downstream automation. +

+ +
+ Outgoing messages and any selected raw packets will be delivered exactly as forwarded by the + fanout scope, including decrypted/plaintext message content. +
+ +
+ + onChange({ ...config, queue_url: e.target.value })} + /> +
+ +
+
+ + onChange({ ...config, region_name: e.target.value })} + /> +
+
+ + onChange({ ...config, endpoint_url: e.target.value })} + /> +

Useful for LocalStack or custom endpoints

+
+
+ + + +
+ +

+ Leave blank to use the server's normal AWS credential chain. +

+
+ +
+
+ + onChange({ ...config, access_key_id: e.target.value })} + /> +
+
+ + onChange({ ...config, secret_access_key: e.target.value })} + /> +
+
+ +
+ + onChange({ ...config, session_token: e.target.value })} + /> +
+ + + + +
+ ); +} + export function SettingsFanoutSection({ health, onHealthRefresh, @@ -1208,6 +1321,14 @@ export function SettingsFanoutSection({ preserve_identity: true, include_path: true, }, + sqs: { + queue_url: '', + region_name: '', + endpoint_url: '', + access_key_id: '', + secret_access_key: '', + session_token: '', + }, }; const defaultScopes: Record> = { mqtt_private: { messages: 'all', raw_packets: 'all' }, @@ -1215,6 +1336,7 @@ export function SettingsFanoutSection({ bot: { messages: 'all', raw_packets: 'none' }, webhook: { messages: 'all', raw_packets: 'none' }, apprise: { messages: 'all', raw_packets: 'none' }, + sqs: { messages: 'all', raw_packets: 'none' }, }; setAddMenuOpen(false); setEditingId(null); @@ -1296,6 +1418,15 @@ export function SettingsFanoutSection({ /> )} + {detailType === 'sqs' && ( + + )} +
@@ -1520,6 +1651,17 @@ export function SettingsFanoutSection({
)} + + {cfg.type === 'sqs' && ( +
+
+ Queue:{' '} + + {formatSqsQueueSummary(cfg.config as Record)} + +
+
+ )} ); })} diff --git a/frontend/src/test/fanoutSection.test.tsx b/frontend/src/test/fanoutSection.test.tsx index 2195453..8aa6dec 100644 --- a/frontend/src/test/fanoutSection.test.tsx +++ b/frontend/src/test/fanoutSection.test.tsx @@ -90,6 +90,7 @@ describe('SettingsFanoutSection', () => { ).toBeInTheDocument(); expect(screen.getByRole('menuitem', { name: 'Webhook' })).toBeInTheDocument(); expect(screen.getByRole('menuitem', { name: 'Apprise' })).toBeInTheDocument(); + expect(screen.getByRole('menuitem', { name: 'Amazon SQS' })).toBeInTheDocument(); expect(screen.getByRole('menuitem', { name: 'Bot' })).toBeInTheDocument(); }); @@ -309,6 +310,23 @@ describe('SettingsFanoutSection', () => { expect(mockedApi.createFanoutConfig).not.toHaveBeenCalled(); }); + it('new SQS draft shows queue url fields and sensible defaults', async () => { + renderSection(); + await waitFor(() => { + expect(screen.getByRole('button', { name: 'Add Integration' })).toBeInTheDocument(); + }); + + fireEvent.click(screen.getByRole('button', { name: 'Add Integration' })); + fireEvent.click(screen.getByRole('menuitem', { name: 'Amazon SQS' })); + + await waitFor(() => { + expect(screen.getByText('← Back to list')).toBeInTheDocument(); + expect(screen.getByLabelText('Name')).toHaveValue('Amazon SQS #1'); + expect(screen.getByLabelText('Queue URL')).toBeInTheDocument(); + expect(screen.getByText('Forward raw packets')).toBeInTheDocument(); + }); + }); + it('backing out of a new draft does not create an integration', async () => { renderSection(); await waitFor(() => { @@ -672,6 +690,30 @@ describe('SettingsFanoutSection', () => { ); }); + it('sqs list shows queue url summary', async () => { + const config: FanoutConfig = { + id: 'sqs-1', + type: 'sqs', + name: 'Queue Feed', + enabled: true, + config: { + queue_url: 'https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events', + region_name: 'us-east-1', + }, + scope: { messages: 'all', raw_packets: 'none' }, + sort_order: 0, + created_at: 1000, + }; + mockedApi.getFanoutConfigs.mockResolvedValue([config]); + renderSection(); + + await waitFor(() => + expect( + screen.getByText('https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events') + ).toBeInTheDocument() + ); + }); + it('groups integrations by type and sorts entries alphabetically within each group', async () => { mockedApi.getFanoutConfigs.mockResolvedValue([ { diff --git a/pyproject.toml b/pyproject.toml index 7993aaa..636daba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "meshcore==2.2.29", "aiomqtt>=2.0", "apprise>=1.9.7", + "boto3>=1.38.0", ] [project.optional-dependencies] diff --git a/tests/test_fanout.py b/tests/test_fanout.py index 9f07ffe..28f78e3 100644 --- a/tests/test_fanout.py +++ b/tests/test_fanout.py @@ -1,7 +1,7 @@ """Tests for fanout bus: manager, scope matching, repository, and modules.""" import asyncio -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -570,6 +570,121 @@ class TestWebhookValidation: assert scope["messages"] == {"channels": ["ch1"], "contacts": "none"} +# --------------------------------------------------------------------------- +# SQS module unit tests +# --------------------------------------------------------------------------- + + +class TestSqsModule: + @pytest.mark.asyncio + async def test_status_disconnected_when_no_queue_url(self): + from app.fanout.sqs import SqsModule + + with patch("app.fanout.sqs.boto3.client", return_value=MagicMock()): + mod = SqsModule("test", {"queue_url": ""}) + await mod.start() + assert mod.status == "disconnected" + await mod.stop() + + @pytest.mark.asyncio + async def test_status_connected_with_queue_url(self): + from app.fanout.sqs import SqsModule + + with patch("app.fanout.sqs.boto3.client", return_value=MagicMock()): + mod = SqsModule( + "test", + {"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events"}, + ) + await mod.start() + assert mod.status == "connected" + await mod.stop() + + @pytest.mark.asyncio + async def test_sends_message_payload(self): + from app.fanout.sqs import SqsModule + + mock_client = MagicMock() + with patch("app.fanout.sqs.boto3.client", return_value=mock_client): + mod = SqsModule( + "test", + {"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events"}, + ) + await mod.start() + await mod.on_message( + {"id": 42, "type": "PRIV", "conversation_key": "pk1", "text": "hi"} + ) + + mock_client.send_message.assert_called_once() + kwargs = mock_client.send_message.call_args.kwargs + assert kwargs["QueueUrl"].endswith("/mesh-events") + assert kwargs["MessageAttributes"]["event_type"]["StringValue"] == "message" + assert '"event_type":"message"' in kwargs["MessageBody"] + assert '"text":"hi"' in kwargs["MessageBody"] + + @pytest.mark.asyncio + async def test_fifo_queue_adds_group_and_dedup_ids(self): + from app.fanout.sqs import SqsModule + + mock_client = MagicMock() + with patch("app.fanout.sqs.boto3.client", return_value=mock_client): + mod = SqsModule( + "test", + {"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events.fifo"}, + ) + await mod.start() + await mod.on_raw({"observation_id": "obs-123", "id": 7, "raw": "abcd"}) + + kwargs = mock_client.send_message.call_args.kwargs + assert kwargs["MessageGroupId"] == "raw-packets" + assert kwargs["MessageDeduplicationId"] == "raw-obs-123" + + +# --------------------------------------------------------------------------- +# SQS router validation tests +# --------------------------------------------------------------------------- + + +class TestSqsValidation: + def test_validate_sqs_config_requires_queue_url(self): + from fastapi import HTTPException + + from app.routers.fanout import _validate_sqs_config + + with pytest.raises(HTTPException) as exc_info: + _validate_sqs_config({"queue_url": ""}) + assert exc_info.value.status_code == 400 + assert "queue_url is required" in exc_info.value.detail + + def test_validate_sqs_config_requires_static_keypair_together(self): + from fastapi import HTTPException + + from app.routers.fanout import _validate_sqs_config + + with pytest.raises(HTTPException) as exc_info: + _validate_sqs_config( + { + "queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events", + "access_key_id": "AKIA...", + } + ) + assert exc_info.value.status_code == 400 + assert "must be set together" in exc_info.value.detail + + def test_validate_sqs_config_accepts_minimal_valid_config(self): + from app.routers.fanout import _validate_sqs_config + + _validate_sqs_config( + {"queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/mesh-events"} + ) + + def test_enforce_scope_sqs_preserves_raw_packets_setting(self): + from app.routers.fanout import _enforce_scope + + scope = _enforce_scope("sqs", {"messages": "all", "raw_packets": "all"}) + assert scope["messages"] == "all" + assert scope["raw_packets"] == "all" + + # --------------------------------------------------------------------------- # Apprise module unit tests # --------------------------------------------------------------------------- diff --git a/uv.lock b/uv.lock index a77e777..5cb36d5 100644 --- a/uv.lock +++ b/uv.lock @@ -118,6 +118,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/99/fe/22aec895f040c1e457d6e6fcc79286fbb17d54602600ab2a58837bec7be1/bleak-2.1.1-py3-none-any.whl", hash = "sha256:61ac1925073b580c896a92a8c404088c5e5ec9dc3c5bd6fc17554a15779d83de", size = 141258 }, ] +[[package]] +name = "boto3" +version = "1.42.66" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "botocore" }, + { name = "jmespath" }, + { name = "s3transfer" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/0a/2e/67206daa5acb6053157ae5241421713a84ed6015d33d0781985bd5558898/boto3-1.42.66.tar.gz", hash = "sha256:3bec5300fb2429c3be8e8961fdb1f11e85195922c8a980022332c20af05616d5", size = 112805 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4c/09/83224363c3f5e468e298e48beb577ffe8cb51f18c2116bc1ecf404796e60/boto3-1.42.66-py3-none-any.whl", hash = "sha256:7c6c60dc5500e8a2967a306372a5fdb4c7f9a5b8adc5eb9aa2ebb5081c51ff47", size = 140557 }, +] + +[[package]] +name = "botocore" +version = "1.42.66" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "jmespath" }, + { name = "python-dateutil" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/77/ef/1c8f89da69b0c3742120e19a6ea72ec46ac0596294466924fdd4cf0f36bb/botocore-1.42.66.tar.gz", hash = "sha256:39756a21142b646de552d798dde2105759b0b8fa0d881a34c26d15bd4c9448fa", size = 14977446 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/13/6f/7b45ed2ca300c1ad38ecfc82c1368546d4a90512d9dff589ebbd182a7317/botocore-1.42.66-py3-none-any.whl", hash = "sha256:ac48af1ab527dfa08c4617c387413ca56a7f87780d7bfc1da34ef847a59219a5", size = 14653886 }, +] + [[package]] name = "certifi" version = "2026.1.4" @@ -486,6 +514,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484 }, ] +[[package]] +name = "jmespath" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d3/59/322338183ecda247fb5d1763a6cbe46eff7222eaeebafd9fa65d4bf5cb11/jmespath-1.1.0.tar.gz", hash = "sha256:472c87d80f36026ae83c6ddd0f1d05d4e510134ed462851fd5f754c8c3cbb88d", size = 27377 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/14/2f/967ba146e6d58cf6a652da73885f52fc68001525b4197effc174321d70b4/jmespath-1.1.0-py3-none-any.whl", hash = "sha256:a5663118de4908c91729bea0acadca56526eb2698e83de10cd116ae0f4e97c64", size = 20419 }, +] + [[package]] name = "markdown" version = "3.10.2" @@ -974,6 +1011,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/ca/31/d4e37e9e550c2b92a9cbc2e4d0b7420a27224968580b5a447f420847c975/pytest_xdist-3.8.0-py3-none-any.whl", hash = "sha256:202ca578cfeb7370784a8c33d6d05bc6e13b4f25b5053c30a152269fd10f0b88", size = 46396 }, ] +[[package]] +name = "python-dateutil" +version = "2.9.0.post0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "six" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/66/c0/0c8b6ad9f17a802ee498c46e004a0eb49bc148f2fd230864601a86dcf6db/python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3", size = 342432 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427", size = 229892 }, +] + [[package]] name = "python-dotenv" version = "1.2.1" @@ -1055,6 +1104,7 @@ dependencies = [ { name = "aiomqtt" }, { name = "aiosqlite" }, { name = "apprise" }, + { name = "boto3" }, { name = "fastapi" }, { name = "httpx" }, { name = "meshcore" }, @@ -1088,6 +1138,7 @@ requires-dist = [ { name = "aiomqtt", specifier = ">=2.0" }, { name = "aiosqlite", specifier = ">=0.19.0" }, { name = "apprise", specifier = ">=1.9.7" }, + { name = "boto3", specifier = ">=1.38.0" }, { name = "fastapi", specifier = ">=0.115.0" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "httpx", marker = "extra == 'test'", specifier = ">=0.27.0" }, @@ -1167,6 +1218,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c4/1c/1dbe51782c0e1e9cfce1d1004752672d2d4629ea46945d19d731ad772b3b/ruff-0.14.11-py3-none-win_arm64.whl", hash = "sha256:649fb6c9edd7f751db276ef42df1f3df41c38d67d199570ae2a7bd6cbc3590f0", size = 12938644 }, ] +[[package]] +name = "s3transfer" +version = "0.16.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "botocore" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/05/04/74127fc843314818edfa81b5540e26dd537353b123a4edc563109d8f17dd/s3transfer-0.16.0.tar.gz", hash = "sha256:8e990f13268025792229cd52fa10cb7163744bf56e719e0b9cb925ab79abf920", size = 153827 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fc/51/727abb13f44c1fcf6d145979e1535a35794db0f6e450a0cb46aa24732fe2/s3transfer-0.16.0-py3-none-any.whl", hash = "sha256:18e25d66fed509e3868dc1572b3f427ff947dd2c56f844a5bf09481ad3f3b2fe", size = 86830 }, +] + +[[package]] +name = "six" +version = "1.17.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/94/e7/b2c673351809dca68a0e064b6af791aa332cf192da575fd474ed7d6f16a2/six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81", size = 34031 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050 }, +] + [[package]] name = "starlette" version = "0.50.0"