diff --git a/TASKS.md b/TASKS.md index dc8a6a4..96f719f 100644 --- a/TASKS.md +++ b/TASKS.md @@ -267,13 +267,13 @@ This document tracks implementation progress for the MeshCore Hub project. Each ### 3.3 Webhook Dispatcher (Optional based on Q10) -- [ ] Create `collector/webhook.py`: - - [ ] `WebhookDispatcher` class - - [ ] Webhook configuration loading - - [ ] JSONPath filtering support - - [ ] Async HTTP POST sending - - [ ] Retry logic with backoff - - [ ] Error logging +- [x] Create `collector/webhook.py`: + - [x] `WebhookDispatcher` class + - [x] Webhook configuration loading + - [x] JSONPath filtering support + - [x] Async HTTP POST sending + - [x] Retry logic with backoff + - [x] Error logging ### 3.4 Collector CLI @@ -299,10 +299,10 @@ This document tracks implementation progress for the MeshCore Hub project. Each - [x] `test_trace.py` - [x] `test_telemetry.py` - [x] `test_contacts.py` -- [ ] Create `tests/test_collector/test_webhook.py`: - - [ ] Test webhook dispatching - - [ ] Test JSONPath filtering - - [ ] Test retry logic +- [x] Create `tests/test_collector/test_webhook.py`: + - [x] Test webhook dispatching + - [x] Test JSONPath filtering + - [x] Test retry logic --- @@ -672,12 +672,12 @@ This document tracks implementation progress for the MeshCore Hub project. Each - [x] Add health check endpoint to Web: - [x] `GET /health` - basic health - [x] `GET /health/ready` - includes API connectivity -- [ ] Add health check to Interface: - - [ ] Device connection status - - [ ] MQTT connection status -- [ ] Add health check to Collector: - - [ ] MQTT connection status - - [ ] Database connection status +- [x] Add health check to Interface: + - [x] Device connection status + - [x] MQTT connection status +- [x] Add health check to Collector: + - [x] MQTT connection status + - [x] Database connection status ### 6.4 Database CLI Commands ✅ @@ -690,13 +690,13 @@ This document tracks implementation progress for the MeshCore Hub project. Each ### 6.5 Documentation -- [ ] Update `README.md`: - - [ ] Project description - - [ ] Quick start guide - - [ ] Docker deployment instructions - - [ ] Manual installation instructions - - [ ] Configuration reference - - [ ] CLI reference +- [x] Update `README.md`: + - [x] Project description + - [x] Quick start guide + - [x] Docker deployment instructions + - [x] Manual installation instructions + - [x] Configuration reference + - [x] CLI reference - [ ] Create `docs/` directory (optional): - [ ] Architecture overview - [ ] API documentation link @@ -738,11 +738,13 @@ This document tracks implementation progress for the MeshCore Hub project. Each |-------|-------------|-----------|----------| | Phase 1: Foundation | 47 | 47 | 100% | | Phase 2: Interface | 35 | 35 | 100% | -| Phase 3: Collector | 27 | 20 | 74% | +| Phase 3: Collector | 27 | 27 | 100% | | Phase 4: API | 44 | 44 | 100% | | Phase 5: Web Dashboard | 40 | 40 | 100% | -| Phase 6: Docker & Deployment | 28 | 24 | 86% | -| **Total** | **221** | **210** | **95%** | +| Phase 6: Docker & Deployment | 28 | 25 | 89% | +| **Total** | **221** | **218** | **99%** | + +*Note: Remaining 3 tasks are optional (creating a `docs/` directory).* --- diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index aa46885..133b0ba 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -71,7 +71,7 @@ services: - NODE_ADDRESS=${NODE_ADDRESS:-} command: ["interface", "receiver"] healthcheck: - test: ["CMD", "pgrep", "-f", "meshcore-hub"] + test: ["CMD", "meshcore-hub", "health", "interface"] interval: 30s timeout: 10s retries: 3 @@ -107,7 +107,7 @@ services: - NODE_ADDRESS=${NODE_ADDRESS_SENDER:-} command: ["interface", "sender"] healthcheck: - test: ["CMD", "pgrep", "-f", "meshcore-hub"] + test: ["CMD", "meshcore-hub", "health", "interface"] interval: 30s timeout: 10s retries: 3 @@ -138,7 +138,7 @@ services: - NODE_ADDRESS=${NODE_ADDRESS:-0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef} command: ["interface", "receiver", "--mock"] healthcheck: - test: ["CMD", "pgrep", "-f", "meshcore-hub"] + test: ["CMD", "meshcore-hub", "health", "interface"] interval: 30s timeout: 10s retries: 3 @@ -172,7 +172,7 @@ services: - DATABASE_URL=sqlite:////data/meshcore.db command: ["collector"] healthcheck: - test: ["CMD", "pgrep", "-f", "meshcore-hub"] + test: ["CMD", "meshcore-hub", "health", "collector"] interval: 30s timeout: 10s retries: 3 diff --git a/src/meshcore_hub/__main__.py b/src/meshcore_hub/__main__.py index 7e1de6a..96d8fc9 100644 --- a/src/meshcore_hub/__main__.py +++ b/src/meshcore_hub/__main__.py @@ -1,10 +1,13 @@ """MeshCore Hub CLI entry point.""" +import sys + import click from dotenv import load_dotenv from meshcore_hub import __version__ from meshcore_hub.common.config import LogLevel +from meshcore_hub.common.health import check_health from meshcore_hub.common.logging import configure_logging # Load .env file early so Click's envvar parameter picks up values @@ -171,6 +174,62 @@ def db_history() -> None: command.history(alembic_cfg) +# Health check commands for Docker HEALTHCHECK +@cli.group() +def health() -> None: + """Health check commands for component monitoring. + + These commands are used by Docker HEALTHCHECK to monitor + container health. Each running component writes its health + status to a file, and these commands verify that status. + """ + pass + + +@health.command("interface") +@click.option( + "--timeout", + type=int, + default=60, + help="Maximum age of health status in seconds", +) +def health_interface(timeout: int) -> None: + """Check interface component health status. + + Returns exit code 0 if healthy, 1 if not. + """ + is_healthy, message = check_health("interface", stale_threshold=timeout) + + if is_healthy: + click.echo(f"Interface health: {message}") + sys.exit(0) + else: + click.echo(f"Interface unhealthy: {message}", err=True) + sys.exit(1) + + +@health.command("collector") +@click.option( + "--timeout", + type=int, + default=60, + help="Maximum age of health status in seconds", +) +def health_collector(timeout: int) -> None: + """Check collector component health status. + + Returns exit code 0 if healthy, 1 if not. + """ + is_healthy, message = check_health("collector", stale_threshold=timeout) + + if is_healthy: + click.echo(f"Collector health: {message}") + sys.exit(0) + else: + click.echo(f"Collector unhealthy: {message}", err=True) + sys.exit(1) + + def main() -> None: """Main entry point.""" cli() diff --git a/src/meshcore_hub/collector/subscriber.py b/src/meshcore_hub/collector/subscriber.py index 89fcef7..ba834ed 100644 --- a/src/meshcore_hub/collector/subscriber.py +++ b/src/meshcore_hub/collector/subscriber.py @@ -14,6 +14,7 @@ import time from typing import Any, Callable, Optional from meshcore_hub.common.database import DatabaseManager +from meshcore_hub.common.health import HealthReporter from meshcore_hub.common.mqtt import MQTTClient, MQTTConfig logger = logging.getLogger(__name__) @@ -44,6 +45,7 @@ class Subscriber: self._handlers: dict[str, EventHandler] = {} self._mqtt_connected = False self._db_connected = False + self._health_reporter: Optional[HealthReporter] = None @property def is_healthy(self) -> bool: @@ -147,6 +149,14 @@ class Subscriber: self._running = True + # Start health reporter for Docker health checks + self._health_reporter = HealthReporter( + component="collector", + status_fn=self.get_health_status, + interval=10.0, + ) + self._health_reporter.start() + def run(self) -> None: """Run the subscriber event loop (blocking).""" if not self._running: @@ -171,6 +181,11 @@ class Subscriber: self._running = False self._shutdown_event.set() + # Stop health reporter + if self._health_reporter: + self._health_reporter.stop() + self._health_reporter = None + # Stop MQTT self.mqtt.stop() self.mqtt.disconnect() diff --git a/src/meshcore_hub/collector/webhook.py b/src/meshcore_hub/collector/webhook.py new file mode 100644 index 0000000..7153e4b --- /dev/null +++ b/src/meshcore_hub/collector/webhook.py @@ -0,0 +1,438 @@ +"""Webhook Dispatcher for sending events to external services. + +The webhook dispatcher: +1. Receives events from the collector +2. Filters events based on JSONPath expressions +3. Sends HTTP POST requests to configured endpoints +4. Implements retry logic with exponential backoff +""" + +import asyncio +import logging +import re +from dataclasses import dataclass, field +from typing import Any, Callable, Optional + +import httpx + +logger = logging.getLogger(__name__) + + +@dataclass +class WebhookConfig: + """Configuration for a single webhook endpoint.""" + + url: str + name: str = "webhook" + event_types: list[str] = field(default_factory=list) + filter_expression: Optional[str] = None + headers: dict[str, str] = field(default_factory=dict) + timeout: float = 10.0 + max_retries: int = 3 + retry_backoff: float = 2.0 + enabled: bool = True + + def matches_event(self, event_type: str, payload: dict[str, Any]) -> bool: + """Check if this webhook should receive the event. + + Args: + event_type: Event type name + payload: Event payload + + Returns: + True if the event matches this webhook's filters + """ + # Check event type filter + if self.event_types and event_type not in self.event_types: + return False + + # Check JSONPath filter expression + if self.filter_expression: + if not self._evaluate_filter(payload): + return False + + return True + + def _evaluate_filter(self, payload: dict[str, Any]) -> bool: + """Evaluate a simple JSONPath-like filter expression. + + Supports expressions like: + - $.field == "value" + - $.nested.field != null + - $.field exists + - $.field > 10 + + Args: + payload: Event payload + + Returns: + True if the filter matches + """ + if not self.filter_expression: + return True + + expr = self.filter_expression.strip() + + # Parse expression: $.path operator value + # Supports: ==, !=, >, <, >=, <=, exists, not exists + # Note: >= and <= must come before > and < in the alternation + pattern = r'^\$\.([a-zA-Z0-9_.]+)\s+(==|!=|>=|<=|>|<|exists|not exists)\s*(.*)$' + match = re.match(pattern, expr) + + if not match: + logger.warning(f"Invalid filter expression: {expr}") + return True # Pass through if expression is invalid + + path = match.group(1) + operator = match.group(2) + value_str = match.group(3).strip() if match.group(3) else None + + # Navigate the path + current = payload + for part in path.split('.'): + if isinstance(current, dict) and part in current: + current = current[part] + else: + current = None + break + + # Evaluate operator + if operator == "exists": + return current is not None + elif operator == "not exists": + return current is None + elif current is None: + return False + + # Parse value for comparison + if value_str is None: + return False + + # Handle quoted strings + if value_str.startswith('"') and value_str.endswith('"'): + compare_value: Any = value_str[1:-1] + elif value_str.startswith("'") and value_str.endswith("'"): + compare_value = value_str[1:-1] + elif value_str == "null": + compare_value = None + elif value_str == "true": + compare_value = True + elif value_str == "false": + compare_value = False + else: + try: + compare_value = int(value_str) + except ValueError: + try: + compare_value = float(value_str) + except ValueError: + compare_value = value_str + + # Perform comparison + try: + if operator == "==": + return current == compare_value + elif operator == "!=": + return current != compare_value + elif operator == ">": + return current > compare_value + elif operator == "<": + return current < compare_value + elif operator == ">=": + return current >= compare_value + elif operator == "<=": + return current <= compare_value + except TypeError: + return False + + return False + + +class WebhookDispatcher: + """Dispatches events to webhook endpoints.""" + + def __init__(self, webhooks: Optional[list[WebhookConfig]] = None): + """Initialize the webhook dispatcher. + + Args: + webhooks: List of webhook configurations + """ + self.webhooks = webhooks or [] + self._client: Optional[httpx.AsyncClient] = None + self._running = False + + @property + def is_running(self) -> bool: + """Check if the dispatcher is running.""" + return self._running + + def add_webhook(self, webhook: WebhookConfig) -> None: + """Add a webhook configuration. + + Args: + webhook: Webhook configuration + """ + self.webhooks.append(webhook) + logger.info(f"Added webhook: {webhook.name} -> {webhook.url}") + + def remove_webhook(self, name: str) -> bool: + """Remove a webhook by name. + + Args: + name: Webhook name + + Returns: + True if webhook was removed + """ + for i, webhook in enumerate(self.webhooks): + if webhook.name == name: + del self.webhooks[i] + logger.info(f"Removed webhook: {name}") + return True + return False + + async def start(self) -> None: + """Start the webhook dispatcher.""" + if self._running: + return + + self._client = httpx.AsyncClient() + self._running = True + logger.info(f"Webhook dispatcher started with {len(self.webhooks)} webhooks") + + async def stop(self) -> None: + """Stop the webhook dispatcher.""" + if not self._running: + return + + self._running = False + if self._client: + await self._client.aclose() + self._client = None + logger.info("Webhook dispatcher stopped") + + async def dispatch( + self, + event_type: str, + payload: dict[str, Any], + public_key: Optional[str] = None, + ) -> dict[str, bool]: + """Dispatch an event to all matching webhooks. + + Args: + event_type: Event type name + payload: Event payload + public_key: Source node public key + + Returns: + Dictionary mapping webhook names to success status + """ + if not self._running or not self.webhooks: + return {} + + # Build full event data + event_data = { + "event_type": event_type, + "public_key": public_key, + "payload": payload, + } + + results: dict[str, bool] = {} + + # Dispatch to all matching webhooks concurrently + tasks = [] + for webhook in self.webhooks: + if not webhook.enabled: + continue + if webhook.matches_event(event_type, payload): + tasks.append(self._send_webhook(webhook, event_data)) + + if tasks: + task_results = await asyncio.gather(*tasks, return_exceptions=True) + for webhook, result in zip( + [w for w in self.webhooks if w.enabled and w.matches_event(event_type, payload)], + task_results, + ): + if isinstance(result, Exception): + results[webhook.name] = False + logger.error(f"Webhook {webhook.name} failed: {result}") + else: + results[webhook.name] = result + + return results + + async def _send_webhook( + self, + webhook: WebhookConfig, + event_data: dict[str, Any], + ) -> bool: + """Send an event to a webhook endpoint with retry logic. + + Args: + webhook: Webhook configuration + event_data: Event data to send + + Returns: + True if the webhook was sent successfully + """ + if not self._client: + return False + + headers = { + "Content-Type": "application/json", + "User-Agent": "MeshCore-Hub/1.0", + **webhook.headers, + } + + last_error: Optional[Exception] = None + + for attempt in range(webhook.max_retries + 1): + try: + response = await self._client.post( + webhook.url, + json=event_data, + headers=headers, + timeout=webhook.timeout, + ) + + if response.status_code >= 200 and response.status_code < 300: + logger.debug( + f"Webhook {webhook.name} sent successfully " + f"(status={response.status_code})" + ) + return True + else: + logger.warning( + f"Webhook {webhook.name} returned status {response.status_code}" + ) + last_error = Exception(f"HTTP {response.status_code}") + + except httpx.TimeoutException as e: + logger.warning(f"Webhook {webhook.name} timed out: {e}") + last_error = e + except httpx.RequestError as e: + logger.warning(f"Webhook {webhook.name} request error: {e}") + last_error = e + except Exception as e: + logger.error(f"Webhook {webhook.name} unexpected error: {e}") + last_error = e + + # Retry with backoff (but not after the last attempt) + if attempt < webhook.max_retries: + backoff = webhook.retry_backoff * (2 ** attempt) + logger.info( + f"Retrying webhook {webhook.name} in {backoff}s " + f"(attempt {attempt + 2}/{webhook.max_retries + 1})" + ) + await asyncio.sleep(backoff) + + logger.error( + f"Webhook {webhook.name} failed after {webhook.max_retries + 1} attempts: " + f"{last_error}" + ) + return False + + +def create_webhook_dispatcher_from_config( + config: list[dict[str, Any]], +) -> WebhookDispatcher: + """Create a webhook dispatcher from configuration. + + Args: + config: List of webhook configurations as dicts + + Returns: + Configured WebhookDispatcher instance + + Example config: + [ + { + "name": "my-webhook", + "url": "https://example.com/webhook", + "event_types": ["advertisement", "contact_msg_recv"], + "filter_expression": "$.snr > -10", + "headers": {"X-API-Key": "secret"}, + "timeout": 5.0, + "max_retries": 3, + "retry_backoff": 2.0, + "enabled": true + } + ] + """ + webhooks = [] + + for item in config: + try: + webhook = WebhookConfig( + url=item["url"], + name=item.get("name", "webhook"), + event_types=item.get("event_types", []), + filter_expression=item.get("filter_expression"), + headers=item.get("headers", {}), + timeout=item.get("timeout", 10.0), + max_retries=item.get("max_retries", 3), + retry_backoff=item.get("retry_backoff", 2.0), + enabled=item.get("enabled", True), + ) + webhooks.append(webhook) + logger.info(f"Loaded webhook config: {webhook.name}") + except KeyError as e: + logger.error(f"Invalid webhook config (missing {e}): {item}") + except Exception as e: + logger.error(f"Failed to load webhook config: {e}") + + return WebhookDispatcher(webhooks) + + +# Synchronous wrapper for use in non-async handlers +_dispatcher: Optional[WebhookDispatcher] = None +_dispatch_queue: list[tuple[str, dict[str, Any], Optional[str]]] = [] +_dispatch_callback: Optional[Callable[[str, dict[str, Any], Optional[str]], None]] = None + + +def set_dispatch_callback( + callback: Optional[Callable[[str, dict[str, Any], Optional[str]], None]] +) -> None: + """Set a callback for synchronous webhook dispatch. + + This allows the collector to integrate webhook dispatching into its + event handling loop without requiring async handlers. + + Args: + callback: Function to call with (event_type, payload, public_key) + """ + global _dispatch_callback + _dispatch_callback = callback + + +def dispatch_event( + event_type: str, + payload: dict[str, Any], + public_key: Optional[str] = None, +) -> None: + """Queue an event for webhook dispatch. + + This is a synchronous wrapper for use in non-async handlers. + Events are queued and should be processed by an async loop. + + Args: + event_type: Event type name + payload: Event payload + public_key: Source node public key + """ + if _dispatch_callback: + _dispatch_callback(event_type, payload, public_key) + else: + _dispatch_queue.append((event_type, payload, public_key)) + + +def get_queued_events() -> list[tuple[str, dict[str, Any], Optional[str]]]: + """Get and clear queued events. + + Returns: + List of (event_type, payload, public_key) tuples + """ + global _dispatch_queue + events = _dispatch_queue.copy() + _dispatch_queue = [] + return events diff --git a/src/meshcore_hub/common/health.py b/src/meshcore_hub/common/health.py new file mode 100644 index 0000000..de0558d --- /dev/null +++ b/src/meshcore_hub/common/health.py @@ -0,0 +1,290 @@ +"""Health check utilities for MeshCore Hub components. + +This module provides utilities for: +1. Writing health status to a file (for Docker HEALTHCHECK) +2. Reading health status from a file (for health check commands) +3. Running periodic health updates +""" + +import json +import logging +import os +import threading +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable, Optional + +logger = logging.getLogger(__name__) + +# Default health file locations +DEFAULT_HEALTH_DIR = "/tmp/meshcore-hub" +DEFAULT_HEALTH_FILE_INTERFACE = "interface-health.json" +DEFAULT_HEALTH_FILE_COLLECTOR = "collector-health.json" + +# Health status is considered stale after this many seconds +HEALTH_STALE_THRESHOLD = 60 + + +@dataclass +class HealthStatus: + """Health status data structure.""" + + healthy: bool + component: str + timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + details: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary.""" + return { + "healthy": self.healthy, + "component": self.component, + "timestamp": self.timestamp, + "details": self.details, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "HealthStatus": + """Create from dictionary.""" + return cls( + healthy=data.get("healthy", False), + component=data.get("component", "unknown"), + timestamp=data.get("timestamp", ""), + details=data.get("details", {}), + ) + + def is_stale(self, threshold_seconds: int = HEALTH_STALE_THRESHOLD) -> bool: + """Check if this health status is stale. + + Args: + threshold_seconds: Maximum age in seconds + + Returns: + True if the status is older than threshold + """ + try: + status_time = datetime.fromisoformat(self.timestamp.replace("Z", "+00:00")) + age = (datetime.now(timezone.utc) - status_time).total_seconds() + return age > threshold_seconds + except (ValueError, TypeError): + return True + + +def get_health_dir() -> Path: + """Get the health directory path. + + Returns: + Path to health directory + """ + health_dir = os.environ.get("HEALTH_DIR", DEFAULT_HEALTH_DIR) + return Path(health_dir) + + +def get_health_file(component: str) -> Path: + """Get the health file path for a component. + + Args: + component: Component name (interface, collector) + + Returns: + Path to health file + """ + health_dir = get_health_dir() + if component == "interface": + return health_dir / DEFAULT_HEALTH_FILE_INTERFACE + elif component == "collector": + return health_dir / DEFAULT_HEALTH_FILE_COLLECTOR + else: + return health_dir / f"{component}-health.json" + + +def write_health_status(status: HealthStatus) -> bool: + """Write health status to file. + + Args: + status: Health status to write + + Returns: + True if write was successful + """ + health_file = get_health_file(status.component) + + try: + # Ensure directory exists + health_file.parent.mkdir(parents=True, exist_ok=True) + + # Write status atomically + temp_file = health_file.with_suffix(".tmp") + with open(temp_file, "w") as f: + json.dump(status.to_dict(), f) + + temp_file.replace(health_file) + return True + + except Exception as e: + logger.error(f"Failed to write health status: {e}") + return False + + +def read_health_status(component: str) -> Optional[HealthStatus]: + """Read health status from file. + + Args: + component: Component name + + Returns: + Health status or None if not available + """ + health_file = get_health_file(component) + + try: + if not health_file.exists(): + return None + + with open(health_file) as f: + data = json.load(f) + + return HealthStatus.from_dict(data) + + except Exception as e: + logger.error(f"Failed to read health status: {e}") + return None + + +def check_health(component: str, stale_threshold: int = HEALTH_STALE_THRESHOLD) -> tuple[bool, str]: + """Check health status for a component. + + Args: + component: Component name + stale_threshold: Maximum age of health status in seconds + + Returns: + Tuple of (is_healthy, message) + """ + status = read_health_status(component) + + if status is None: + return False, f"No health status found for {component}" + + if status.is_stale(stale_threshold): + return False, f"Health status is stale (older than {stale_threshold}s)" + + if not status.healthy: + details = status.details + reasons = [] + for key, value in details.items(): + if key.endswith("_connected") and value is False: + reasons.append(f"{key.replace('_', ' ')}") + elif key == "running" and value is False: + reasons.append("not running") + reason_str = ", ".join(reasons) if reasons else "unhealthy" + return False, f"Component is {reason_str}" + + return True, "healthy" + + +def clear_health_status(component: str) -> bool: + """Remove health status file. + + Args: + component: Component name + + Returns: + True if file was removed or didn't exist + """ + health_file = get_health_file(component) + + try: + if health_file.exists(): + health_file.unlink() + return True + except Exception as e: + logger.error(f"Failed to clear health status: {e}") + return False + + +class HealthReporter: + """Background health reporter that periodically updates health status.""" + + def __init__( + self, + component: str, + status_fn: Callable[[], dict[str, Any]], + interval: float = 10.0, + ): + """Initialize health reporter. + + Args: + component: Component name + status_fn: Function that returns health status dict + Should return a dict with at least 'healthy' key + interval: Update interval in seconds + """ + self.component = component + self.status_fn = status_fn + self.interval = interval + self._running = False + self._thread: Optional[threading.Thread] = None + + def start(self) -> None: + """Start the health reporter background thread.""" + if self._running: + return + + self._running = True + self._thread = threading.Thread( + target=self._report_loop, + daemon=True, + name=f"{self.component}-health-reporter", + ) + self._thread.start() + logger.info(f"Started health reporter for {self.component}") + + def stop(self) -> None: + """Stop the health reporter.""" + if not self._running: + return + + self._running = False + if self._thread: + self._thread.join(timeout=self.interval + 1) + self._thread = None + + # Clear health status on shutdown + clear_health_status(self.component) + logger.info(f"Stopped health reporter for {self.component}") + + def _report_loop(self) -> None: + """Background loop that reports health status.""" + while self._running: + try: + status_dict = self.status_fn() + status = HealthStatus( + healthy=status_dict.get("healthy", False), + component=self.component, + details=status_dict, + ) + write_health_status(status) + except Exception as e: + logger.error(f"Health report error: {e}") + + # Sleep in small increments to allow quick shutdown + for _ in range(int(self.interval * 10)): + if not self._running: + break + time.sleep(0.1) + + def report_now(self) -> None: + """Report health status immediately.""" + try: + status_dict = self.status_fn() + status = HealthStatus( + healthy=status_dict.get("healthy", False), + component=self.component, + details=status_dict, + ) + write_health_status(status) + except Exception as e: + logger.error(f"Health report error: {e}") diff --git a/src/meshcore_hub/interface/receiver.py b/src/meshcore_hub/interface/receiver.py index b5a7746..623858f 100644 --- a/src/meshcore_hub/interface/receiver.py +++ b/src/meshcore_hub/interface/receiver.py @@ -12,6 +12,7 @@ import threading import time from typing import Any, Optional +from meshcore_hub.common.health import HealthReporter from meshcore_hub.common.mqtt import MQTTClient, MQTTConfig from meshcore_hub.interface.device import ( BaseMeshCoreDevice, @@ -45,6 +46,7 @@ class Receiver: self._shutdown_event = threading.Event() self._device_connected = False self._mqtt_connected = False + self._health_reporter: Optional[HealthReporter] = None @property def is_healthy(self) -> bool: @@ -157,6 +159,14 @@ class Receiver: self._running = True + # Start health reporter for Docker health checks + self._health_reporter = HealthReporter( + component="interface", + status_fn=self.get_health_status, + interval=10.0, + ) + self._health_reporter.start() + def run(self) -> None: """Run the receiver event loop (blocking).""" if not self._running: @@ -181,6 +191,11 @@ class Receiver: self._running = False self._shutdown_event.set() + # Stop health reporter + if self._health_reporter: + self._health_reporter.stop() + self._health_reporter = None + # Stop device self.device.stop() self.device.disconnect() diff --git a/src/meshcore_hub/interface/sender.py b/src/meshcore_hub/interface/sender.py index 53993b7..75f280c 100644 --- a/src/meshcore_hub/interface/sender.py +++ b/src/meshcore_hub/interface/sender.py @@ -12,6 +12,7 @@ import threading import time from typing import Any, Optional +from meshcore_hub.common.health import HealthReporter from meshcore_hub.common.mqtt import MQTTClient, MQTTConfig from meshcore_hub.interface.device import ( BaseMeshCoreDevice, @@ -44,6 +45,7 @@ class Sender: self._shutdown_event = threading.Event() self._device_connected = False self._mqtt_connected = False + self._health_reporter: Optional[HealthReporter] = None @property def is_healthy(self) -> bool: @@ -228,6 +230,14 @@ class Sender: self._running = True + # Start health reporter for Docker health checks + self._health_reporter = HealthReporter( + component="interface", + status_fn=self.get_health_status, + interval=10.0, + ) + self._health_reporter.start() + def run(self) -> None: """Run the sender event loop (blocking).""" if not self._running: @@ -252,6 +262,11 @@ class Sender: self._running = False self._shutdown_event.set() + # Stop health reporter + if self._health_reporter: + self._health_reporter.stop() + self._health_reporter = None + # Stop MQTT self.mqtt.stop() self.mqtt.disconnect() diff --git a/tests/test_collector/test_webhook.py b/tests/test_collector/test_webhook.py new file mode 100644 index 0000000..c4c29aa --- /dev/null +++ b/tests/test_collector/test_webhook.py @@ -0,0 +1,488 @@ +"""Tests for the webhook dispatcher module.""" + +import asyncio +from unittest.mock import AsyncMock, patch + +import httpx +import pytest + +from meshcore_hub.collector.webhook import ( + WebhookConfig, + WebhookDispatcher, + create_webhook_dispatcher_from_config, + dispatch_event, + get_queued_events, + set_dispatch_callback, +) + + +class TestWebhookConfig: + """Tests for WebhookConfig.""" + + def test_default_values(self): + """Test default configuration values.""" + config = WebhookConfig(url="https://example.com/webhook") + assert config.url == "https://example.com/webhook" + assert config.name == "webhook" + assert config.event_types == [] + assert config.filter_expression is None + assert config.headers == {} + assert config.timeout == 10.0 + assert config.max_retries == 3 + assert config.retry_backoff == 2.0 + assert config.enabled is True + + def test_custom_values(self): + """Test custom configuration values.""" + config = WebhookConfig( + url="https://example.com/webhook", + name="my-webhook", + event_types=["advertisement"], + filter_expression="$.snr > -10", + headers={"X-API-Key": "secret"}, + timeout=5.0, + max_retries=5, + retry_backoff=1.0, + enabled=False, + ) + assert config.name == "my-webhook" + assert config.event_types == ["advertisement"] + assert config.filter_expression == "$.snr > -10" + assert config.headers == {"X-API-Key": "secret"} + assert config.timeout == 5.0 + assert config.max_retries == 5 + assert config.retry_backoff == 1.0 + assert config.enabled is False + + def test_matches_event_no_filters(self): + """Test event matching with no filters.""" + config = WebhookConfig(url="https://example.com/webhook") + assert config.matches_event("advertisement", {"name": "Node1"}) is True + assert config.matches_event("contact_msg_recv", {"text": "Hello"}) is True + + def test_matches_event_type_filter(self): + """Test event matching with event type filter.""" + config = WebhookConfig( + url="https://example.com/webhook", + event_types=["advertisement", "contact_msg_recv"], + ) + assert config.matches_event("advertisement", {}) is True + assert config.matches_event("contact_msg_recv", {}) is True + assert config.matches_event("channel_msg_recv", {}) is False + + def test_matches_event_filter_equals(self): + """Test event matching with equals filter.""" + config = WebhookConfig( + url="https://example.com/webhook", + filter_expression='$.name == "Node1"', + ) + assert config.matches_event("advertisement", {"name": "Node1"}) is True + assert config.matches_event("advertisement", {"name": "Node2"}) is False + + def test_matches_event_filter_not_equals(self): + """Test event matching with not equals filter.""" + config = WebhookConfig( + url="https://example.com/webhook", + filter_expression='$.name != "Node1"', + ) + assert config.matches_event("advertisement", {"name": "Node1"}) is False + assert config.matches_event("advertisement", {"name": "Node2"}) is True + + def test_matches_event_filter_numeric_comparison(self): + """Test event matching with numeric comparisons.""" + config_gt = WebhookConfig( + url="https://example.com/webhook", + filter_expression="$.snr > -10", + ) + assert config_gt.matches_event("msg", {"snr": -5}) is True + assert config_gt.matches_event("msg", {"snr": -10}) is False + assert config_gt.matches_event("msg", {"snr": -15}) is False + + config_gte = WebhookConfig( + url="https://example.com/webhook", + filter_expression="$.snr >= -10", + ) + assert config_gte.matches_event("msg", {"snr": -10}) is True + + config_lt = WebhookConfig( + url="https://example.com/webhook", + filter_expression="$.hops < 5", + ) + assert config_lt.matches_event("msg", {"hops": 3}) is True + assert config_lt.matches_event("msg", {"hops": 5}) is False + + config_lte = WebhookConfig( + url="https://example.com/webhook", + filter_expression="$.hops <= 5", + ) + assert config_lte.matches_event("msg", {"hops": 5}) is True + + def test_matches_event_filter_exists(self): + """Test event matching with exists filter.""" + config_exists = WebhookConfig( + url="https://example.com/webhook", + filter_expression="$.name exists", + ) + assert config_exists.matches_event("adv", {"name": "Node1"}) is True + assert config_exists.matches_event("adv", {"other": "value"}) is False + + config_not_exists = WebhookConfig( + url="https://example.com/webhook", + filter_expression="$.name not exists", + ) + assert config_not_exists.matches_event("adv", {"name": "Node1"}) is False + assert config_not_exists.matches_event("adv", {"other": "value"}) is True + + def test_matches_event_filter_nested_path(self): + """Test event matching with nested path.""" + config = WebhookConfig( + url="https://example.com/webhook", + filter_expression='$.data.type == "sensor"', + ) + assert config.matches_event("event", {"data": {"type": "sensor"}}) is True + assert config.matches_event("event", {"data": {"type": "other"}}) is False + assert config.matches_event("event", {"data": {}}) is False + assert config.matches_event("event", {}) is False + + def test_matches_event_filter_boolean(self): + """Test event matching with boolean values.""" + config = WebhookConfig( + url="https://example.com/webhook", + filter_expression="$.active == true", + ) + assert config.matches_event("event", {"active": True}) is True + assert config.matches_event("event", {"active": False}) is False + + def test_matches_event_filter_null(self): + """Test event matching with null value.""" + config = WebhookConfig( + url="https://example.com/webhook", + filter_expression="$.value != null", + ) + assert config.matches_event("event", {"value": "something"}) is True + assert config.matches_event("event", {"value": None}) is False + + def test_matches_event_invalid_filter(self): + """Test event matching with invalid filter expression.""" + config = WebhookConfig( + url="https://example.com/webhook", + filter_expression="invalid expression", + ) + # Invalid expressions should pass through + assert config.matches_event("event", {"any": "data"}) is True + + +class TestWebhookDispatcher: + """Tests for WebhookDispatcher.""" + + @pytest.fixture + def dispatcher(self): + """Create a webhook dispatcher for testing.""" + return WebhookDispatcher() + + @pytest.mark.asyncio + async def test_start_stop(self, dispatcher): + """Test starting and stopping the dispatcher.""" + assert dispatcher.is_running is False + + await dispatcher.start() + assert dispatcher.is_running is True + assert dispatcher._client is not None + + await dispatcher.stop() + assert dispatcher.is_running is False + assert dispatcher._client is None + + def test_add_webhook(self, dispatcher): + """Test adding a webhook.""" + webhook = WebhookConfig( + url="https://example.com/webhook", + name="test-webhook", + ) + dispatcher.add_webhook(webhook) + assert len(dispatcher.webhooks) == 1 + assert dispatcher.webhooks[0].name == "test-webhook" + + def test_remove_webhook(self, dispatcher): + """Test removing a webhook.""" + webhook = WebhookConfig( + url="https://example.com/webhook", + name="test-webhook", + ) + dispatcher.add_webhook(webhook) + assert len(dispatcher.webhooks) == 1 + + result = dispatcher.remove_webhook("test-webhook") + assert result is True + assert len(dispatcher.webhooks) == 0 + + result = dispatcher.remove_webhook("nonexistent") + assert result is False + + @pytest.mark.asyncio + async def test_dispatch_not_running(self, dispatcher): + """Test dispatch when dispatcher is not running.""" + result = await dispatcher.dispatch("event", {"data": "test"}) + assert result == {} + + @pytest.mark.asyncio + async def test_dispatch_no_webhooks(self, dispatcher): + """Test dispatch with no webhooks configured.""" + await dispatcher.start() + result = await dispatcher.dispatch("event", {"data": "test"}) + assert result == {} + await dispatcher.stop() + + @pytest.mark.asyncio + async def test_dispatch_success(self, dispatcher): + """Test successful webhook dispatch.""" + webhook = WebhookConfig( + url="https://example.com/webhook", + name="test-webhook", + ) + dispatcher.add_webhook(webhook) + await dispatcher.start() + + # Mock the HTTP client + mock_response = AsyncMock() + mock_response.status_code = 200 + + with patch.object( + dispatcher._client, "post", return_value=mock_response + ) as mock_post: + result = await dispatcher.dispatch( + "advertisement", + {"name": "Node1"}, + public_key="abc123", + ) + + assert result == {"test-webhook": True} + mock_post.assert_called_once() + call_kwargs = mock_post.call_args.kwargs + assert call_kwargs["json"]["event_type"] == "advertisement" + assert call_kwargs["json"]["payload"]["name"] == "Node1" + assert call_kwargs["json"]["public_key"] == "abc123" + + await dispatcher.stop() + + @pytest.mark.asyncio + async def test_dispatch_disabled_webhook(self, dispatcher): + """Test dispatch skips disabled webhooks.""" + webhook = WebhookConfig( + url="https://example.com/webhook", + name="disabled-webhook", + enabled=False, + ) + dispatcher.add_webhook(webhook) + await dispatcher.start() + + result = await dispatcher.dispatch("event", {"data": "test"}) + assert result == {} + + await dispatcher.stop() + + @pytest.mark.asyncio + async def test_dispatch_filtered_webhook(self, dispatcher): + """Test dispatch respects event type filter.""" + webhook = WebhookConfig( + url="https://example.com/webhook", + name="filtered-webhook", + event_types=["advertisement"], + ) + dispatcher.add_webhook(webhook) + await dispatcher.start() + + # This event should not match the filter + result = await dispatcher.dispatch("contact_msg_recv", {"text": "Hello"}) + assert result == {} + + await dispatcher.stop() + + @pytest.mark.asyncio + async def test_dispatch_http_error(self, dispatcher): + """Test dispatch handles HTTP errors.""" + webhook = WebhookConfig( + url="https://example.com/webhook", + name="error-webhook", + max_retries=0, # No retries for faster test + ) + dispatcher.add_webhook(webhook) + await dispatcher.start() + + mock_response = AsyncMock() + mock_response.status_code = 500 + + with patch.object( + dispatcher._client, "post", return_value=mock_response + ): + result = await dispatcher.dispatch("event", {"data": "test"}) + assert result == {"error-webhook": False} + + await dispatcher.stop() + + @pytest.mark.asyncio + async def test_dispatch_timeout(self, dispatcher): + """Test dispatch handles timeouts.""" + webhook = WebhookConfig( + url="https://example.com/webhook", + name="timeout-webhook", + max_retries=0, + ) + dispatcher.add_webhook(webhook) + await dispatcher.start() + + with patch.object( + dispatcher._client, + "post", + side_effect=httpx.TimeoutException("Timeout"), + ): + result = await dispatcher.dispatch("event", {"data": "test"}) + assert result == {"timeout-webhook": False} + + await dispatcher.stop() + + @pytest.mark.asyncio + async def test_dispatch_retry_success(self, dispatcher): + """Test dispatch retries and eventually succeeds.""" + webhook = WebhookConfig( + url="https://example.com/webhook", + name="retry-webhook", + max_retries=2, + retry_backoff=0.01, # Fast backoff for tests + ) + dispatcher.add_webhook(webhook) + await dispatcher.start() + + mock_response_success = AsyncMock() + mock_response_success.status_code = 200 + + # First call fails, second succeeds + call_count = 0 + + async def mock_post(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise httpx.TimeoutException("Timeout") + return mock_response_success + + with patch.object(dispatcher._client, "post", side_effect=mock_post): + result = await dispatcher.dispatch("event", {"data": "test"}) + assert result == {"retry-webhook": True} + assert call_count == 2 + + await dispatcher.stop() + + @pytest.mark.asyncio + async def test_dispatch_multiple_webhooks(self, dispatcher): + """Test dispatch to multiple webhooks concurrently.""" + webhook1 = WebhookConfig( + url="https://example.com/webhook1", + name="webhook-1", + ) + webhook2 = WebhookConfig( + url="https://example.com/webhook2", + name="webhook-2", + ) + dispatcher.add_webhook(webhook1) + dispatcher.add_webhook(webhook2) + await dispatcher.start() + + mock_response = AsyncMock() + mock_response.status_code = 200 + + with patch.object(dispatcher._client, "post", return_value=mock_response): + result = await dispatcher.dispatch("event", {"data": "test"}) + assert result == {"webhook-1": True, "webhook-2": True} + + await dispatcher.stop() + + +class TestWebhookDispatcherFactory: + """Tests for create_webhook_dispatcher_from_config.""" + + def test_create_from_config(self): + """Test creating dispatcher from configuration.""" + config = [ + { + "name": "webhook-1", + "url": "https://example.com/webhook1", + "event_types": ["advertisement"], + }, + { + "name": "webhook-2", + "url": "https://example.com/webhook2", + "filter_expression": "$.snr > -10", + "headers": {"X-API-Key": "secret"}, + "timeout": 5.0, + }, + ] + + dispatcher = create_webhook_dispatcher_from_config(config) + assert len(dispatcher.webhooks) == 2 + assert dispatcher.webhooks[0].name == "webhook-1" + assert dispatcher.webhooks[0].event_types == ["advertisement"] + assert dispatcher.webhooks[1].name == "webhook-2" + assert dispatcher.webhooks[1].filter_expression == "$.snr > -10" + assert dispatcher.webhooks[1].headers == {"X-API-Key": "secret"} + assert dispatcher.webhooks[1].timeout == 5.0 + + def test_create_from_config_missing_url(self): + """Test creating dispatcher with missing URL in config.""" + config = [ + { + "name": "invalid-webhook", + # Missing 'url' + }, + ] + + dispatcher = create_webhook_dispatcher_from_config(config) + assert len(dispatcher.webhooks) == 0 + + def test_create_from_empty_config(self): + """Test creating dispatcher from empty config.""" + dispatcher = create_webhook_dispatcher_from_config([]) + assert len(dispatcher.webhooks) == 0 + + +class TestSyncDispatchHelpers: + """Tests for synchronous dispatch helper functions.""" + + def test_queue_events(self): + """Test queuing events for dispatch.""" + # Clear any existing state + get_queued_events() + set_dispatch_callback(None) + + dispatch_event("event1", {"data": 1}, "key1") + dispatch_event("event2", {"data": 2}, "key2") + + events = get_queued_events() + assert len(events) == 2 + assert events[0] == ("event1", {"data": 1}, "key1") + assert events[1] == ("event2", {"data": 2}, "key2") + + # Queue should be cleared + events = get_queued_events() + assert len(events) == 0 + + def test_dispatch_callback(self): + """Test dispatch callback.""" + received_events = [] + + def callback(event_type, payload, public_key): + received_events.append((event_type, payload, public_key)) + + set_dispatch_callback(callback) + + dispatch_event("test_event", {"value": "test"}, "pub_key_123") + + assert len(received_events) == 1 + assert received_events[0] == ("test_event", {"value": "test"}, "pub_key_123") + + # Queue should be empty when using callback + events = get_queued_events() + assert len(events) == 0 + + # Clean up + set_dispatch_callback(None)