diff --git a/app/fanout/manager.py b/app/fanout/manager.py index bb5c38f..4eb7d73 100644 --- a/app/fanout/manager.py +++ b/app/fanout/manager.py @@ -20,10 +20,32 @@ def _register_module_types() -> None: from app.fanout.bot import BotModule from app.fanout.mqtt_community import MqttCommunityModule from app.fanout.mqtt_private import MqttPrivateModule + from app.fanout.webhook import WebhookModule _MODULE_TYPES["mqtt_private"] = MqttPrivateModule _MODULE_TYPES["mqtt_community"] = MqttCommunityModule _MODULE_TYPES["bot"] = BotModule + _MODULE_TYPES["webhook"] = WebhookModule + + +def _matches_filter(filter_value: Any, key: str) -> bool: + """Check a single filter value (channels or contacts) against a key. + + Supported shapes: + "all" -> True + "none" -> False + ["key1", "key2"] -> key in list (only listed) + {"except": ["key1", "key2"]} -> key not in list (all except listed) + """ + if filter_value == "all": + return True + if filter_value == "none": + return False + if isinstance(filter_value, list): + return key in filter_value + if isinstance(filter_value, dict) and "except" in filter_value: + return key not in filter_value["except"] + return False def _scope_matches_message(scope: dict, data: dict) -> bool: @@ -37,21 +59,9 @@ def _scope_matches_message(scope: dict, data: dict) -> bool: msg_type = data.get("type", "") conversation_key = data.get("conversation_key", "") if msg_type == "CHAN": - channels = messages.get("channels", "none") - if channels == "all": - return True - if channels == "none": - return False - if isinstance(channels, list): - return conversation_key in channels + return _matches_filter(messages.get("channels", "none"), conversation_key) elif msg_type == "PRIV": - contacts = messages.get("contacts", "none") - if contacts == "all": - return True - if contacts == "none": - return False - if isinstance(contacts, list): - return conversation_key in contacts + return _matches_filter(messages.get("contacts", "none"), conversation_key) return False diff --git a/app/fanout/webhook.py b/app/fanout/webhook.py new file mode 100644 index 0000000..4f9a798 --- /dev/null +++ b/app/fanout/webhook.py @@ -0,0 +1,79 @@ +"""Fanout module for webhook (HTTP POST) delivery.""" + +from __future__ import annotations + +import logging + +import httpx + +from app.fanout.base import FanoutModule + +logger = logging.getLogger(__name__) + + +class WebhookModule(FanoutModule): + """Delivers message data to an HTTP endpoint via POST (or configurable method).""" + + def __init__(self, config_id: str, config: dict) -> None: + super().__init__(config_id, config) + self._client: httpx.AsyncClient | None = None + self._last_error: str | None = None + + async def start(self) -> None: + self._client = httpx.AsyncClient(timeout=httpx.Timeout(10.0)) + self._last_error = None + + async def stop(self) -> None: + if self._client: + await self._client.aclose() + self._client = None + + async def on_message(self, data: dict) -> None: + await self._send(data, event_type="message") + + async def on_raw(self, data: dict) -> None: + await self._send(data, event_type="raw_packet") + + async def _send(self, data: dict, *, event_type: str) -> None: + if not self._client: + return + + url = self.config.get("url", "") + if not url: + return + + method = self.config.get("method", "POST").upper() + extra_headers = self.config.get("headers", {}) + secret = self.config.get("secret", "") + + headers = { + "Content-Type": "application/json", + "X-Webhook-Event": event_type, + **extra_headers, + } + if secret: + headers["X-Webhook-Secret"] = secret + + try: + resp = await self._client.request(method, url, json=data, headers=headers) + resp.raise_for_status() + self._last_error = None + except httpx.HTTPStatusError as exc: + self._last_error = f"HTTP {exc.response.status_code}" + logger.warning( + "Webhook %s returned %s for %s", + self.config_id, + exc.response.status_code, + url, + ) + except httpx.RequestError as exc: + self._last_error = str(exc) + logger.warning("Webhook %s request error: %s", self.config_id, exc) + + @property + def status(self) -> str: + if not self.config.get("url"): + return "disconnected" + if self._last_error: + return "error" + return "connected" diff --git a/app/routers/fanout.py b/app/routers/fanout.py index 3bf4bdc..f4bd01c 100644 --- a/app/routers/fanout.py +++ b/app/routers/fanout.py @@ -12,7 +12,7 @@ from app.repository.fanout import FanoutConfigRepository logger = logging.getLogger(__name__) router = APIRouter(prefix="/fanout", tags=["fanout"]) -_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot"} +_VALID_TYPES = {"mqtt_private", "mqtt_community", "bot", "webhook"} _IATA_RE = re.compile(r"^[A-Z]{3}$") @@ -65,12 +65,32 @@ def _validate_bot_config(config: dict) -> None: ) from None +def _validate_webhook_config(config: dict) -> None: + """Validate webhook config blob.""" + url = config.get("url", "") + if not url: + raise HTTPException(status_code=400, detail="url is required for webhook") + if not url.startswith(("http://", "https://")): + raise HTTPException(status_code=400, detail="url must start with http:// or https://") + method = config.get("method", "POST").upper() + if method not in ("POST", "PUT", "PATCH"): + raise HTTPException(status_code=400, detail="method must be POST, PUT, or PATCH") + headers = config.get("headers", {}) + if not isinstance(headers, dict): + raise HTTPException(status_code=400, detail="headers must be a JSON object") + + def _enforce_scope(config_type: str, scope: dict) -> dict: """Enforce type-specific scope constraints. Returns normalized scope.""" if config_type == "mqtt_community": return {"messages": "none", "raw_packets": "all"} if config_type == "bot": return {"messages": "all", "raw_packets": "none"} + if config_type == "webhook": + messages = scope.get("messages", "all") + if messages not in ("all", "none") and not isinstance(messages, dict): + messages = "all" + return {"messages": messages, "raw_packets": "none"} # For mqtt_private, validate scope values messages = scope.get("messages", "all") if messages not in ("all", "none") and not isinstance(messages, dict): @@ -108,6 +128,8 @@ async def create_fanout_config(body: FanoutConfigCreate) -> dict: _validate_mqtt_community_config(body.config) elif body.type == "bot": _validate_bot_config(body.config) + elif body.type == "webhook": + _validate_webhook_config(body.config) scope = _enforce_scope(body.type, body.scope) @@ -156,6 +178,8 @@ async def update_fanout_config(config_id: str, body: FanoutConfigUpdate) -> dict _validate_mqtt_community_config(config_to_validate) elif existing["type"] == "bot": _validate_bot_config(config_to_validate) + elif existing["type"] == "webhook": + _validate_webhook_config(config_to_validate) updated = await FanoutConfigRepository.update(config_id, **kwargs) if updated is None: diff --git a/frontend/src/components/settings/SettingsFanoutSection.tsx b/frontend/src/components/settings/SettingsFanoutSection.tsx index 7d350dd..67cbbc4 100644 --- a/frontend/src/components/settings/SettingsFanoutSection.tsx +++ b/frontend/src/components/settings/SettingsFanoutSection.tsx @@ -6,7 +6,7 @@ import { Separator } from '../ui/separator'; import { toast } from '../ui/sonner'; import { cn } from '@/lib/utils'; import { api } from '../../api'; -import type { FanoutConfig, HealthStatus } from '../../types'; +import type { Channel, Contact, FanoutConfig, HealthStatus } from '../../types'; const BotCodeEditor = lazy(() => import('../BotCodeEditor').then((m) => ({ default: m.BotCodeEditor })) @@ -16,12 +16,14 @@ const TYPE_LABELS: Record = { mqtt_private: 'Private MQTT', mqtt_community: 'Community MQTT', bot: 'Bot', + webhook: 'Webhook', }; const TYPE_OPTIONS = [ { value: 'mqtt_private', label: 'Private MQTT' }, { value: 'mqtt_community', label: 'Community MQTT' }, { value: 'bot', label: 'Bot' }, + { value: 'webhook', label: 'Webhook' }, ]; const DEFAULT_BOT_CODE = `def bot( @@ -62,18 +64,20 @@ const DEFAULT_BOT_CODE = `def bot( return "[BOT] Plong!" return None`; +function getStatusLabel(status: string | undefined, type?: string) { + if (status === 'connected') return type === 'bot' || type === 'webhook' ? 'Active' : 'Connected'; + if (status === 'error') return 'Error'; + if (status === 'disconnected') return 'Disconnected'; + return 'Inactive'; +} + function getStatusColor(status: string | undefined) { if (status === 'connected') return 'bg-status-connected shadow-[0_0_6px_hsl(var(--status-connected)/0.5)]'; + if (status === 'error') return 'bg-destructive shadow-[0_0_6px_hsl(var(--destructive)/0.5)]'; return 'bg-muted-foreground'; } -function getStatusLabel(status: string | undefined, type?: string) { - if (status === 'connected') return type === 'bot' ? 'Active' : 'Connected'; - if (status === 'disconnected') return 'Disconnected'; - return 'Inactive'; -} - function MqttPrivateConfigEditor({ config, scope, @@ -358,6 +362,364 @@ function BotConfigEditor({ ); } +type ScopeMode = 'all' | 'none' | 'only' | 'except'; + +function getScopeMode(value: unknown): ScopeMode { + if (value === 'all') return 'all'; + if (value === 'none') return 'none'; + if (typeof value === 'object' && value !== null) { + // Check if either channels or contacts uses the {except: [...]} shape + const obj = value as Record; + const ch = obj.channels; + const co = obj.contacts; + if ( + (typeof ch === 'object' && ch !== null && !Array.isArray(ch)) || + (typeof co === 'object' && co !== null && !Array.isArray(co)) + ) { + return 'except'; + } + return 'only'; + } + return 'all'; +} + +/** Extract the key list from a filter value, whether it's a plain list or {except: [...]} */ +function getFilterKeys(filter: unknown): string[] { + if (Array.isArray(filter)) return filter as string[]; + if (typeof filter === 'object' && filter !== null && 'except' in filter) + return ((filter as Record).except as string[]) ?? []; + return []; +} + +function ScopeSelector({ + scope, + onChange, +}: { + scope: Record; + onChange: (scope: Record) => void; +}) { + const [channels, setChannels] = useState([]); + const [contacts, setContacts] = useState([]); + + useEffect(() => { + api.getChannels().then(setChannels).catch(console.error); + + // Paginate to fetch all contacts (API caps at 1000 per request) + (async () => { + const all: Contact[] = []; + const pageSize = 1000; + let offset = 0; + + while (true) { + const page = await api.getContacts(pageSize, offset); + all.push(...page); + if (page.length < pageSize) break; + offset += pageSize; + } + setContacts(all); + })().catch(console.error); + }, []); + + const messages = scope.messages ?? 'all'; + const mode = getScopeMode(messages); + const isListMode = mode === 'only' || mode === 'except'; + + const selectedChannels: string[] = + isListMode && typeof messages === 'object' && messages !== null + ? getFilterKeys((messages as Record).channels) + : []; + const selectedContacts: string[] = + isListMode && typeof messages === 'object' && messages !== null + ? getFilterKeys((messages as Record).contacts) + : []; + + /** Wrap channel/contact key lists in the right shape for the current mode */ + const buildMessages = (chKeys: string[], coKeys: string[]) => { + if (mode === 'except') { + return { + channels: { except: chKeys }, + contacts: { except: coKeys }, + }; + } + return { channels: chKeys, contacts: coKeys }; + }; + + const handleModeChange = (newMode: ScopeMode) => { + if (newMode === 'all' || newMode === 'none') { + onChange({ ...scope, messages: newMode }); + } else if (newMode === 'only') { + onChange({ ...scope, messages: { channels: [], contacts: [] } }); + } else { + onChange({ + ...scope, + messages: { channels: { except: [] }, contacts: { except: [] } }, + }); + } + }; + + const toggleChannel = (key: string) => { + const current = [...selectedChannels]; + const idx = current.indexOf(key); + if (idx >= 0) current.splice(idx, 1); + else current.push(key); + onChange({ ...scope, messages: buildMessages(current, selectedContacts) }); + }; + + const toggleContact = (key: string) => { + const current = [...selectedContacts]; + const idx = current.indexOf(key); + if (idx >= 0) current.splice(idx, 1); + else current.push(key); + onChange({ ...scope, messages: buildMessages(selectedChannels, current) }); + }; + + // Non-repeater contacts only (type 0) + const filteredContacts = contacts.filter((c) => c.type === 0); + + const modeDescriptions: Record = { + all: 'All messages', + none: 'No messages', + only: 'Only listed channels/contacts', + except: 'All except listed channels/contacts', + }; + + // For "except" mode, checked means the item is in the exclusion list (will be excluded) + const isChannelChecked = (key: string) => + mode === 'except' ? selectedChannels.includes(key) : selectedChannels.includes(key); + const isContactChecked = (key: string) => + mode === 'except' ? selectedContacts.includes(key) : selectedContacts.includes(key); + + const listHint = + mode === 'only' + ? 'Newly added channels or contacts will not be automatically included.' + : 'Newly added channels or contacts will be automatically included unless excluded here.'; + + const checkboxLabel = mode === 'except' ? 'exclude' : 'include'; + + return ( +
+ +
+ {(['all', 'none', 'only', 'except'] as const).map((m) => ( + + ))} +
+ + {isListMode && ( + <> +

{listHint}

+ + {channels.length > 0 && ( +
+
+ + + + / + + +
+
+ {channels.map((ch) => ( + + ))} +
+
+ )} + + {filteredContacts.length > 0 && ( +
+
+ + + + / + + +
+
+ {filteredContacts.map((c) => ( + + ))} +
+
+ )} + + )} +
+ ); +} + +function WebhookConfigEditor({ + config, + scope, + onChange, + onScopeChange, +}: { + config: Record; + scope: Record; + onChange: (config: Record) => void; + onScopeChange: (scope: Record) => void; +}) { + const headersStr = JSON.stringify(config.headers ?? {}, null, 2); + const [headersText, setHeadersText] = useState(headersStr); + const [headersError, setHeadersError] = useState(null); + + const handleHeadersChange = (text: string) => { + setHeadersText(text); + try { + const parsed = JSON.parse(text); + if (typeof parsed !== 'object' || Array.isArray(parsed)) { + setHeadersError('Must be a JSON object'); + return; + } + setHeadersError(null); + onChange({ ...config, headers: parsed }); + } catch { + setHeadersError('Invalid JSON'); + } + }; + + return ( +
+

+ Send message data as JSON to an HTTP endpoint when messages are received. +

+ +
+ + onChange({ ...config, url: e.target.value })} + /> +
+ +
+
+ + +
+ +
+ + onChange({ ...config, secret: e.target.value })} + /> +
+
+ +
+ +