mirror of
https://github.com/jkingsman/Remote-Terminal-for-MeshCore.git
synced 2026-03-28 17:43:05 +01:00
85 lines
2.6 KiB
Python
85 lines
2.6 KiB
Python
"""Fanout module for webhook (HTTP POST) delivery."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import logging
|
|
|
|
import httpx
|
|
|
|
from app.fanout.base import FanoutModule
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class WebhookModule(FanoutModule):
|
|
"""Delivers message data to an HTTP endpoint via POST (or configurable method)."""
|
|
|
|
def __init__(self, config_id: str, config: dict, *, name: str = "") -> None:
|
|
super().__init__(config_id, config, name=name)
|
|
self._client: httpx.AsyncClient | None = None
|
|
self._last_error: str | None = None
|
|
|
|
async def start(self) -> None:
|
|
self._client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))
|
|
self._last_error = None
|
|
|
|
async def stop(self) -> None:
|
|
if self._client:
|
|
await self._client.aclose()
|
|
self._client = None
|
|
|
|
async def on_message(self, data: dict) -> None:
|
|
await self._send(data, event_type="message")
|
|
|
|
async def _send(self, data: dict, *, event_type: str) -> None:
|
|
if not self._client:
|
|
return
|
|
|
|
url = self.config.get("url", "")
|
|
if not url:
|
|
return
|
|
|
|
method = self.config.get("method", "POST").upper()
|
|
extra_headers = self.config.get("headers", {})
|
|
hmac_secret = self.config.get("hmac_secret", "")
|
|
hmac_header = self.config.get("hmac_header", "X-Webhook-Signature")
|
|
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
"X-Webhook-Event": event_type,
|
|
**extra_headers,
|
|
}
|
|
|
|
body_bytes = json.dumps(data, separators=(",", ":"), sort_keys=True).encode()
|
|
|
|
if hmac_secret:
|
|
sig = hmac.new(hmac_secret.encode(), body_bytes, hashlib.sha256).hexdigest()
|
|
headers[hmac_header or "X-Webhook-Signature"] = f"sha256={sig}"
|
|
|
|
try:
|
|
resp = await self._client.request(method, url, content=body_bytes, headers=headers)
|
|
resp.raise_for_status()
|
|
self._last_error = None
|
|
except httpx.HTTPStatusError as exc:
|
|
self._last_error = f"HTTP {exc.response.status_code}"
|
|
logger.warning(
|
|
"Webhook %s returned %s for %s",
|
|
self.config_id,
|
|
exc.response.status_code,
|
|
url,
|
|
)
|
|
except httpx.RequestError as exc:
|
|
self._last_error = str(exc)
|
|
logger.warning("Webhook %s request error: %s", self.config_id, exc)
|
|
|
|
@property
|
|
def status(self) -> str:
|
|
if not self.config.get("url"):
|
|
return "disconnected"
|
|
if self._last_error:
|
|
return "error"
|
|
return "connected"
|