mirror of
https://github.com/ipnet-mesh/meshcore-hub.git
synced 2026-06-28 05:51:36 +02:00
Added data retention and node cleanup
This commit is contained in:
@@ -141,3 +141,35 @@ WEBHOOK_MESSAGE_SECRET=
|
||||
WEBHOOK_TIMEOUT=10.0
|
||||
WEBHOOK_MAX_RETRIES=3
|
||||
WEBHOOK_RETRY_BACKOFF=2.0
|
||||
|
||||
# ===================
|
||||
# Data Retention Settings
|
||||
# ===================
|
||||
|
||||
# Enable automatic cleanup of old event data
|
||||
# When enabled, the collector runs periodic cleanup to delete old events
|
||||
# Default: true
|
||||
DATA_RETENTION_ENABLED=true
|
||||
|
||||
# Number of days to retain event data (advertisements, messages, telemetry, etc.)
|
||||
# Events older than this are deleted during cleanup
|
||||
# Default: 30 days
|
||||
DATA_RETENTION_DAYS=30
|
||||
|
||||
# Hours between automatic cleanup runs (applies to both events and nodes)
|
||||
# Default: 24 hours (once per day)
|
||||
DATA_RETENTION_INTERVAL_HOURS=24
|
||||
|
||||
# ===================
|
||||
# Node Cleanup Settings
|
||||
# ===================
|
||||
|
||||
# Enable automatic cleanup of inactive nodes
|
||||
# Nodes that haven't been seen (last_seen) for the specified period are removed
|
||||
# Nodes with last_seen=NULL (never seen on network) are NOT removed
|
||||
# Default: true
|
||||
NODE_CLEANUP_ENABLED=true
|
||||
|
||||
# Remove nodes not seen for this many days (based on last_seen field)
|
||||
# Default: 7 days
|
||||
NODE_CLEANUP_DAYS=7
|
||||
|
||||
@@ -264,6 +264,7 @@ meshcore-hub/
|
||||
│ ├── collector/
|
||||
│ │ ├── cli.py # Collector CLI with seed commands
|
||||
│ │ ├── subscriber.py # MQTT subscriber
|
||||
│ │ ├── cleanup.py # Data retention/cleanup service
|
||||
│ │ ├── tag_import.py # Tag import from YAML
|
||||
│ │ ├── member_import.py # Member import from YAML
|
||||
│ │ ├── handlers/ # Event handlers
|
||||
@@ -502,6 +503,48 @@ The collector supports forwarding events to external HTTP endpoints:
|
||||
| `WEBHOOK_MAX_RETRIES` | Max retries on failure (default: 3) |
|
||||
| `WEBHOOK_RETRY_BACKOFF` | Exponential backoff multiplier (default: 2.0) |
|
||||
|
||||
### Data Retention / Cleanup Configuration
|
||||
|
||||
The collector supports automatic cleanup of old event data and inactive nodes:
|
||||
|
||||
**Event Data Cleanup:**
|
||||
|
||||
| Variable | Description |
|
||||
|----------|-------------|
|
||||
| `DATA_RETENTION_ENABLED` | Enable automatic event data cleanup (default: true) |
|
||||
| `DATA_RETENTION_DAYS` | Days to retain event data (default: 30) |
|
||||
| `DATA_RETENTION_INTERVAL_HOURS` | Hours between cleanup runs (default: 24) |
|
||||
|
||||
When enabled, the collector automatically deletes event data older than the retention period:
|
||||
- Advertisements
|
||||
- Messages (channel and direct)
|
||||
- Telemetry
|
||||
- Trace paths
|
||||
- Event logs
|
||||
|
||||
**Node Cleanup:**
|
||||
|
||||
| Variable | Description |
|
||||
|----------|-------------|
|
||||
| `NODE_CLEANUP_ENABLED` | Enable automatic cleanup of inactive nodes (default: true) |
|
||||
| `NODE_CLEANUP_DAYS` | Remove nodes not seen for this many days (default: 7) |
|
||||
|
||||
When enabled, the collector automatically removes nodes where:
|
||||
- `last_seen` is older than the configured number of days
|
||||
- Nodes with `last_seen=NULL` (never seen on network) are **NOT** removed
|
||||
- Nodes created via tag import that have never been seen on the mesh are preserved
|
||||
|
||||
**Note:** Both event data and node cleanup run on the same schedule (DATA_RETENTION_INTERVAL_HOURS).
|
||||
|
||||
Manual cleanup can be triggered at any time with:
|
||||
```bash
|
||||
# Dry run to see what would be deleted
|
||||
meshcore-hub collector cleanup --retention-days 30 --dry-run
|
||||
|
||||
# Live cleanup
|
||||
meshcore-hub collector cleanup --retention-days 30
|
||||
```
|
||||
|
||||
Webhook payload structure:
|
||||
```json
|
||||
{
|
||||
|
||||
@@ -161,6 +161,12 @@ services:
|
||||
- WEBHOOK_TIMEOUT=${WEBHOOK_TIMEOUT:-10.0}
|
||||
- WEBHOOK_MAX_RETRIES=${WEBHOOK_MAX_RETRIES:-3}
|
||||
- WEBHOOK_RETRY_BACKOFF=${WEBHOOK_RETRY_BACKOFF:-2.0}
|
||||
# Data retention and cleanup configuration
|
||||
- DATA_RETENTION_ENABLED=${DATA_RETENTION_ENABLED:-true}
|
||||
- DATA_RETENTION_DAYS=${DATA_RETENTION_DAYS:-30}
|
||||
- DATA_RETENTION_INTERVAL_HOURS=${DATA_RETENTION_INTERVAL_HOURS:-24}
|
||||
- NODE_CLEANUP_ENABLED=${NODE_CLEANUP_ENABLED:-true}
|
||||
- NODE_CLEANUP_DAYS=${NODE_CLEANUP_DAYS:-7}
|
||||
command: ["collector"]
|
||||
healthcheck:
|
||||
test: ["CMD", "meshcore-hub", "health", "collector"]
|
||||
|
||||
@@ -0,0 +1,225 @@
|
||||
"""Data retention and cleanup service for MeshCore Hub.
|
||||
|
||||
This module provides functionality to delete old event data and inactive nodes
|
||||
based on configured retention policies.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from sqlalchemy import delete, func, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from meshcore_hub.common.models import (
|
||||
Advertisement,
|
||||
EventLog,
|
||||
Message,
|
||||
Node,
|
||||
Telemetry,
|
||||
TracePath,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CleanupStats:
|
||||
"""Statistics from a cleanup operation."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.advertisements_deleted: int = 0
|
||||
self.messages_deleted: int = 0
|
||||
self.telemetry_deleted: int = 0
|
||||
self.trace_paths_deleted: int = 0
|
||||
self.event_logs_deleted: int = 0
|
||||
self.nodes_deleted: int = 0
|
||||
self.total_deleted: int = 0
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"CleanupStats(total={self.total_deleted}, "
|
||||
f"advertisements={self.advertisements_deleted}, "
|
||||
f"messages={self.messages_deleted}, "
|
||||
f"telemetry={self.telemetry_deleted}, "
|
||||
f"trace_paths={self.trace_paths_deleted}, "
|
||||
f"event_logs={self.event_logs_deleted}, "
|
||||
f"nodes={self.nodes_deleted})"
|
||||
)
|
||||
|
||||
|
||||
async def cleanup_old_data(
|
||||
db: AsyncSession,
|
||||
retention_days: int,
|
||||
dry_run: bool = False,
|
||||
) -> CleanupStats:
|
||||
"""Delete event data older than the retention period.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
retention_days: Number of days to retain data
|
||||
dry_run: If True, only count records without deleting
|
||||
|
||||
Returns:
|
||||
CleanupStats object with deletion counts
|
||||
"""
|
||||
stats = CleanupStats()
|
||||
cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)
|
||||
|
||||
logger.info(
|
||||
"Starting data cleanup (dry_run=%s, retention_days=%d, cutoff=%s)",
|
||||
dry_run,
|
||||
retention_days,
|
||||
cutoff_date.isoformat(),
|
||||
)
|
||||
|
||||
# Clean up advertisements
|
||||
stats.advertisements_deleted = await _cleanup_table(
|
||||
db, Advertisement, cutoff_date, "advertisements", dry_run
|
||||
)
|
||||
|
||||
# Clean up messages
|
||||
stats.messages_deleted = await _cleanup_table(
|
||||
db, Message, cutoff_date, "messages", dry_run
|
||||
)
|
||||
|
||||
# Clean up telemetry
|
||||
stats.telemetry_deleted = await _cleanup_table(
|
||||
db, Telemetry, cutoff_date, "telemetry", dry_run
|
||||
)
|
||||
|
||||
# Clean up trace paths
|
||||
stats.trace_paths_deleted = await _cleanup_table(
|
||||
db, TracePath, cutoff_date, "trace_paths", dry_run
|
||||
)
|
||||
|
||||
# Clean up event logs
|
||||
stats.event_logs_deleted = await _cleanup_table(
|
||||
db, EventLog, cutoff_date, "event_logs", dry_run
|
||||
)
|
||||
|
||||
stats.total_deleted = (
|
||||
stats.advertisements_deleted
|
||||
+ stats.messages_deleted
|
||||
+ stats.telemetry_deleted
|
||||
+ stats.trace_paths_deleted
|
||||
+ stats.event_logs_deleted
|
||||
)
|
||||
|
||||
if not dry_run:
|
||||
await db.commit()
|
||||
logger.info("Cleanup completed: %s", stats)
|
||||
else:
|
||||
logger.info("Cleanup dry run completed: %s", stats)
|
||||
|
||||
return stats
|
||||
|
||||
|
||||
async def _cleanup_table(
|
||||
db: AsyncSession,
|
||||
model: type,
|
||||
cutoff_date: datetime,
|
||||
table_name: str,
|
||||
dry_run: bool,
|
||||
) -> int:
|
||||
"""Delete old records from a specific table.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
model: SQLAlchemy model class
|
||||
cutoff_date: Delete records older than this date
|
||||
table_name: Name of table for logging
|
||||
dry_run: If True, only count without deleting
|
||||
|
||||
Returns:
|
||||
Number of records deleted (or would be deleted in dry_run)
|
||||
"""
|
||||
from sqlalchemy import select
|
||||
|
||||
if dry_run:
|
||||
# Count records that would be deleted
|
||||
stmt = (
|
||||
select(func.count())
|
||||
.select_from(model)
|
||||
.where(model.created_at < cutoff_date) # type: ignore[attr-defined]
|
||||
)
|
||||
result = await db.execute(stmt)
|
||||
count = result.scalar() or 0
|
||||
logger.debug(
|
||||
"[DRY RUN] Would delete %d records from %s older than %s",
|
||||
count,
|
||||
table_name,
|
||||
cutoff_date.isoformat(),
|
||||
)
|
||||
return count
|
||||
else:
|
||||
# Delete old records
|
||||
result = await db.execute(delete(model).where(model.created_at < cutoff_date)) # type: ignore[attr-defined]
|
||||
count = result.rowcount or 0 # type: ignore[attr-defined]
|
||||
logger.debug(
|
||||
"Deleted %d records from %s older than %s",
|
||||
count,
|
||||
table_name,
|
||||
cutoff_date.isoformat(),
|
||||
)
|
||||
return count
|
||||
|
||||
|
||||
async def cleanup_inactive_nodes(
|
||||
db: AsyncSession,
|
||||
inactivity_days: int,
|
||||
dry_run: bool = False,
|
||||
) -> int:
|
||||
"""Delete nodes that haven't been seen for the specified number of days.
|
||||
|
||||
Only deletes nodes where last_seen is older than the cutoff date.
|
||||
Nodes with last_seen=NULL are NOT deleted (never seen on network).
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
inactivity_days: Delete nodes not seen for this many days
|
||||
dry_run: If True, only count without deleting
|
||||
|
||||
Returns:
|
||||
Number of nodes deleted (or would be deleted in dry_run)
|
||||
"""
|
||||
cutoff_date = datetime.now(timezone.utc) - timedelta(days=inactivity_days)
|
||||
|
||||
logger.info(
|
||||
"Starting node cleanup (dry_run=%s, inactivity_days=%d, cutoff=%s)",
|
||||
dry_run,
|
||||
inactivity_days,
|
||||
cutoff_date.isoformat(),
|
||||
)
|
||||
|
||||
if dry_run:
|
||||
# Count nodes that would be deleted
|
||||
# Only count nodes with last_seen < cutoff (excludes NULL last_seen)
|
||||
stmt = (
|
||||
select(func.count())
|
||||
.select_from(Node)
|
||||
.where(Node.last_seen < cutoff_date)
|
||||
.where(Node.last_seen.isnot(None))
|
||||
)
|
||||
result = await db.execute(stmt)
|
||||
count = result.scalar() or 0
|
||||
logger.info(
|
||||
"[DRY RUN] Would delete %d nodes not seen since %s",
|
||||
count,
|
||||
cutoff_date.isoformat(),
|
||||
)
|
||||
return count
|
||||
else:
|
||||
# Delete inactive nodes
|
||||
# Only delete nodes with last_seen < cutoff (excludes NULL last_seen)
|
||||
result = await db.execute(
|
||||
delete(Node)
|
||||
.where(Node.last_seen < cutoff_date)
|
||||
.where(Node.last_seen.isnot(None))
|
||||
)
|
||||
await db.commit()
|
||||
count = result.rowcount or 0 # type: ignore[attr-defined]
|
||||
logger.info(
|
||||
"Deleted %d nodes not seen since %s",
|
||||
count,
|
||||
cutoff_date.isoformat(),
|
||||
)
|
||||
return count
|
||||
@@ -228,6 +228,26 @@ def _run_collector_service(
|
||||
|
||||
from meshcore_hub.collector.subscriber import run_collector
|
||||
|
||||
# Show cleanup configuration
|
||||
click.echo("")
|
||||
click.echo("Cleanup configuration:")
|
||||
if settings.data_retention_enabled:
|
||||
click.echo(
|
||||
f" Event data: Enabled (retention: {settings.data_retention_days} days)"
|
||||
)
|
||||
else:
|
||||
click.echo(" Event data: Disabled")
|
||||
|
||||
if settings.node_cleanup_enabled:
|
||||
click.echo(
|
||||
f" Inactive nodes: Enabled (inactivity: {settings.node_cleanup_days} days)"
|
||||
)
|
||||
else:
|
||||
click.echo(" Inactive nodes: Disabled")
|
||||
|
||||
if settings.data_retention_enabled or settings.node_cleanup_enabled:
|
||||
click.echo(f" Interval: {settings.data_retention_interval_hours} hours")
|
||||
|
||||
click.echo("")
|
||||
click.echo("Starting MQTT subscriber...")
|
||||
run_collector(
|
||||
@@ -238,6 +258,11 @@ def _run_collector_service(
|
||||
mqtt_prefix=prefix,
|
||||
database_url=database_url,
|
||||
webhook_dispatcher=webhook_dispatcher,
|
||||
cleanup_enabled=settings.data_retention_enabled,
|
||||
cleanup_retention_days=settings.data_retention_days,
|
||||
cleanup_interval_hours=settings.data_retention_interval_hours,
|
||||
node_cleanup_enabled=settings.node_cleanup_enabled,
|
||||
node_cleanup_days=settings.node_cleanup_days,
|
||||
)
|
||||
|
||||
|
||||
@@ -549,3 +574,90 @@ def import_members_cmd(
|
||||
click.echo(f" - {error}", err=True)
|
||||
|
||||
db.dispose()
|
||||
|
||||
|
||||
@collector.command("cleanup")
|
||||
@click.option(
|
||||
"--retention-days",
|
||||
type=int,
|
||||
default=30,
|
||||
envvar="DATA_RETENTION_DAYS",
|
||||
help="Number of days to retain data (default: 30)",
|
||||
)
|
||||
@click.option(
|
||||
"--dry-run",
|
||||
is_flag=True,
|
||||
default=False,
|
||||
help="Show what would be deleted without deleting",
|
||||
)
|
||||
@click.pass_context
|
||||
def cleanup_cmd(
|
||||
ctx: click.Context,
|
||||
retention_days: int,
|
||||
dry_run: bool,
|
||||
) -> None:
|
||||
"""Manually run data cleanup to delete old events.
|
||||
|
||||
Deletes event data older than the retention period:
|
||||
- Advertisements
|
||||
- Messages (channel and direct)
|
||||
- Telemetry
|
||||
- Trace paths
|
||||
- Event logs
|
||||
|
||||
Node records are never deleted - only event data.
|
||||
|
||||
Use --dry-run to preview what would be deleted without
|
||||
actually deleting anything.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
configure_logging(level=ctx.obj["log_level"])
|
||||
|
||||
click.echo(f"Database: {ctx.obj['database_url']}")
|
||||
click.echo(f"Retention: {retention_days} days")
|
||||
click.echo(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}")
|
||||
click.echo("")
|
||||
|
||||
if dry_run:
|
||||
click.echo("Running in dry-run mode - no data will be deleted.")
|
||||
else:
|
||||
click.echo("WARNING: This will permanently delete old event data!")
|
||||
if not click.confirm("Continue?"):
|
||||
click.echo("Aborted.")
|
||||
return
|
||||
|
||||
click.echo("")
|
||||
|
||||
from meshcore_hub.common.database import DatabaseManager
|
||||
from meshcore_hub.collector.cleanup import cleanup_old_data
|
||||
|
||||
# Initialize database
|
||||
db = DatabaseManager(ctx.obj["database_url"])
|
||||
|
||||
# Run cleanup
|
||||
async def run_cleanup() -> None:
|
||||
async with db.async_session() as session:
|
||||
stats = await cleanup_old_data(
|
||||
session,
|
||||
retention_days,
|
||||
dry_run=dry_run,
|
||||
)
|
||||
|
||||
click.echo("")
|
||||
click.echo("Cleanup results:")
|
||||
click.echo(f" Advertisements: {stats.advertisements_deleted}")
|
||||
click.echo(f" Messages: {stats.messages_deleted}")
|
||||
click.echo(f" Telemetry: {stats.telemetry_deleted}")
|
||||
click.echo(f" Trace paths: {stats.trace_paths_deleted}")
|
||||
click.echo(f" Event logs: {stats.event_logs_deleted}")
|
||||
click.echo(f" Total: {stats.total_deleted}")
|
||||
|
||||
if dry_run:
|
||||
click.echo("")
|
||||
click.echo("(Dry run - no data was actually deleted)")
|
||||
|
||||
asyncio.run(run_cleanup())
|
||||
db.dispose()
|
||||
click.echo("")
|
||||
click.echo("Cleanup complete." if not dry_run else "Dry run complete.")
|
||||
|
||||
@@ -6,6 +6,7 @@ The subscriber:
|
||||
3. Routes events to appropriate handlers
|
||||
4. Persists data to database
|
||||
5. Dispatches events to configured webhooks
|
||||
6. Performs scheduled data cleanup if enabled
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@@ -14,6 +15,7 @@ import signal
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Callable, Optional, TYPE_CHECKING
|
||||
|
||||
from meshcore_hub.common.database import DatabaseManager
|
||||
@@ -38,6 +40,11 @@ class Subscriber:
|
||||
mqtt_client: MQTTClient,
|
||||
db_manager: DatabaseManager,
|
||||
webhook_dispatcher: Optional["WebhookDispatcher"] = None,
|
||||
cleanup_enabled: bool = False,
|
||||
cleanup_retention_days: int = 30,
|
||||
cleanup_interval_hours: int = 24,
|
||||
node_cleanup_enabled: bool = False,
|
||||
node_cleanup_days: int = 90,
|
||||
):
|
||||
"""Initialize subscriber.
|
||||
|
||||
@@ -45,6 +52,11 @@ class Subscriber:
|
||||
mqtt_client: MQTT client instance
|
||||
db_manager: Database manager instance
|
||||
webhook_dispatcher: Optional webhook dispatcher for event forwarding
|
||||
cleanup_enabled: Enable automatic event data cleanup
|
||||
cleanup_retention_days: Number of days to retain event data
|
||||
cleanup_interval_hours: Hours between cleanup runs
|
||||
node_cleanup_enabled: Enable automatic cleanup of inactive nodes
|
||||
node_cleanup_days: Remove nodes not seen for this many days
|
||||
"""
|
||||
self.mqtt = mqtt_client
|
||||
self.db = db_manager
|
||||
@@ -59,6 +71,14 @@ class Subscriber:
|
||||
self._webhook_queue: list[tuple[str, dict[str, Any], str]] = []
|
||||
self._webhook_lock = threading.Lock()
|
||||
self._webhook_thread: Optional[threading.Thread] = None
|
||||
# Data cleanup
|
||||
self._cleanup_enabled = cleanup_enabled
|
||||
self._cleanup_retention_days = cleanup_retention_days
|
||||
self._cleanup_interval_hours = cleanup_interval_hours
|
||||
self._node_cleanup_enabled = node_cleanup_enabled
|
||||
self._node_cleanup_days = node_cleanup_days
|
||||
self._cleanup_thread: Optional[threading.Thread] = None
|
||||
self._last_cleanup: Optional[datetime] = None
|
||||
|
||||
@property
|
||||
def is_healthy(self) -> bool:
|
||||
@@ -202,6 +222,115 @@ class Subscriber:
|
||||
if self._webhook_thread.is_alive():
|
||||
logger.warning("Webhook processor thread did not stop cleanly")
|
||||
|
||||
def _start_cleanup_scheduler(self) -> None:
|
||||
"""Start background thread for periodic data cleanup."""
|
||||
if not self._cleanup_enabled and not self._node_cleanup_enabled:
|
||||
logger.info("Data cleanup and node cleanup are both disabled")
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Starting cleanup scheduler (interval_hours=%d)",
|
||||
self._cleanup_interval_hours,
|
||||
)
|
||||
if self._cleanup_enabled:
|
||||
logger.info(
|
||||
" Event data cleanup: ENABLED (retention_days=%d)",
|
||||
self._cleanup_retention_days,
|
||||
)
|
||||
else:
|
||||
logger.info(" Event data cleanup: DISABLED")
|
||||
|
||||
if self._node_cleanup_enabled:
|
||||
logger.info(
|
||||
" Node cleanup: ENABLED (inactivity_days=%d)", self._node_cleanup_days
|
||||
)
|
||||
else:
|
||||
logger.info(" Node cleanup: DISABLED")
|
||||
|
||||
def run_cleanup_loop() -> None:
|
||||
"""Run async cleanup tasks in background thread."""
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
|
||||
try:
|
||||
while self._running:
|
||||
# Check if cleanup is due
|
||||
now = datetime.now(timezone.utc)
|
||||
should_run = False
|
||||
|
||||
if self._last_cleanup is None:
|
||||
# First run
|
||||
should_run = True
|
||||
else:
|
||||
# Check if interval has passed
|
||||
hours_since_last = (
|
||||
now - self._last_cleanup
|
||||
).total_seconds() / 3600
|
||||
should_run = hours_since_last >= self._cleanup_interval_hours
|
||||
|
||||
if should_run:
|
||||
try:
|
||||
logger.info("Starting scheduled cleanup")
|
||||
from meshcore_hub.collector.cleanup import (
|
||||
cleanup_old_data,
|
||||
cleanup_inactive_nodes,
|
||||
)
|
||||
|
||||
# Get async session and run cleanup
|
||||
async def run_cleanup() -> None:
|
||||
async with self.db.async_session() as session:
|
||||
# Run event data cleanup if enabled
|
||||
if self._cleanup_enabled:
|
||||
stats = await cleanup_old_data(
|
||||
session,
|
||||
self._cleanup_retention_days,
|
||||
dry_run=False,
|
||||
)
|
||||
logger.info(
|
||||
"Event cleanup completed: %s", stats
|
||||
)
|
||||
|
||||
# Run node cleanup if enabled
|
||||
if self._node_cleanup_enabled:
|
||||
nodes_deleted = await cleanup_inactive_nodes(
|
||||
session,
|
||||
self._node_cleanup_days,
|
||||
dry_run=False,
|
||||
)
|
||||
logger.info(
|
||||
"Node cleanup completed: %d nodes deleted",
|
||||
nodes_deleted,
|
||||
)
|
||||
|
||||
loop.run_until_complete(run_cleanup())
|
||||
self._last_cleanup = now
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Cleanup error: {e}", exc_info=True)
|
||||
|
||||
# Sleep for 1 hour before next check
|
||||
for _ in range(3600):
|
||||
if not self._running:
|
||||
break
|
||||
time.sleep(1)
|
||||
|
||||
finally:
|
||||
loop.close()
|
||||
logger.info("Cleanup scheduler stopped")
|
||||
|
||||
self._cleanup_thread = threading.Thread(
|
||||
target=run_cleanup_loop, daemon=True, name="cleanup-scheduler"
|
||||
)
|
||||
self._cleanup_thread.start()
|
||||
|
||||
def _stop_cleanup_scheduler(self) -> None:
|
||||
"""Stop the cleanup scheduler thread."""
|
||||
if self._cleanup_thread and self._cleanup_thread.is_alive():
|
||||
# Thread will exit when self._running becomes False
|
||||
self._cleanup_thread.join(timeout=5.0)
|
||||
if self._cleanup_thread.is_alive():
|
||||
logger.warning("Cleanup scheduler thread did not stop cleanly")
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start the subscriber."""
|
||||
logger.info("Starting collector subscriber")
|
||||
@@ -239,6 +368,9 @@ class Subscriber:
|
||||
# Start webhook processor if configured
|
||||
self._start_webhook_processor()
|
||||
|
||||
# Start cleanup scheduler if configured
|
||||
self._start_cleanup_scheduler()
|
||||
|
||||
# Start health reporter for Docker health checks
|
||||
self._health_reporter = HealthReporter(
|
||||
component="collector",
|
||||
@@ -271,6 +403,9 @@ class Subscriber:
|
||||
self._running = False
|
||||
self._shutdown_event.set()
|
||||
|
||||
# Stop cleanup scheduler
|
||||
self._stop_cleanup_scheduler()
|
||||
|
||||
# Stop webhook processor
|
||||
self._stop_webhook_processor()
|
||||
|
||||
@@ -295,6 +430,11 @@ def create_subscriber(
|
||||
mqtt_prefix: str = "meshcore",
|
||||
database_url: str = "sqlite:///./meshcore.db",
|
||||
webhook_dispatcher: Optional["WebhookDispatcher"] = None,
|
||||
cleanup_enabled: bool = False,
|
||||
cleanup_retention_days: int = 30,
|
||||
cleanup_interval_hours: int = 24,
|
||||
node_cleanup_enabled: bool = False,
|
||||
node_cleanup_days: int = 90,
|
||||
) -> Subscriber:
|
||||
"""Create a configured subscriber instance.
|
||||
|
||||
@@ -306,6 +446,11 @@ def create_subscriber(
|
||||
mqtt_prefix: MQTT topic prefix
|
||||
database_url: Database connection URL
|
||||
webhook_dispatcher: Optional webhook dispatcher for event forwarding
|
||||
cleanup_enabled: Enable automatic event data cleanup
|
||||
cleanup_retention_days: Number of days to retain event data
|
||||
cleanup_interval_hours: Hours between cleanup runs
|
||||
node_cleanup_enabled: Enable automatic cleanup of inactive nodes
|
||||
node_cleanup_days: Remove nodes not seen for this many days
|
||||
|
||||
Returns:
|
||||
Configured Subscriber instance
|
||||
@@ -326,7 +471,16 @@ def create_subscriber(
|
||||
db_manager = DatabaseManager(database_url)
|
||||
|
||||
# Create subscriber
|
||||
subscriber = Subscriber(mqtt_client, db_manager, webhook_dispatcher)
|
||||
subscriber = Subscriber(
|
||||
mqtt_client,
|
||||
db_manager,
|
||||
webhook_dispatcher,
|
||||
cleanup_enabled=cleanup_enabled,
|
||||
cleanup_retention_days=cleanup_retention_days,
|
||||
cleanup_interval_hours=cleanup_interval_hours,
|
||||
node_cleanup_enabled=node_cleanup_enabled,
|
||||
node_cleanup_days=node_cleanup_days,
|
||||
)
|
||||
|
||||
# Register handlers
|
||||
from meshcore_hub.collector.handlers import register_all_handlers
|
||||
@@ -344,6 +498,11 @@ def run_collector(
|
||||
mqtt_prefix: str = "meshcore",
|
||||
database_url: str = "sqlite:///./meshcore.db",
|
||||
webhook_dispatcher: Optional["WebhookDispatcher"] = None,
|
||||
cleanup_enabled: bool = False,
|
||||
cleanup_retention_days: int = 30,
|
||||
cleanup_interval_hours: int = 24,
|
||||
node_cleanup_enabled: bool = False,
|
||||
node_cleanup_days: int = 90,
|
||||
) -> None:
|
||||
"""Run the collector (blocking).
|
||||
|
||||
@@ -355,6 +514,11 @@ def run_collector(
|
||||
mqtt_prefix: MQTT topic prefix
|
||||
database_url: Database connection URL
|
||||
webhook_dispatcher: Optional webhook dispatcher for event forwarding
|
||||
cleanup_enabled: Enable automatic event data cleanup
|
||||
cleanup_retention_days: Number of days to retain event data
|
||||
cleanup_interval_hours: Hours between cleanup runs
|
||||
node_cleanup_enabled: Enable automatic cleanup of inactive nodes
|
||||
node_cleanup_days: Remove nodes not seen for this many days
|
||||
"""
|
||||
subscriber = create_subscriber(
|
||||
mqtt_host=mqtt_host,
|
||||
@@ -364,6 +528,11 @@ def run_collector(
|
||||
mqtt_prefix=mqtt_prefix,
|
||||
database_url=database_url,
|
||||
webhook_dispatcher=webhook_dispatcher,
|
||||
cleanup_enabled=cleanup_enabled,
|
||||
cleanup_retention_days=cleanup_retention_days,
|
||||
cleanup_interval_hours=cleanup_interval_hours,
|
||||
node_cleanup_enabled=node_cleanup_enabled,
|
||||
node_cleanup_days=node_cleanup_days,
|
||||
)
|
||||
|
||||
# Set up signal handlers
|
||||
|
||||
@@ -210,7 +210,8 @@ def import_tags(
|
||||
node = Node(
|
||||
public_key=public_key,
|
||||
first_seen=now,
|
||||
last_seen=now,
|
||||
# last_seen is intentionally left unset (None)
|
||||
# It will be set when the node is actually seen via events
|
||||
)
|
||||
session.add(node)
|
||||
session.flush()
|
||||
|
||||
@@ -126,6 +126,29 @@ class CollectorSettings(CommonSettings):
|
||||
default=2.0, description="Retry backoff multiplier"
|
||||
)
|
||||
|
||||
# Data retention / cleanup settings
|
||||
data_retention_enabled: bool = Field(
|
||||
default=True, description="Enable automatic event data cleanup"
|
||||
)
|
||||
data_retention_days: int = Field(
|
||||
default=30, description="Number of days to retain event data", ge=1
|
||||
)
|
||||
data_retention_interval_hours: int = Field(
|
||||
default=24,
|
||||
description="Hours between automatic cleanup runs (applies to both events and nodes)",
|
||||
ge=1,
|
||||
)
|
||||
|
||||
# Node cleanup settings
|
||||
node_cleanup_enabled: bool = Field(
|
||||
default=True, description="Enable automatic cleanup of inactive nodes"
|
||||
)
|
||||
node_cleanup_days: int = Field(
|
||||
default=7,
|
||||
description="Remove nodes not seen for this many days (last_seen)",
|
||||
ge=1,
|
||||
)
|
||||
|
||||
@property
|
||||
def collector_data_dir(self) -> str:
|
||||
"""Get the collector data directory path."""
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
"""Database connection and session management."""
|
||||
|
||||
from contextlib import contextmanager
|
||||
from typing import Generator
|
||||
from contextlib import asynccontextmanager, contextmanager
|
||||
from typing import AsyncGenerator, Generator
|
||||
|
||||
from sqlalchemy import create_engine, event
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
|
||||
from meshcore_hub.common.models.base import Base
|
||||
@@ -100,6 +101,17 @@ class DatabaseManager:
|
||||
self.engine = create_database_engine(database_url, echo=echo)
|
||||
self.session_factory = create_session_factory(self.engine)
|
||||
|
||||
# Create async engine for async operations
|
||||
async_url = database_url.replace("sqlite://", "sqlite+aiosqlite://")
|
||||
self.async_engine = create_async_engine(async_url, echo=echo)
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker
|
||||
|
||||
self.async_session_factory = async_sessionmaker(
|
||||
self.async_engine,
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False,
|
||||
)
|
||||
|
||||
def create_tables(self) -> None:
|
||||
"""Create all database tables."""
|
||||
create_tables(self.engine)
|
||||
@@ -138,6 +150,21 @@ class DatabaseManager:
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
@asynccontextmanager
|
||||
async def async_session(self) -> AsyncGenerator[AsyncSession, None]:
|
||||
"""Provide an async session context manager.
|
||||
|
||||
Yields:
|
||||
AsyncSession instance
|
||||
|
||||
Example:
|
||||
async with db.async_session() as session:
|
||||
result = await session.execute(select(Node))
|
||||
await session.commit()
|
||||
"""
|
||||
async with self.async_session_factory() as session:
|
||||
yield session
|
||||
|
||||
def dispose(self) -> None:
|
||||
"""Dispose of the database engine and connection pool."""
|
||||
self.engine.dispose()
|
||||
|
||||
@@ -20,3 +20,16 @@ def db_session(db_manager):
|
||||
session = db_manager.get_session()
|
||||
yield session
|
||||
session.close()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def async_db_session(db_manager):
|
||||
"""Create an async database session for testing."""
|
||||
# Create tables in async engine
|
||||
async with db_manager.async_engine.begin() as conn:
|
||||
await conn.run_sync(db_manager.engine.pool.echo)
|
||||
# Tables already created by db_manager fixture with sync engine
|
||||
# Async engine shares same database file, so tables exist
|
||||
|
||||
async with db_manager.async_session() as session:
|
||||
yield session
|
||||
|
||||
@@ -0,0 +1,254 @@
|
||||
"""Tests for data cleanup functionality."""
|
||||
|
||||
import pytest
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from meshcore_hub.collector.cleanup import cleanup_old_data, CleanupStats
|
||||
from meshcore_hub.common.models import (
|
||||
Advertisement,
|
||||
EventLog,
|
||||
Message,
|
||||
Node,
|
||||
Telemetry,
|
||||
TracePath,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleanup_old_data_dry_run(async_db_session: AsyncSession) -> None:
|
||||
"""Test cleanup in dry-run mode."""
|
||||
# Create test node
|
||||
node = Node(
|
||||
public_key="a" * 64,
|
||||
name="Test Node",
|
||||
)
|
||||
async_db_session.add(node)
|
||||
await async_db_session.flush()
|
||||
|
||||
# Create old advertisement (60 days ago)
|
||||
old_date = datetime.now(timezone.utc) - timedelta(days=60)
|
||||
old_adv = Advertisement(
|
||||
node_id=node.id,
|
||||
payload={},
|
||||
created_at=old_date,
|
||||
updated_at=old_date,
|
||||
)
|
||||
async_db_session.add(old_adv)
|
||||
|
||||
# Create recent advertisement (10 days ago)
|
||||
recent_date = datetime.now(timezone.utc) - timedelta(days=10)
|
||||
recent_adv = Advertisement(
|
||||
node_id=node.id,
|
||||
payload={},
|
||||
created_at=recent_date,
|
||||
updated_at=recent_date,
|
||||
)
|
||||
async_db_session.add(recent_adv)
|
||||
|
||||
await async_db_session.commit()
|
||||
|
||||
# Run cleanup in dry-run mode with 30-day retention
|
||||
stats = await cleanup_old_data(async_db_session, retention_days=30, dry_run=True)
|
||||
|
||||
# Should report 1 advertisement would be deleted
|
||||
assert stats.advertisements_deleted == 1
|
||||
assert stats.total_deleted == 1
|
||||
|
||||
# Verify no data was actually deleted
|
||||
await async_db_session.rollback() # Refresh from DB
|
||||
from sqlalchemy import select, func
|
||||
|
||||
count = await async_db_session.scalar(
|
||||
select(func.count()).select_from(Advertisement)
|
||||
)
|
||||
assert count == 2 # Both still exist
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleanup_old_data_live(async_db_session: AsyncSession) -> None:
|
||||
"""Test cleanup in live mode."""
|
||||
# Create test node
|
||||
node = Node(
|
||||
public_key="b" * 64,
|
||||
name="Test Node",
|
||||
)
|
||||
async_db_session.add(node)
|
||||
await async_db_session.flush()
|
||||
|
||||
# Create old records (60 days ago)
|
||||
old_date = datetime.now(timezone.utc) - timedelta(days=60)
|
||||
|
||||
old_adv = Advertisement(
|
||||
node_id=node.id,
|
||||
payload={},
|
||||
created_at=old_date,
|
||||
updated_at=old_date,
|
||||
)
|
||||
async_db_session.add(old_adv)
|
||||
|
||||
old_msg = Message(
|
||||
node_id=node.id,
|
||||
direction="recv",
|
||||
message_type="channel",
|
||||
text="old message",
|
||||
payload={},
|
||||
created_at=old_date,
|
||||
updated_at=old_date,
|
||||
)
|
||||
async_db_session.add(old_msg)
|
||||
|
||||
old_telemetry = Telemetry(
|
||||
node_id=node.id,
|
||||
payload={},
|
||||
created_at=old_date,
|
||||
updated_at=old_date,
|
||||
)
|
||||
async_db_session.add(old_telemetry)
|
||||
|
||||
old_trace = TracePath(
|
||||
node_id=node.id,
|
||||
destination="c" * 64,
|
||||
path_hashes=[],
|
||||
payload={},
|
||||
created_at=old_date,
|
||||
updated_at=old_date,
|
||||
)
|
||||
async_db_session.add(old_trace)
|
||||
|
||||
old_event = EventLog(
|
||||
node_id=node.id,
|
||||
event_type="test_event",
|
||||
payload={},
|
||||
created_at=old_date,
|
||||
updated_at=old_date,
|
||||
)
|
||||
async_db_session.add(old_event)
|
||||
|
||||
# Create recent records (10 days ago)
|
||||
recent_date = datetime.now(timezone.utc) - timedelta(days=10)
|
||||
|
||||
recent_adv = Advertisement(
|
||||
node_id=node.id,
|
||||
payload={},
|
||||
created_at=recent_date,
|
||||
updated_at=recent_date,
|
||||
)
|
||||
async_db_session.add(recent_adv)
|
||||
|
||||
await async_db_session.commit()
|
||||
|
||||
# Run cleanup with 30-day retention
|
||||
stats = await cleanup_old_data(async_db_session, retention_days=30, dry_run=False)
|
||||
|
||||
# Verify statistics
|
||||
assert stats.advertisements_deleted == 1
|
||||
assert stats.messages_deleted == 1
|
||||
assert stats.telemetry_deleted == 1
|
||||
assert stats.trace_paths_deleted == 1
|
||||
assert stats.event_logs_deleted == 1
|
||||
assert stats.total_deleted == 5
|
||||
|
||||
# Verify old data was deleted
|
||||
from sqlalchemy import select, func
|
||||
|
||||
adv_count = await async_db_session.scalar(
|
||||
select(func.count()).select_from(Advertisement)
|
||||
)
|
||||
assert adv_count == 1 # Only recent one remains
|
||||
|
||||
msg_count = await async_db_session.scalar(select(func.count()).select_from(Message))
|
||||
assert msg_count == 0 # Old one deleted
|
||||
|
||||
# Verify node still exists
|
||||
from sqlalchemy import select
|
||||
|
||||
node_result = await async_db_session.scalar(select(Node).where(Node.id == node.id))
|
||||
assert node_result is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleanup_respects_retention_period(
|
||||
async_db_session: AsyncSession,
|
||||
) -> None:
|
||||
"""Test that cleanup respects the retention period."""
|
||||
# Create test node
|
||||
node = Node(
|
||||
public_key="d" * 64,
|
||||
name="Test Node",
|
||||
)
|
||||
async_db_session.add(node)
|
||||
await async_db_session.flush()
|
||||
|
||||
# Create advertisements at different ages
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
# 90 days old - should be deleted with 30-day retention
|
||||
very_old = Advertisement(
|
||||
node_id=node.id,
|
||||
payload={},
|
||||
created_at=now - timedelta(days=90),
|
||||
updated_at=now - timedelta(days=90),
|
||||
)
|
||||
async_db_session.add(very_old)
|
||||
|
||||
# 40 days old - should be deleted with 30-day retention
|
||||
old = Advertisement(
|
||||
node_id=node.id,
|
||||
payload={},
|
||||
created_at=now - timedelta(days=40),
|
||||
updated_at=now - timedelta(days=40),
|
||||
)
|
||||
async_db_session.add(old)
|
||||
|
||||
# 20 days old - should be kept
|
||||
recent = Advertisement(
|
||||
node_id=node.id,
|
||||
payload={},
|
||||
created_at=now - timedelta(days=20),
|
||||
updated_at=now - timedelta(days=20),
|
||||
)
|
||||
async_db_session.add(recent)
|
||||
|
||||
# 5 days old - should be kept
|
||||
very_recent = Advertisement(
|
||||
node_id=node.id,
|
||||
payload={},
|
||||
created_at=now - timedelta(days=5),
|
||||
updated_at=now - timedelta(days=5),
|
||||
)
|
||||
async_db_session.add(very_recent)
|
||||
|
||||
await async_db_session.commit()
|
||||
|
||||
# Run cleanup with 30-day retention
|
||||
stats = await cleanup_old_data(async_db_session, retention_days=30, dry_run=False)
|
||||
|
||||
# Should delete the 2 old ones, keep the 2 recent ones
|
||||
assert stats.advertisements_deleted == 2
|
||||
assert stats.total_deleted == 2
|
||||
|
||||
# Verify count
|
||||
from sqlalchemy import select, func
|
||||
|
||||
adv_count = await async_db_session.scalar(
|
||||
select(func.count()).select_from(Advertisement)
|
||||
)
|
||||
assert adv_count == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleanup_stats_repr() -> None:
|
||||
"""Test CleanupStats string representation."""
|
||||
stats = CleanupStats()
|
||||
stats.advertisements_deleted = 10
|
||||
stats.messages_deleted = 5
|
||||
stats.telemetry_deleted = 3
|
||||
stats.trace_paths_deleted = 2
|
||||
stats.event_logs_deleted = 1
|
||||
stats.total_deleted = 21
|
||||
|
||||
repr_str = repr(stats)
|
||||
assert "total=21" in repr_str
|
||||
assert "advertisements=10" in repr_str
|
||||
assert "messages=5" in repr_str
|
||||
Reference in New Issue
Block a user