From 08d55dec723a485c09bf7315dee89546fe491358 Mon Sep 17 00:00:00 2001 From: Jack Kingsman Date: Sun, 29 Mar 2026 18:47:17 -0700 Subject: [PATCH] Show last error status on integrations. Closes #122. --- app/fanout/apprise_mod.py | 7 +- app/fanout/base.py | 21 ++++++ app/fanout/manager.py | 71 +++++++++++++++++-- app/fanout/map_upload.py | 9 ++- app/fanout/mqtt_base.py | 19 +++++ app/fanout/mqtt_community.py | 6 ++ app/fanout/mqtt_private.py | 6 ++ app/fanout/sqs.py | 9 ++- app/fanout/webhook.py | 9 ++- app/routers/health.py | 11 ++- .../settings/SettingsFanoutSection.tsx | 62 +++++++++++++++- frontend/src/test/fanoutSection.test.tsx | 50 +++++++++++++ frontend/src/types.ts | 1 + tests/test_fanout.py | 29 ++++++++ tests/test_health_mqtt_status.py | 10 ++- tests/test_mqtt.py | 1 + 16 files changed, 290 insertions(+), 31 deletions(-) diff --git a/app/fanout/apprise_mod.py b/app/fanout/apprise_mod.py index c463aee..9b71081 100644 --- a/app/fanout/apprise_mod.py +++ b/app/fanout/apprise_mod.py @@ -95,7 +95,6 @@ class AppriseModule(FanoutModule): def __init__(self, config_id: str, config: dict, *, name: str = "") -> None: super().__init__(config_id, config, name=name) - self._last_error: str | None = None async def on_message(self, data: dict) -> None: # Skip outgoing messages — only notify on incoming @@ -114,17 +113,17 @@ class AppriseModule(FanoutModule): success = await asyncio.to_thread( _send_sync, urls, body, preserve_identity=preserve_identity ) - self._last_error = None if success else "Apprise notify returned failure" + self._set_last_error(None if success else "Apprise notify returned failure") if not success: logger.warning("Apprise notification failed for module %s", self.config_id) except Exception as exc: - self._last_error = str(exc) + self._set_last_error(str(exc)) logger.exception("Apprise send error for module %s", self.config_id) @property def status(self) -> str: if not self.config.get("urls", "").strip(): return "disconnected" - if self._last_error: + if self.last_error: return "error" return "connected" diff --git a/app/fanout/base.py b/app/fanout/base.py index 3ad269f..efe2e49 100644 --- a/app/fanout/base.py +++ b/app/fanout/base.py @@ -3,6 +3,14 @@ from __future__ import annotations +def _broadcast_fanout_health() -> None: + """Push updated fanout status to connected frontend clients.""" + from app.services.radio_runtime import radio_runtime as radio_manager + from app.websocket import broadcast_health + + broadcast_health(radio_manager.is_connected, radio_manager.connection_info) + + class FanoutModule: """Base class for all fanout integrations. @@ -16,6 +24,7 @@ class FanoutModule: self.config_id = config_id self.config = config self.name = name + self._last_error: str | None = None async def start(self) -> None: """Start the module (e.g. connect to broker). Override for persistent connections.""" @@ -34,6 +43,18 @@ class FanoutModule: """Return 'connected', 'disconnected', or 'error'.""" raise NotImplementedError + @property + def last_error(self) -> str | None: + """Return the most recent retained operator-facing error, if any.""" + return self._last_error + + def _set_last_error(self, value: str | None) -> None: + """Update the retained error and broadcast health when it changes.""" + if self._last_error == value: + return + self._last_error = value + _broadcast_fanout_health() + def get_fanout_message_text(data: dict) -> str: """Return the best human-readable message body for fanout consumers. diff --git a/app/fanout/manager.py b/app/fanout/manager.py index bf4ab16..5579b93 100644 --- a/app/fanout/manager.py +++ b/app/fanout/manager.py @@ -15,6 +15,14 @@ _DISPATCH_TIMEOUT_SECONDS = 30.0 _MODULE_TYPES: dict[str, type] = {} +def _format_error_detail(exc: Exception) -> str: + """Return a short operator-facing error string.""" + message = str(exc).strip() + if message: + return f"{type(exc).__name__}: {message}" + return type(exc).__name__ + + def _register_module_types() -> None: """Lazily populate the type registry to avoid circular imports.""" if _MODULE_TYPES: @@ -85,6 +93,23 @@ class FanoutManager: self._modules: dict[str, tuple[FanoutModule, dict]] = {} # id -> (module, scope) self._restart_locks: dict[str, asyncio.Lock] = {} self._bots_disabled_until_restart = False + self._module_errors: dict[str, str] = {} + + def _broadcast_health_update(self) -> None: + from app.services.radio_runtime import radio_runtime as radio_manager + from app.websocket import broadcast_health + + broadcast_health(radio_manager.is_connected, radio_manager.connection_info) + + def _set_module_error(self, config_id: str, error: str) -> None: + if self._module_errors.get(config_id) == error: + return + self._module_errors[config_id] = error + self._broadcast_health_update() + + def _clear_module_error(self, config_id: str) -> None: + if self._module_errors.pop(config_id, None) is not None: + self._broadcast_health_update() def get_bots_disabled_source(self) -> str | None: """Return why bot modules are unavailable, if at all.""" @@ -134,11 +159,13 @@ class FanoutManager: module = cls(config_id, config_blob, name=cfg.get("name", "")) await module.start() self._modules[config_id] = (module, scope) + self._clear_module_error(config_id) logger.info( "Started fanout module %s (type=%s)", cfg.get("name", config_id), config_type ) - except Exception: + except Exception as exc: logger.exception("Failed to start fanout module %s", config_id) + self._set_module_error(config_id, _format_error_detail(exc)) async def reload_config(self, config_id: str) -> None: """Stop old module (if any) and start updated config.""" @@ -162,6 +189,7 @@ class FanoutManager: await module.stop() except Exception: logger.exception("Error stopping fanout module %s", config_id) + self._clear_module_error(config_id) async def _dispatch_matching( self, @@ -191,7 +219,12 @@ class FanoutManager: try: handler = getattr(module, handler_name) await asyncio.wait_for(handler(data), timeout=_DISPATCH_TIMEOUT_SECONDS) + self._clear_module_error(config_id) except asyncio.TimeoutError: + timeout_error = ( + f"{handler_name} timed out after {_DISPATCH_TIMEOUT_SECONDS:.1f}s" + ) + self._set_module_error(config_id, timeout_error) logger.error( "Fanout %s %s timed out after %.1fs; restarting module", config_id, @@ -199,7 +232,8 @@ class FanoutManager: _DISPATCH_TIMEOUT_SECONDS, ) await self._restart_module(config_id, module) - except Exception: + except Exception as exc: + self._set_module_error(config_id, _format_error_detail(exc)) logger.exception("Fanout %s %s error", config_id, log_label) async def _restart_module(self, config_id: str, module: FanoutModule) -> None: @@ -215,6 +249,10 @@ class FanoutManager: except Exception: logger.exception("Failed to restart timed-out fanout module %s", config_id) self._modules.pop(config_id, None) + self._set_module_error( + config_id, + "Module restart failed after timeout", + ) async def broadcast_message(self, data: dict) -> None: """Dispatch a decoded message to modules whose scope matches.""" @@ -243,18 +281,39 @@ class FanoutManager: logger.exception("Error stopping fanout module %s", config_id) self._modules.clear() self._restart_locks.clear() + self._module_errors.clear() - def get_statuses(self) -> dict[str, dict[str, str]]: + def get_statuses(self) -> dict[str, dict[str, str | None]]: """Return status info for each active module.""" from app.repository.fanout import _configs_cache - result: dict[str, dict[str, str]] = {} - for config_id, (module, _) in self._modules.items(): + result: dict[str, dict[str, str | None]] = {} + all_ids = set(_configs_cache) | set(self._modules) | set(self._module_errors) + for config_id in all_ids: info = _configs_cache.get(config_id, {}) + if info.get("enabled") is False: + continue + + module_entry = self._modules.get(config_id) + module = module_entry[0] if module_entry is not None else None + last_error = module.last_error if module is not None else None + status = module.status if module is not None else "error" + + manager_error = self._module_errors.get(config_id) + if manager_error is not None: + status = "error" + last_error = manager_error + elif last_error is not None and status != "error": + status = "error" + + if module is None and last_error is None: + continue + result[config_id] = { "name": info.get("name", config_id), "type": info.get("type", "unknown"), - "status": module.status, + "status": status, + "last_error": last_error, } return result diff --git a/app/fanout/map_upload.py b/app/fanout/map_upload.py index fdfd808..6876d17 100644 --- a/app/fanout/map_upload.py +++ b/app/fanout/map_upload.py @@ -106,7 +106,6 @@ class MapUploadModule(FanoutModule): def __init__(self, config_id: str, config: dict, *, name: str = "") -> None: super().__init__(config_id, config, name=name) self._client: httpx.AsyncClient | None = None - self._last_error: str | None = None # Per-pubkey rate limiting: pubkey_hex -> last_uploaded_advert_timestamp self._seen: dict[str, int] = {} @@ -295,7 +294,7 @@ class MapUploadModule(FanoutModule): ) resp.raise_for_status() self._seen[pubkey] = advert_timestamp - self._last_error = None + self._set_last_error(None) logger.info( "MapUpload: uploaded %s (%s) → HTTP %d", pubkey[:12], @@ -303,7 +302,7 @@ class MapUploadModule(FanoutModule): resp.status_code, ) except httpx.HTTPStatusError as exc: - self._last_error = f"HTTP {exc.response.status_code}" + self._set_last_error(f"HTTP {exc.response.status_code}") logger.warning( "MapUpload: server returned %d for %s: %s", exc.response.status_code, @@ -311,13 +310,13 @@ class MapUploadModule(FanoutModule): exc.response.text[:200], ) except httpx.RequestError as exc: - self._last_error = str(exc) + self._set_last_error(str(exc)) logger.warning("MapUpload: request error for %s: %s", pubkey[:12], exc) @property def status(self) -> str: if self._client is None: return "disconnected" - if self._last_error: + if self.last_error: return "error" return "connected" diff --git a/app/fanout/mqtt_base.py b/app/fanout/mqtt_base.py index ec6f973..467b3b2 100644 --- a/app/fanout/mqtt_base.py +++ b/app/fanout/mqtt_base.py @@ -23,6 +23,14 @@ logger = logging.getLogger(__name__) _BACKOFF_MIN = 5 +def _format_error_detail(exc: Exception) -> str: + """Return a short operator-facing error string.""" + message = str(exc).strip() + if message: + return message + return type(exc).__name__ + + def _broadcast_health() -> None: """Push updated health (including MQTT status) to all WS clients.""" from app.services.radio_runtime import radio_runtime as radio_manager @@ -55,6 +63,7 @@ class BaseMqttPublisher(ABC): self._version_event: asyncio.Event = asyncio.Event() self.connected: bool = False self.integration_name: str = "" + self._last_error: str | None = None def set_integration_name(self, name: str) -> None: """Attach the configured fanout-module name for operator-facing logs.""" @@ -66,11 +75,17 @@ class BaseMqttPublisher(ABC): return f"{self._log_prefix} [{self.integration_name}]" return self._log_prefix + @property + def last_error(self) -> str | None: + """Return the most recent retained connection/publish error.""" + return self._last_error + # ── Lifecycle ────────────────────────────────────────────────────── async def start(self, settings: object) -> None: """Start the background connection loop.""" self._settings = settings + self._last_error = None self._settings_version += 1 self._version_event.set() if self._task is None or self._task.done(): @@ -87,6 +102,7 @@ class BaseMqttPublisher(ABC): self._task = None self._client = None self.connected = False + self._last_error = None async def restart(self, settings: object) -> None: """Called when settings change — stop + start.""" @@ -109,6 +125,7 @@ class BaseMqttPublisher(ABC): exc_info=True, ) self.connected = False + self._last_error = _format_error_detail(e) # Wake the connection loop so it exits the wait and reconnects self._settings_version += 1 self._version_event.set() @@ -198,6 +215,7 @@ class BaseMqttPublisher(ABC): async with aiomqtt.Client(**client_kwargs) as client: self._client = client self.connected = True + self._last_error = None backoff = _BACKOFF_MIN title, detail = self._on_connected(settings) @@ -232,6 +250,7 @@ class BaseMqttPublisher(ABC): except Exception as e: self.connected = False self._client = None + self._last_error = _format_error_detail(e) title, detail = self._on_error() broadcast_error(title, detail) diff --git a/app/fanout/mqtt_community.py b/app/fanout/mqtt_community.py index d17971b..9c4dc13 100644 --- a/app/fanout/mqtt_community.py +++ b/app/fanout/mqtt_community.py @@ -98,9 +98,15 @@ class MqttCommunityModule(FanoutModule): @property def status(self) -> str: if self._publisher._is_configured(): + if self._publisher.last_error: + return "error" return "connected" if self._publisher.connected else "disconnected" return "disconnected" + @property + def last_error(self) -> str | None: + return self._publisher.last_error + async def _publish_community_packet( publisher: CommunityMqttPublisher, diff --git a/app/fanout/mqtt_private.py b/app/fanout/mqtt_private.py index 19e49ae..a679f01 100644 --- a/app/fanout/mqtt_private.py +++ b/app/fanout/mqtt_private.py @@ -59,4 +59,10 @@ class MqttPrivateModule(FanoutModule): def status(self) -> str: if not self.config.get("broker_host"): return "disconnected" + if self._publisher.last_error: + return "error" return "connected" if self._publisher.connected else "disconnected" + + @property + def last_error(self) -> str | None: + return self._publisher.last_error diff --git a/app/fanout/sqs.py b/app/fanout/sqs.py index 79f3822..3f06e3f 100644 --- a/app/fanout/sqs.py +++ b/app/fanout/sqs.py @@ -84,7 +84,6 @@ class SqsModule(FanoutModule): def __init__(self, config_id: str, config: dict, *, name: str = "") -> None: super().__init__(config_id, config, name=name) self._client = None - self._last_error: str | None = None async def start(self) -> None: kwargs: dict[str, str] = {} @@ -147,18 +146,18 @@ class SqsModule(FanoutModule): try: await asyncio.to_thread(partial(self._client.send_message, **request_kwargs)) - self._last_error = None + self._set_last_error(None) except (ClientError, BotoCoreError) as exc: - self._last_error = str(exc) + self._set_last_error(str(exc)) logger.warning("SQS %s send error: %s", self.config_id, exc) except Exception as exc: - self._last_error = str(exc) + self._set_last_error(str(exc)) logger.exception("Unexpected SQS send error for %s", self.config_id) @property def status(self) -> str: if not str(self.config.get("queue_url", "")).strip(): return "disconnected" - if self._last_error: + if self.last_error: return "error" return "connected" diff --git a/app/fanout/webhook.py b/app/fanout/webhook.py index 0ec9c28..84b7846 100644 --- a/app/fanout/webhook.py +++ b/app/fanout/webhook.py @@ -20,7 +20,6 @@ class WebhookModule(FanoutModule): def __init__(self, config_id: str, config: dict, *, name: str = "") -> None: super().__init__(config_id, config, name=name) self._client: httpx.AsyncClient | None = None - self._last_error: str | None = None async def start(self) -> None: self._client = httpx.AsyncClient(timeout=httpx.Timeout(10.0)) @@ -62,9 +61,9 @@ class WebhookModule(FanoutModule): try: resp = await self._client.request(method, url, content=body_bytes, headers=headers) resp.raise_for_status() - self._last_error = None + self._set_last_error(None) except httpx.HTTPStatusError as exc: - self._last_error = f"HTTP {exc.response.status_code}" + self._set_last_error(f"HTTP {exc.response.status_code}") logger.warning( "Webhook %s returned %s for %s", self.config_id, @@ -72,13 +71,13 @@ class WebhookModule(FanoutModule): url, ) except httpx.RequestError as exc: - self._last_error = str(exc) + self._set_last_error(str(exc)) logger.warning("Webhook %s request error: %s", self.config_id, exc) @property def status(self) -> str: if not self.config.get("url"): return "disconnected" - if self._last_error: + if self.last_error: return "error" return "connected" diff --git a/app/routers/health.py b/app/routers/health.py index 7b62564..744e327 100644 --- a/app/routers/health.py +++ b/app/routers/health.py @@ -2,7 +2,7 @@ import os from typing import Any, Literal from fastapi import APIRouter -from pydantic import BaseModel +from pydantic import BaseModel, Field from app.config import settings from app.repository import RawPacketRepository @@ -25,6 +25,13 @@ class AppInfoResponse(BaseModel): commit_hash: str | None = None +class FanoutStatusResponse(BaseModel): + name: str + type: str + status: str + last_error: str | None = None + + class HealthResponse(BaseModel): status: str radio_connected: bool @@ -35,7 +42,7 @@ class HealthResponse(BaseModel): radio_device_info: RadioDeviceInfoResponse | None = None database_size_mb: float oldest_undecrypted_timestamp: int | None - fanout_statuses: dict[str, dict[str, str]] = {} + fanout_statuses: dict[str, FanoutStatusResponse] = Field(default_factory=dict) bots_disabled: bool = False bots_disabled_source: Literal["env", "until_restart"] | None = None basic_auth_enabled: bool = False diff --git a/frontend/src/components/settings/SettingsFanoutSection.tsx b/frontend/src/components/settings/SettingsFanoutSection.tsx index ec1ffd4..71911f8 100644 --- a/frontend/src/components/settings/SettingsFanoutSection.tsx +++ b/frontend/src/components/settings/SettingsFanoutSection.tsx @@ -1,10 +1,17 @@ import { useState, useEffect, useCallback, useMemo, useRef, lazy, Suspense } from 'react'; -import { ChevronDown } from 'lucide-react'; +import { ChevronDown, Info } from 'lucide-react'; import { Input } from '../ui/input'; import { Label } from '../ui/label'; import { Button } from '../ui/button'; import { Separator } from '../ui/separator'; -import { Dialog, DialogContent, DialogFooter, DialogHeader, DialogTitle } from '../ui/dialog'; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, +} from '../ui/dialog'; import { toast } from '../ui/sonner'; import { cn } from '@/lib/utils'; import { api } from '../../api'; @@ -1854,6 +1861,10 @@ export function SettingsFanoutSection({ const [inlineEditName, setInlineEditName] = useState(''); const [createDialogOpen, setCreateDialogOpen] = useState(false); const [selectedCreateType, setSelectedCreateType] = useState(null); + const [errorDialogState, setErrorDialogState] = useState<{ + integrationName: string; + error: string; + } | null>(null); const [busy, setBusy] = useState(false); const loadConfigs = useCallback(async () => { @@ -2207,6 +2218,33 @@ export function SettingsFanoutSection({ }} /> + { + if (!open) { + setErrorDialogState(null); + } + }} + > + + + + {errorDialogState + ? `${errorDialogState.integrationName} Error` + : 'Integration Error'} + + + Most recent backend error retained for this integration. + + +
+

+ {errorDialogState?.error} +

+
+
+
+ {configGroups.length > 0 && (
{configGroups.map((group) => ( @@ -2220,6 +2258,7 @@ export function SettingsFanoutSection({ {group.configs.map((cfg) => { const statusEntry = health?.fanout_statuses?.[cfg.id]; const status = cfg.enabled ? statusEntry?.status : undefined; + const lastError = cfg.enabled ? statusEntry?.last_error : null; const communityConfig = cfg.config as Record; return (
+ {lastError && ( + + )} +