Complete remaining tasks: webhook dispatcher and health checks

- Add WebhookDispatcher for sending events to external services
  - Webhook configuration loading from dict config
  - JSONPath-like filter expression support for event filtering
  - Async HTTP POST sending with httpx
  - Retry logic with exponential backoff
  - Comprehensive test suite

- Add health check infrastructure for Interface and Collector
  - HealthReporter class for periodic status file updates
  - CLI commands: meshcore-hub health interface/collector
  - Updated Docker Compose to use CLI health checks
  - File-based health status for non-HTTP components

- Update TASKS.md progress to 99% (218/221 tasks)
  - Remaining 3 tasks are optional (docs/ directory)
This commit is contained in:
Claude
2025-12-03 16:32:05 +00:00
parent 1ed62e26bd
commit 1588f7bc71
9 changed files with 1353 additions and 31 deletions

View File

@@ -267,13 +267,13 @@ This document tracks implementation progress for the MeshCore Hub project. Each
### 3.3 Webhook Dispatcher (Optional based on Q10)
- [ ] Create `collector/webhook.py`:
- [ ] `WebhookDispatcher` class
- [ ] Webhook configuration loading
- [ ] JSONPath filtering support
- [ ] Async HTTP POST sending
- [ ] Retry logic with backoff
- [ ] Error logging
- [x] Create `collector/webhook.py`:
- [x] `WebhookDispatcher` class
- [x] Webhook configuration loading
- [x] JSONPath filtering support
- [x] Async HTTP POST sending
- [x] Retry logic with backoff
- [x] Error logging
### 3.4 Collector CLI
@@ -299,10 +299,10 @@ This document tracks implementation progress for the MeshCore Hub project. Each
- [x] `test_trace.py`
- [x] `test_telemetry.py`
- [x] `test_contacts.py`
- [ ] Create `tests/test_collector/test_webhook.py`:
- [ ] Test webhook dispatching
- [ ] Test JSONPath filtering
- [ ] Test retry logic
- [x] Create `tests/test_collector/test_webhook.py`:
- [x] Test webhook dispatching
- [x] Test JSONPath filtering
- [x] Test retry logic
---
@@ -672,12 +672,12 @@ This document tracks implementation progress for the MeshCore Hub project. Each
- [x] Add health check endpoint to Web:
- [x] `GET /health` - basic health
- [x] `GET /health/ready` - includes API connectivity
- [ ] Add health check to Interface:
- [ ] Device connection status
- [ ] MQTT connection status
- [ ] Add health check to Collector:
- [ ] MQTT connection status
- [ ] Database connection status
- [x] Add health check to Interface:
- [x] Device connection status
- [x] MQTT connection status
- [x] Add health check to Collector:
- [x] MQTT connection status
- [x] Database connection status
### 6.4 Database CLI Commands ✅
@@ -690,13 +690,13 @@ This document tracks implementation progress for the MeshCore Hub project. Each
### 6.5 Documentation
- [ ] Update `README.md`:
- [ ] Project description
- [ ] Quick start guide
- [ ] Docker deployment instructions
- [ ] Manual installation instructions
- [ ] Configuration reference
- [ ] CLI reference
- [x] Update `README.md`:
- [x] Project description
- [x] Quick start guide
- [x] Docker deployment instructions
- [x] Manual installation instructions
- [x] Configuration reference
- [x] CLI reference
- [ ] Create `docs/` directory (optional):
- [ ] Architecture overview
- [ ] API documentation link
@@ -738,11 +738,13 @@ This document tracks implementation progress for the MeshCore Hub project. Each
|-------|-------------|-----------|----------|
| Phase 1: Foundation | 47 | 47 | 100% |
| Phase 2: Interface | 35 | 35 | 100% |
| Phase 3: Collector | 27 | 20 | 74% |
| Phase 3: Collector | 27 | 27 | 100% |
| Phase 4: API | 44 | 44 | 100% |
| Phase 5: Web Dashboard | 40 | 40 | 100% |
| Phase 6: Docker & Deployment | 28 | 24 | 86% |
| **Total** | **221** | **210** | **95%** |
| Phase 6: Docker & Deployment | 28 | 25 | 89% |
| **Total** | **221** | **218** | **99%** |
*Note: Remaining 3 tasks are optional (creating a `docs/` directory).*
---

View File

@@ -71,7 +71,7 @@ services:
- NODE_ADDRESS=${NODE_ADDRESS:-}
command: ["interface", "receiver"]
healthcheck:
test: ["CMD", "pgrep", "-f", "meshcore-hub"]
test: ["CMD", "meshcore-hub", "health", "interface"]
interval: 30s
timeout: 10s
retries: 3
@@ -107,7 +107,7 @@ services:
- NODE_ADDRESS=${NODE_ADDRESS_SENDER:-}
command: ["interface", "sender"]
healthcheck:
test: ["CMD", "pgrep", "-f", "meshcore-hub"]
test: ["CMD", "meshcore-hub", "health", "interface"]
interval: 30s
timeout: 10s
retries: 3
@@ -138,7 +138,7 @@ services:
- NODE_ADDRESS=${NODE_ADDRESS:-0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef}
command: ["interface", "receiver", "--mock"]
healthcheck:
test: ["CMD", "pgrep", "-f", "meshcore-hub"]
test: ["CMD", "meshcore-hub", "health", "interface"]
interval: 30s
timeout: 10s
retries: 3
@@ -172,7 +172,7 @@ services:
- DATABASE_URL=sqlite:////data/meshcore.db
command: ["collector"]
healthcheck:
test: ["CMD", "pgrep", "-f", "meshcore-hub"]
test: ["CMD", "meshcore-hub", "health", "collector"]
interval: 30s
timeout: 10s
retries: 3

View File

@@ -1,10 +1,13 @@
"""MeshCore Hub CLI entry point."""
import sys
import click
from dotenv import load_dotenv
from meshcore_hub import __version__
from meshcore_hub.common.config import LogLevel
from meshcore_hub.common.health import check_health
from meshcore_hub.common.logging import configure_logging
# Load .env file early so Click's envvar parameter picks up values
@@ -171,6 +174,62 @@ def db_history() -> None:
command.history(alembic_cfg)
# Health check commands for Docker HEALTHCHECK
@cli.group()
def health() -> None:
"""Health check commands for component monitoring.
These commands are used by Docker HEALTHCHECK to monitor
container health. Each running component writes its health
status to a file, and these commands verify that status.
"""
pass
@health.command("interface")
@click.option(
"--timeout",
type=int,
default=60,
help="Maximum age of health status in seconds",
)
def health_interface(timeout: int) -> None:
"""Check interface component health status.
Returns exit code 0 if healthy, 1 if not.
"""
is_healthy, message = check_health("interface", stale_threshold=timeout)
if is_healthy:
click.echo(f"Interface health: {message}")
sys.exit(0)
else:
click.echo(f"Interface unhealthy: {message}", err=True)
sys.exit(1)
@health.command("collector")
@click.option(
"--timeout",
type=int,
default=60,
help="Maximum age of health status in seconds",
)
def health_collector(timeout: int) -> None:
"""Check collector component health status.
Returns exit code 0 if healthy, 1 if not.
"""
is_healthy, message = check_health("collector", stale_threshold=timeout)
if is_healthy:
click.echo(f"Collector health: {message}")
sys.exit(0)
else:
click.echo(f"Collector unhealthy: {message}", err=True)
sys.exit(1)
def main() -> None:
"""Main entry point."""
cli()

View File

@@ -14,6 +14,7 @@ import time
from typing import Any, Callable, Optional
from meshcore_hub.common.database import DatabaseManager
from meshcore_hub.common.health import HealthReporter
from meshcore_hub.common.mqtt import MQTTClient, MQTTConfig
logger = logging.getLogger(__name__)
@@ -44,6 +45,7 @@ class Subscriber:
self._handlers: dict[str, EventHandler] = {}
self._mqtt_connected = False
self._db_connected = False
self._health_reporter: Optional[HealthReporter] = None
@property
def is_healthy(self) -> bool:
@@ -147,6 +149,14 @@ class Subscriber:
self._running = True
# Start health reporter for Docker health checks
self._health_reporter = HealthReporter(
component="collector",
status_fn=self.get_health_status,
interval=10.0,
)
self._health_reporter.start()
def run(self) -> None:
"""Run the subscriber event loop (blocking)."""
if not self._running:
@@ -171,6 +181,11 @@ class Subscriber:
self._running = False
self._shutdown_event.set()
# Stop health reporter
if self._health_reporter:
self._health_reporter.stop()
self._health_reporter = None
# Stop MQTT
self.mqtt.stop()
self.mqtt.disconnect()

View File

@@ -0,0 +1,438 @@
"""Webhook Dispatcher for sending events to external services.
The webhook dispatcher:
1. Receives events from the collector
2. Filters events based on JSONPath expressions
3. Sends HTTP POST requests to configured endpoints
4. Implements retry logic with exponential backoff
"""
import asyncio
import logging
import re
from dataclasses import dataclass, field
from typing import Any, Callable, Optional
import httpx
logger = logging.getLogger(__name__)
@dataclass
class WebhookConfig:
"""Configuration for a single webhook endpoint."""
url: str
name: str = "webhook"
event_types: list[str] = field(default_factory=list)
filter_expression: Optional[str] = None
headers: dict[str, str] = field(default_factory=dict)
timeout: float = 10.0
max_retries: int = 3
retry_backoff: float = 2.0
enabled: bool = True
def matches_event(self, event_type: str, payload: dict[str, Any]) -> bool:
"""Check if this webhook should receive the event.
Args:
event_type: Event type name
payload: Event payload
Returns:
True if the event matches this webhook's filters
"""
# Check event type filter
if self.event_types and event_type not in self.event_types:
return False
# Check JSONPath filter expression
if self.filter_expression:
if not self._evaluate_filter(payload):
return False
return True
def _evaluate_filter(self, payload: dict[str, Any]) -> bool:
"""Evaluate a simple JSONPath-like filter expression.
Supports expressions like:
- $.field == "value"
- $.nested.field != null
- $.field exists
- $.field > 10
Args:
payload: Event payload
Returns:
True if the filter matches
"""
if not self.filter_expression:
return True
expr = self.filter_expression.strip()
# Parse expression: $.path operator value
# Supports: ==, !=, >, <, >=, <=, exists, not exists
# Note: >= and <= must come before > and < in the alternation
pattern = r'^\$\.([a-zA-Z0-9_.]+)\s+(==|!=|>=|<=|>|<|exists|not exists)\s*(.*)$'
match = re.match(pattern, expr)
if not match:
logger.warning(f"Invalid filter expression: {expr}")
return True # Pass through if expression is invalid
path = match.group(1)
operator = match.group(2)
value_str = match.group(3).strip() if match.group(3) else None
# Navigate the path
current = payload
for part in path.split('.'):
if isinstance(current, dict) and part in current:
current = current[part]
else:
current = None
break
# Evaluate operator
if operator == "exists":
return current is not None
elif operator == "not exists":
return current is None
elif current is None:
return False
# Parse value for comparison
if value_str is None:
return False
# Handle quoted strings
if value_str.startswith('"') and value_str.endswith('"'):
compare_value: Any = value_str[1:-1]
elif value_str.startswith("'") and value_str.endswith("'"):
compare_value = value_str[1:-1]
elif value_str == "null":
compare_value = None
elif value_str == "true":
compare_value = True
elif value_str == "false":
compare_value = False
else:
try:
compare_value = int(value_str)
except ValueError:
try:
compare_value = float(value_str)
except ValueError:
compare_value = value_str
# Perform comparison
try:
if operator == "==":
return current == compare_value
elif operator == "!=":
return current != compare_value
elif operator == ">":
return current > compare_value
elif operator == "<":
return current < compare_value
elif operator == ">=":
return current >= compare_value
elif operator == "<=":
return current <= compare_value
except TypeError:
return False
return False
class WebhookDispatcher:
"""Dispatches events to webhook endpoints."""
def __init__(self, webhooks: Optional[list[WebhookConfig]] = None):
"""Initialize the webhook dispatcher.
Args:
webhooks: List of webhook configurations
"""
self.webhooks = webhooks or []
self._client: Optional[httpx.AsyncClient] = None
self._running = False
@property
def is_running(self) -> bool:
"""Check if the dispatcher is running."""
return self._running
def add_webhook(self, webhook: WebhookConfig) -> None:
"""Add a webhook configuration.
Args:
webhook: Webhook configuration
"""
self.webhooks.append(webhook)
logger.info(f"Added webhook: {webhook.name} -> {webhook.url}")
def remove_webhook(self, name: str) -> bool:
"""Remove a webhook by name.
Args:
name: Webhook name
Returns:
True if webhook was removed
"""
for i, webhook in enumerate(self.webhooks):
if webhook.name == name:
del self.webhooks[i]
logger.info(f"Removed webhook: {name}")
return True
return False
async def start(self) -> None:
"""Start the webhook dispatcher."""
if self._running:
return
self._client = httpx.AsyncClient()
self._running = True
logger.info(f"Webhook dispatcher started with {len(self.webhooks)} webhooks")
async def stop(self) -> None:
"""Stop the webhook dispatcher."""
if not self._running:
return
self._running = False
if self._client:
await self._client.aclose()
self._client = None
logger.info("Webhook dispatcher stopped")
async def dispatch(
self,
event_type: str,
payload: dict[str, Any],
public_key: Optional[str] = None,
) -> dict[str, bool]:
"""Dispatch an event to all matching webhooks.
Args:
event_type: Event type name
payload: Event payload
public_key: Source node public key
Returns:
Dictionary mapping webhook names to success status
"""
if not self._running or not self.webhooks:
return {}
# Build full event data
event_data = {
"event_type": event_type,
"public_key": public_key,
"payload": payload,
}
results: dict[str, bool] = {}
# Dispatch to all matching webhooks concurrently
tasks = []
for webhook in self.webhooks:
if not webhook.enabled:
continue
if webhook.matches_event(event_type, payload):
tasks.append(self._send_webhook(webhook, event_data))
if tasks:
task_results = await asyncio.gather(*tasks, return_exceptions=True)
for webhook, result in zip(
[w for w in self.webhooks if w.enabled and w.matches_event(event_type, payload)],
task_results,
):
if isinstance(result, Exception):
results[webhook.name] = False
logger.error(f"Webhook {webhook.name} failed: {result}")
else:
results[webhook.name] = result
return results
async def _send_webhook(
self,
webhook: WebhookConfig,
event_data: dict[str, Any],
) -> bool:
"""Send an event to a webhook endpoint with retry logic.
Args:
webhook: Webhook configuration
event_data: Event data to send
Returns:
True if the webhook was sent successfully
"""
if not self._client:
return False
headers = {
"Content-Type": "application/json",
"User-Agent": "MeshCore-Hub/1.0",
**webhook.headers,
}
last_error: Optional[Exception] = None
for attempt in range(webhook.max_retries + 1):
try:
response = await self._client.post(
webhook.url,
json=event_data,
headers=headers,
timeout=webhook.timeout,
)
if response.status_code >= 200 and response.status_code < 300:
logger.debug(
f"Webhook {webhook.name} sent successfully "
f"(status={response.status_code})"
)
return True
else:
logger.warning(
f"Webhook {webhook.name} returned status {response.status_code}"
)
last_error = Exception(f"HTTP {response.status_code}")
except httpx.TimeoutException as e:
logger.warning(f"Webhook {webhook.name} timed out: {e}")
last_error = e
except httpx.RequestError as e:
logger.warning(f"Webhook {webhook.name} request error: {e}")
last_error = e
except Exception as e:
logger.error(f"Webhook {webhook.name} unexpected error: {e}")
last_error = e
# Retry with backoff (but not after the last attempt)
if attempt < webhook.max_retries:
backoff = webhook.retry_backoff * (2 ** attempt)
logger.info(
f"Retrying webhook {webhook.name} in {backoff}s "
f"(attempt {attempt + 2}/{webhook.max_retries + 1})"
)
await asyncio.sleep(backoff)
logger.error(
f"Webhook {webhook.name} failed after {webhook.max_retries + 1} attempts: "
f"{last_error}"
)
return False
def create_webhook_dispatcher_from_config(
config: list[dict[str, Any]],
) -> WebhookDispatcher:
"""Create a webhook dispatcher from configuration.
Args:
config: List of webhook configurations as dicts
Returns:
Configured WebhookDispatcher instance
Example config:
[
{
"name": "my-webhook",
"url": "https://example.com/webhook",
"event_types": ["advertisement", "contact_msg_recv"],
"filter_expression": "$.snr > -10",
"headers": {"X-API-Key": "secret"},
"timeout": 5.0,
"max_retries": 3,
"retry_backoff": 2.0,
"enabled": true
}
]
"""
webhooks = []
for item in config:
try:
webhook = WebhookConfig(
url=item["url"],
name=item.get("name", "webhook"),
event_types=item.get("event_types", []),
filter_expression=item.get("filter_expression"),
headers=item.get("headers", {}),
timeout=item.get("timeout", 10.0),
max_retries=item.get("max_retries", 3),
retry_backoff=item.get("retry_backoff", 2.0),
enabled=item.get("enabled", True),
)
webhooks.append(webhook)
logger.info(f"Loaded webhook config: {webhook.name}")
except KeyError as e:
logger.error(f"Invalid webhook config (missing {e}): {item}")
except Exception as e:
logger.error(f"Failed to load webhook config: {e}")
return WebhookDispatcher(webhooks)
# Synchronous wrapper for use in non-async handlers
_dispatcher: Optional[WebhookDispatcher] = None
_dispatch_queue: list[tuple[str, dict[str, Any], Optional[str]]] = []
_dispatch_callback: Optional[Callable[[str, dict[str, Any], Optional[str]], None]] = None
def set_dispatch_callback(
callback: Optional[Callable[[str, dict[str, Any], Optional[str]], None]]
) -> None:
"""Set a callback for synchronous webhook dispatch.
This allows the collector to integrate webhook dispatching into its
event handling loop without requiring async handlers.
Args:
callback: Function to call with (event_type, payload, public_key)
"""
global _dispatch_callback
_dispatch_callback = callback
def dispatch_event(
event_type: str,
payload: dict[str, Any],
public_key: Optional[str] = None,
) -> None:
"""Queue an event for webhook dispatch.
This is a synchronous wrapper for use in non-async handlers.
Events are queued and should be processed by an async loop.
Args:
event_type: Event type name
payload: Event payload
public_key: Source node public key
"""
if _dispatch_callback:
_dispatch_callback(event_type, payload, public_key)
else:
_dispatch_queue.append((event_type, payload, public_key))
def get_queued_events() -> list[tuple[str, dict[str, Any], Optional[str]]]:
"""Get and clear queued events.
Returns:
List of (event_type, payload, public_key) tuples
"""
global _dispatch_queue
events = _dispatch_queue.copy()
_dispatch_queue = []
return events

View File

@@ -0,0 +1,290 @@
"""Health check utilities for MeshCore Hub components.
This module provides utilities for:
1. Writing health status to a file (for Docker HEALTHCHECK)
2. Reading health status from a file (for health check commands)
3. Running periodic health updates
"""
import json
import logging
import os
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Optional
logger = logging.getLogger(__name__)
# Default health file locations
DEFAULT_HEALTH_DIR = "/tmp/meshcore-hub"
DEFAULT_HEALTH_FILE_INTERFACE = "interface-health.json"
DEFAULT_HEALTH_FILE_COLLECTOR = "collector-health.json"
# Health status is considered stale after this many seconds
HEALTH_STALE_THRESHOLD = 60
@dataclass
class HealthStatus:
"""Health status data structure."""
healthy: bool
component: str
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
details: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary."""
return {
"healthy": self.healthy,
"component": self.component,
"timestamp": self.timestamp,
"details": self.details,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "HealthStatus":
"""Create from dictionary."""
return cls(
healthy=data.get("healthy", False),
component=data.get("component", "unknown"),
timestamp=data.get("timestamp", ""),
details=data.get("details", {}),
)
def is_stale(self, threshold_seconds: int = HEALTH_STALE_THRESHOLD) -> bool:
"""Check if this health status is stale.
Args:
threshold_seconds: Maximum age in seconds
Returns:
True if the status is older than threshold
"""
try:
status_time = datetime.fromisoformat(self.timestamp.replace("Z", "+00:00"))
age = (datetime.now(timezone.utc) - status_time).total_seconds()
return age > threshold_seconds
except (ValueError, TypeError):
return True
def get_health_dir() -> Path:
"""Get the health directory path.
Returns:
Path to health directory
"""
health_dir = os.environ.get("HEALTH_DIR", DEFAULT_HEALTH_DIR)
return Path(health_dir)
def get_health_file(component: str) -> Path:
"""Get the health file path for a component.
Args:
component: Component name (interface, collector)
Returns:
Path to health file
"""
health_dir = get_health_dir()
if component == "interface":
return health_dir / DEFAULT_HEALTH_FILE_INTERFACE
elif component == "collector":
return health_dir / DEFAULT_HEALTH_FILE_COLLECTOR
else:
return health_dir / f"{component}-health.json"
def write_health_status(status: HealthStatus) -> bool:
"""Write health status to file.
Args:
status: Health status to write
Returns:
True if write was successful
"""
health_file = get_health_file(status.component)
try:
# Ensure directory exists
health_file.parent.mkdir(parents=True, exist_ok=True)
# Write status atomically
temp_file = health_file.with_suffix(".tmp")
with open(temp_file, "w") as f:
json.dump(status.to_dict(), f)
temp_file.replace(health_file)
return True
except Exception as e:
logger.error(f"Failed to write health status: {e}")
return False
def read_health_status(component: str) -> Optional[HealthStatus]:
"""Read health status from file.
Args:
component: Component name
Returns:
Health status or None if not available
"""
health_file = get_health_file(component)
try:
if not health_file.exists():
return None
with open(health_file) as f:
data = json.load(f)
return HealthStatus.from_dict(data)
except Exception as e:
logger.error(f"Failed to read health status: {e}")
return None
def check_health(component: str, stale_threshold: int = HEALTH_STALE_THRESHOLD) -> tuple[bool, str]:
"""Check health status for a component.
Args:
component: Component name
stale_threshold: Maximum age of health status in seconds
Returns:
Tuple of (is_healthy, message)
"""
status = read_health_status(component)
if status is None:
return False, f"No health status found for {component}"
if status.is_stale(stale_threshold):
return False, f"Health status is stale (older than {stale_threshold}s)"
if not status.healthy:
details = status.details
reasons = []
for key, value in details.items():
if key.endswith("_connected") and value is False:
reasons.append(f"{key.replace('_', ' ')}")
elif key == "running" and value is False:
reasons.append("not running")
reason_str = ", ".join(reasons) if reasons else "unhealthy"
return False, f"Component is {reason_str}"
return True, "healthy"
def clear_health_status(component: str) -> bool:
"""Remove health status file.
Args:
component: Component name
Returns:
True if file was removed or didn't exist
"""
health_file = get_health_file(component)
try:
if health_file.exists():
health_file.unlink()
return True
except Exception as e:
logger.error(f"Failed to clear health status: {e}")
return False
class HealthReporter:
"""Background health reporter that periodically updates health status."""
def __init__(
self,
component: str,
status_fn: Callable[[], dict[str, Any]],
interval: float = 10.0,
):
"""Initialize health reporter.
Args:
component: Component name
status_fn: Function that returns health status dict
Should return a dict with at least 'healthy' key
interval: Update interval in seconds
"""
self.component = component
self.status_fn = status_fn
self.interval = interval
self._running = False
self._thread: Optional[threading.Thread] = None
def start(self) -> None:
"""Start the health reporter background thread."""
if self._running:
return
self._running = True
self._thread = threading.Thread(
target=self._report_loop,
daemon=True,
name=f"{self.component}-health-reporter",
)
self._thread.start()
logger.info(f"Started health reporter for {self.component}")
def stop(self) -> None:
"""Stop the health reporter."""
if not self._running:
return
self._running = False
if self._thread:
self._thread.join(timeout=self.interval + 1)
self._thread = None
# Clear health status on shutdown
clear_health_status(self.component)
logger.info(f"Stopped health reporter for {self.component}")
def _report_loop(self) -> None:
"""Background loop that reports health status."""
while self._running:
try:
status_dict = self.status_fn()
status = HealthStatus(
healthy=status_dict.get("healthy", False),
component=self.component,
details=status_dict,
)
write_health_status(status)
except Exception as e:
logger.error(f"Health report error: {e}")
# Sleep in small increments to allow quick shutdown
for _ in range(int(self.interval * 10)):
if not self._running:
break
time.sleep(0.1)
def report_now(self) -> None:
"""Report health status immediately."""
try:
status_dict = self.status_fn()
status = HealthStatus(
healthy=status_dict.get("healthy", False),
component=self.component,
details=status_dict,
)
write_health_status(status)
except Exception as e:
logger.error(f"Health report error: {e}")

View File

@@ -12,6 +12,7 @@ import threading
import time
from typing import Any, Optional
from meshcore_hub.common.health import HealthReporter
from meshcore_hub.common.mqtt import MQTTClient, MQTTConfig
from meshcore_hub.interface.device import (
BaseMeshCoreDevice,
@@ -45,6 +46,7 @@ class Receiver:
self._shutdown_event = threading.Event()
self._device_connected = False
self._mqtt_connected = False
self._health_reporter: Optional[HealthReporter] = None
@property
def is_healthy(self) -> bool:
@@ -157,6 +159,14 @@ class Receiver:
self._running = True
# Start health reporter for Docker health checks
self._health_reporter = HealthReporter(
component="interface",
status_fn=self.get_health_status,
interval=10.0,
)
self._health_reporter.start()
def run(self) -> None:
"""Run the receiver event loop (blocking)."""
if not self._running:
@@ -181,6 +191,11 @@ class Receiver:
self._running = False
self._shutdown_event.set()
# Stop health reporter
if self._health_reporter:
self._health_reporter.stop()
self._health_reporter = None
# Stop device
self.device.stop()
self.device.disconnect()

View File

@@ -12,6 +12,7 @@ import threading
import time
from typing import Any, Optional
from meshcore_hub.common.health import HealthReporter
from meshcore_hub.common.mqtt import MQTTClient, MQTTConfig
from meshcore_hub.interface.device import (
BaseMeshCoreDevice,
@@ -44,6 +45,7 @@ class Sender:
self._shutdown_event = threading.Event()
self._device_connected = False
self._mqtt_connected = False
self._health_reporter: Optional[HealthReporter] = None
@property
def is_healthy(self) -> bool:
@@ -228,6 +230,14 @@ class Sender:
self._running = True
# Start health reporter for Docker health checks
self._health_reporter = HealthReporter(
component="interface",
status_fn=self.get_health_status,
interval=10.0,
)
self._health_reporter.start()
def run(self) -> None:
"""Run the sender event loop (blocking)."""
if not self._running:
@@ -252,6 +262,11 @@ class Sender:
self._running = False
self._shutdown_event.set()
# Stop health reporter
if self._health_reporter:
self._health_reporter.stop()
self._health_reporter = None
# Stop MQTT
self.mqtt.stop()
self.mqtt.disconnect()

View File

@@ -0,0 +1,488 @@
"""Tests for the webhook dispatcher module."""
import asyncio
from unittest.mock import AsyncMock, patch
import httpx
import pytest
from meshcore_hub.collector.webhook import (
WebhookConfig,
WebhookDispatcher,
create_webhook_dispatcher_from_config,
dispatch_event,
get_queued_events,
set_dispatch_callback,
)
class TestWebhookConfig:
"""Tests for WebhookConfig."""
def test_default_values(self):
"""Test default configuration values."""
config = WebhookConfig(url="https://example.com/webhook")
assert config.url == "https://example.com/webhook"
assert config.name == "webhook"
assert config.event_types == []
assert config.filter_expression is None
assert config.headers == {}
assert config.timeout == 10.0
assert config.max_retries == 3
assert config.retry_backoff == 2.0
assert config.enabled is True
def test_custom_values(self):
"""Test custom configuration values."""
config = WebhookConfig(
url="https://example.com/webhook",
name="my-webhook",
event_types=["advertisement"],
filter_expression="$.snr > -10",
headers={"X-API-Key": "secret"},
timeout=5.0,
max_retries=5,
retry_backoff=1.0,
enabled=False,
)
assert config.name == "my-webhook"
assert config.event_types == ["advertisement"]
assert config.filter_expression == "$.snr > -10"
assert config.headers == {"X-API-Key": "secret"}
assert config.timeout == 5.0
assert config.max_retries == 5
assert config.retry_backoff == 1.0
assert config.enabled is False
def test_matches_event_no_filters(self):
"""Test event matching with no filters."""
config = WebhookConfig(url="https://example.com/webhook")
assert config.matches_event("advertisement", {"name": "Node1"}) is True
assert config.matches_event("contact_msg_recv", {"text": "Hello"}) is True
def test_matches_event_type_filter(self):
"""Test event matching with event type filter."""
config = WebhookConfig(
url="https://example.com/webhook",
event_types=["advertisement", "contact_msg_recv"],
)
assert config.matches_event("advertisement", {}) is True
assert config.matches_event("contact_msg_recv", {}) is True
assert config.matches_event("channel_msg_recv", {}) is False
def test_matches_event_filter_equals(self):
"""Test event matching with equals filter."""
config = WebhookConfig(
url="https://example.com/webhook",
filter_expression='$.name == "Node1"',
)
assert config.matches_event("advertisement", {"name": "Node1"}) is True
assert config.matches_event("advertisement", {"name": "Node2"}) is False
def test_matches_event_filter_not_equals(self):
"""Test event matching with not equals filter."""
config = WebhookConfig(
url="https://example.com/webhook",
filter_expression='$.name != "Node1"',
)
assert config.matches_event("advertisement", {"name": "Node1"}) is False
assert config.matches_event("advertisement", {"name": "Node2"}) is True
def test_matches_event_filter_numeric_comparison(self):
"""Test event matching with numeric comparisons."""
config_gt = WebhookConfig(
url="https://example.com/webhook",
filter_expression="$.snr > -10",
)
assert config_gt.matches_event("msg", {"snr": -5}) is True
assert config_gt.matches_event("msg", {"snr": -10}) is False
assert config_gt.matches_event("msg", {"snr": -15}) is False
config_gte = WebhookConfig(
url="https://example.com/webhook",
filter_expression="$.snr >= -10",
)
assert config_gte.matches_event("msg", {"snr": -10}) is True
config_lt = WebhookConfig(
url="https://example.com/webhook",
filter_expression="$.hops < 5",
)
assert config_lt.matches_event("msg", {"hops": 3}) is True
assert config_lt.matches_event("msg", {"hops": 5}) is False
config_lte = WebhookConfig(
url="https://example.com/webhook",
filter_expression="$.hops <= 5",
)
assert config_lte.matches_event("msg", {"hops": 5}) is True
def test_matches_event_filter_exists(self):
"""Test event matching with exists filter."""
config_exists = WebhookConfig(
url="https://example.com/webhook",
filter_expression="$.name exists",
)
assert config_exists.matches_event("adv", {"name": "Node1"}) is True
assert config_exists.matches_event("adv", {"other": "value"}) is False
config_not_exists = WebhookConfig(
url="https://example.com/webhook",
filter_expression="$.name not exists",
)
assert config_not_exists.matches_event("adv", {"name": "Node1"}) is False
assert config_not_exists.matches_event("adv", {"other": "value"}) is True
def test_matches_event_filter_nested_path(self):
"""Test event matching with nested path."""
config = WebhookConfig(
url="https://example.com/webhook",
filter_expression='$.data.type == "sensor"',
)
assert config.matches_event("event", {"data": {"type": "sensor"}}) is True
assert config.matches_event("event", {"data": {"type": "other"}}) is False
assert config.matches_event("event", {"data": {}}) is False
assert config.matches_event("event", {}) is False
def test_matches_event_filter_boolean(self):
"""Test event matching with boolean values."""
config = WebhookConfig(
url="https://example.com/webhook",
filter_expression="$.active == true",
)
assert config.matches_event("event", {"active": True}) is True
assert config.matches_event("event", {"active": False}) is False
def test_matches_event_filter_null(self):
"""Test event matching with null value."""
config = WebhookConfig(
url="https://example.com/webhook",
filter_expression="$.value != null",
)
assert config.matches_event("event", {"value": "something"}) is True
assert config.matches_event("event", {"value": None}) is False
def test_matches_event_invalid_filter(self):
"""Test event matching with invalid filter expression."""
config = WebhookConfig(
url="https://example.com/webhook",
filter_expression="invalid expression",
)
# Invalid expressions should pass through
assert config.matches_event("event", {"any": "data"}) is True
class TestWebhookDispatcher:
"""Tests for WebhookDispatcher."""
@pytest.fixture
def dispatcher(self):
"""Create a webhook dispatcher for testing."""
return WebhookDispatcher()
@pytest.mark.asyncio
async def test_start_stop(self, dispatcher):
"""Test starting and stopping the dispatcher."""
assert dispatcher.is_running is False
await dispatcher.start()
assert dispatcher.is_running is True
assert dispatcher._client is not None
await dispatcher.stop()
assert dispatcher.is_running is False
assert dispatcher._client is None
def test_add_webhook(self, dispatcher):
"""Test adding a webhook."""
webhook = WebhookConfig(
url="https://example.com/webhook",
name="test-webhook",
)
dispatcher.add_webhook(webhook)
assert len(dispatcher.webhooks) == 1
assert dispatcher.webhooks[0].name == "test-webhook"
def test_remove_webhook(self, dispatcher):
"""Test removing a webhook."""
webhook = WebhookConfig(
url="https://example.com/webhook",
name="test-webhook",
)
dispatcher.add_webhook(webhook)
assert len(dispatcher.webhooks) == 1
result = dispatcher.remove_webhook("test-webhook")
assert result is True
assert len(dispatcher.webhooks) == 0
result = dispatcher.remove_webhook("nonexistent")
assert result is False
@pytest.mark.asyncio
async def test_dispatch_not_running(self, dispatcher):
"""Test dispatch when dispatcher is not running."""
result = await dispatcher.dispatch("event", {"data": "test"})
assert result == {}
@pytest.mark.asyncio
async def test_dispatch_no_webhooks(self, dispatcher):
"""Test dispatch with no webhooks configured."""
await dispatcher.start()
result = await dispatcher.dispatch("event", {"data": "test"})
assert result == {}
await dispatcher.stop()
@pytest.mark.asyncio
async def test_dispatch_success(self, dispatcher):
"""Test successful webhook dispatch."""
webhook = WebhookConfig(
url="https://example.com/webhook",
name="test-webhook",
)
dispatcher.add_webhook(webhook)
await dispatcher.start()
# Mock the HTTP client
mock_response = AsyncMock()
mock_response.status_code = 200
with patch.object(
dispatcher._client, "post", return_value=mock_response
) as mock_post:
result = await dispatcher.dispatch(
"advertisement",
{"name": "Node1"},
public_key="abc123",
)
assert result == {"test-webhook": True}
mock_post.assert_called_once()
call_kwargs = mock_post.call_args.kwargs
assert call_kwargs["json"]["event_type"] == "advertisement"
assert call_kwargs["json"]["payload"]["name"] == "Node1"
assert call_kwargs["json"]["public_key"] == "abc123"
await dispatcher.stop()
@pytest.mark.asyncio
async def test_dispatch_disabled_webhook(self, dispatcher):
"""Test dispatch skips disabled webhooks."""
webhook = WebhookConfig(
url="https://example.com/webhook",
name="disabled-webhook",
enabled=False,
)
dispatcher.add_webhook(webhook)
await dispatcher.start()
result = await dispatcher.dispatch("event", {"data": "test"})
assert result == {}
await dispatcher.stop()
@pytest.mark.asyncio
async def test_dispatch_filtered_webhook(self, dispatcher):
"""Test dispatch respects event type filter."""
webhook = WebhookConfig(
url="https://example.com/webhook",
name="filtered-webhook",
event_types=["advertisement"],
)
dispatcher.add_webhook(webhook)
await dispatcher.start()
# This event should not match the filter
result = await dispatcher.dispatch("contact_msg_recv", {"text": "Hello"})
assert result == {}
await dispatcher.stop()
@pytest.mark.asyncio
async def test_dispatch_http_error(self, dispatcher):
"""Test dispatch handles HTTP errors."""
webhook = WebhookConfig(
url="https://example.com/webhook",
name="error-webhook",
max_retries=0, # No retries for faster test
)
dispatcher.add_webhook(webhook)
await dispatcher.start()
mock_response = AsyncMock()
mock_response.status_code = 500
with patch.object(
dispatcher._client, "post", return_value=mock_response
):
result = await dispatcher.dispatch("event", {"data": "test"})
assert result == {"error-webhook": False}
await dispatcher.stop()
@pytest.mark.asyncio
async def test_dispatch_timeout(self, dispatcher):
"""Test dispatch handles timeouts."""
webhook = WebhookConfig(
url="https://example.com/webhook",
name="timeout-webhook",
max_retries=0,
)
dispatcher.add_webhook(webhook)
await dispatcher.start()
with patch.object(
dispatcher._client,
"post",
side_effect=httpx.TimeoutException("Timeout"),
):
result = await dispatcher.dispatch("event", {"data": "test"})
assert result == {"timeout-webhook": False}
await dispatcher.stop()
@pytest.mark.asyncio
async def test_dispatch_retry_success(self, dispatcher):
"""Test dispatch retries and eventually succeeds."""
webhook = WebhookConfig(
url="https://example.com/webhook",
name="retry-webhook",
max_retries=2,
retry_backoff=0.01, # Fast backoff for tests
)
dispatcher.add_webhook(webhook)
await dispatcher.start()
mock_response_success = AsyncMock()
mock_response_success.status_code = 200
# First call fails, second succeeds
call_count = 0
async def mock_post(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
raise httpx.TimeoutException("Timeout")
return mock_response_success
with patch.object(dispatcher._client, "post", side_effect=mock_post):
result = await dispatcher.dispatch("event", {"data": "test"})
assert result == {"retry-webhook": True}
assert call_count == 2
await dispatcher.stop()
@pytest.mark.asyncio
async def test_dispatch_multiple_webhooks(self, dispatcher):
"""Test dispatch to multiple webhooks concurrently."""
webhook1 = WebhookConfig(
url="https://example.com/webhook1",
name="webhook-1",
)
webhook2 = WebhookConfig(
url="https://example.com/webhook2",
name="webhook-2",
)
dispatcher.add_webhook(webhook1)
dispatcher.add_webhook(webhook2)
await dispatcher.start()
mock_response = AsyncMock()
mock_response.status_code = 200
with patch.object(dispatcher._client, "post", return_value=mock_response):
result = await dispatcher.dispatch("event", {"data": "test"})
assert result == {"webhook-1": True, "webhook-2": True}
await dispatcher.stop()
class TestWebhookDispatcherFactory:
"""Tests for create_webhook_dispatcher_from_config."""
def test_create_from_config(self):
"""Test creating dispatcher from configuration."""
config = [
{
"name": "webhook-1",
"url": "https://example.com/webhook1",
"event_types": ["advertisement"],
},
{
"name": "webhook-2",
"url": "https://example.com/webhook2",
"filter_expression": "$.snr > -10",
"headers": {"X-API-Key": "secret"},
"timeout": 5.0,
},
]
dispatcher = create_webhook_dispatcher_from_config(config)
assert len(dispatcher.webhooks) == 2
assert dispatcher.webhooks[0].name == "webhook-1"
assert dispatcher.webhooks[0].event_types == ["advertisement"]
assert dispatcher.webhooks[1].name == "webhook-2"
assert dispatcher.webhooks[1].filter_expression == "$.snr > -10"
assert dispatcher.webhooks[1].headers == {"X-API-Key": "secret"}
assert dispatcher.webhooks[1].timeout == 5.0
def test_create_from_config_missing_url(self):
"""Test creating dispatcher with missing URL in config."""
config = [
{
"name": "invalid-webhook",
# Missing 'url'
},
]
dispatcher = create_webhook_dispatcher_from_config(config)
assert len(dispatcher.webhooks) == 0
def test_create_from_empty_config(self):
"""Test creating dispatcher from empty config."""
dispatcher = create_webhook_dispatcher_from_config([])
assert len(dispatcher.webhooks) == 0
class TestSyncDispatchHelpers:
"""Tests for synchronous dispatch helper functions."""
def test_queue_events(self):
"""Test queuing events for dispatch."""
# Clear any existing state
get_queued_events()
set_dispatch_callback(None)
dispatch_event("event1", {"data": 1}, "key1")
dispatch_event("event2", {"data": 2}, "key2")
events = get_queued_events()
assert len(events) == 2
assert events[0] == ("event1", {"data": 1}, "key1")
assert events[1] == ("event2", {"data": 2}, "key2")
# Queue should be cleared
events = get_queued_events()
assert len(events) == 0
def test_dispatch_callback(self):
"""Test dispatch callback."""
received_events = []
def callback(event_type, payload, public_key):
received_events.append((event_type, payload, public_key))
set_dispatch_callback(callback)
dispatch_event("test_event", {"value": "test"}, "pub_key_123")
assert len(received_events) == 1
assert received_events[0] == ("test_event", {"value": "test"}, "pub_key_123")
# Queue should be empty when using callback
events = get_queued_events()
assert len(events) == 0
# Clean up
set_dispatch_callback(None)