diff --git a/.env.example b/.env.example index 307110f..e76aaea 100644 --- a/.env.example +++ b/.env.example @@ -141,3 +141,35 @@ WEBHOOK_MESSAGE_SECRET= WEBHOOK_TIMEOUT=10.0 WEBHOOK_MAX_RETRIES=3 WEBHOOK_RETRY_BACKOFF=2.0 + +# =================== +# Data Retention Settings +# =================== + +# Enable automatic cleanup of old event data +# When enabled, the collector runs periodic cleanup to delete old events +# Default: true +DATA_RETENTION_ENABLED=true + +# Number of days to retain event data (advertisements, messages, telemetry, etc.) +# Events older than this are deleted during cleanup +# Default: 30 days +DATA_RETENTION_DAYS=30 + +# Hours between automatic cleanup runs (applies to both events and nodes) +# Default: 24 hours (once per day) +DATA_RETENTION_INTERVAL_HOURS=24 + +# =================== +# Node Cleanup Settings +# =================== + +# Enable automatic cleanup of inactive nodes +# Nodes that haven't been seen (last_seen) for the specified period are removed +# Nodes with last_seen=NULL (never seen on network) are NOT removed +# Default: true +NODE_CLEANUP_ENABLED=true + +# Remove nodes not seen for this many days (based on last_seen field) +# Default: 7 days +NODE_CLEANUP_DAYS=7 diff --git a/AGENTS.md b/AGENTS.md index a84ac1f..3351a7b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -264,6 +264,7 @@ meshcore-hub/ │ ├── collector/ │ │ ├── cli.py # Collector CLI with seed commands │ │ ├── subscriber.py # MQTT subscriber +│ │ ├── cleanup.py # Data retention/cleanup service │ │ ├── tag_import.py # Tag import from YAML │ │ ├── member_import.py # Member import from YAML │ │ ├── handlers/ # Event handlers @@ -502,6 +503,48 @@ The collector supports forwarding events to external HTTP endpoints: | `WEBHOOK_MAX_RETRIES` | Max retries on failure (default: 3) | | `WEBHOOK_RETRY_BACKOFF` | Exponential backoff multiplier (default: 2.0) | +### Data Retention / Cleanup Configuration + +The collector supports automatic cleanup of old event data and inactive nodes: + +**Event Data Cleanup:** + +| Variable | Description | +|----------|-------------| +| `DATA_RETENTION_ENABLED` | Enable automatic event data cleanup (default: true) | +| `DATA_RETENTION_DAYS` | Days to retain event data (default: 30) | +| `DATA_RETENTION_INTERVAL_HOURS` | Hours between cleanup runs (default: 24) | + +When enabled, the collector automatically deletes event data older than the retention period: +- Advertisements +- Messages (channel and direct) +- Telemetry +- Trace paths +- Event logs + +**Node Cleanup:** + +| Variable | Description | +|----------|-------------| +| `NODE_CLEANUP_ENABLED` | Enable automatic cleanup of inactive nodes (default: true) | +| `NODE_CLEANUP_DAYS` | Remove nodes not seen for this many days (default: 7) | + +When enabled, the collector automatically removes nodes where: +- `last_seen` is older than the configured number of days +- Nodes with `last_seen=NULL` (never seen on network) are **NOT** removed +- Nodes created via tag import that have never been seen on the mesh are preserved + +**Note:** Both event data and node cleanup run on the same schedule (DATA_RETENTION_INTERVAL_HOURS). + +Manual cleanup can be triggered at any time with: +```bash +# Dry run to see what would be deleted +meshcore-hub collector cleanup --retention-days 30 --dry-run + +# Live cleanup +meshcore-hub collector cleanup --retention-days 30 +``` + Webhook payload structure: ```json { diff --git a/docker-compose.yml b/docker-compose.yml index cb074b0..a558039 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -161,6 +161,12 @@ services: - WEBHOOK_TIMEOUT=${WEBHOOK_TIMEOUT:-10.0} - WEBHOOK_MAX_RETRIES=${WEBHOOK_MAX_RETRIES:-3} - WEBHOOK_RETRY_BACKOFF=${WEBHOOK_RETRY_BACKOFF:-2.0} + # Data retention and cleanup configuration + - DATA_RETENTION_ENABLED=${DATA_RETENTION_ENABLED:-true} + - DATA_RETENTION_DAYS=${DATA_RETENTION_DAYS:-30} + - DATA_RETENTION_INTERVAL_HOURS=${DATA_RETENTION_INTERVAL_HOURS:-24} + - NODE_CLEANUP_ENABLED=${NODE_CLEANUP_ENABLED:-true} + - NODE_CLEANUP_DAYS=${NODE_CLEANUP_DAYS:-7} command: ["collector"] healthcheck: test: ["CMD", "meshcore-hub", "health", "collector"] diff --git a/src/meshcore_hub/collector/cleanup.py b/src/meshcore_hub/collector/cleanup.py new file mode 100644 index 0000000..e05ac41 --- /dev/null +++ b/src/meshcore_hub/collector/cleanup.py @@ -0,0 +1,225 @@ +"""Data retention and cleanup service for MeshCore Hub. + +This module provides functionality to delete old event data and inactive nodes +based on configured retention policies. +""" + +import logging +from datetime import datetime, timedelta, timezone + +from sqlalchemy import delete, func, select +from sqlalchemy.ext.asyncio import AsyncSession + +from meshcore_hub.common.models import ( + Advertisement, + EventLog, + Message, + Node, + Telemetry, + TracePath, +) + +logger = logging.getLogger(__name__) + + +class CleanupStats: + """Statistics from a cleanup operation.""" + + def __init__(self) -> None: + self.advertisements_deleted: int = 0 + self.messages_deleted: int = 0 + self.telemetry_deleted: int = 0 + self.trace_paths_deleted: int = 0 + self.event_logs_deleted: int = 0 + self.nodes_deleted: int = 0 + self.total_deleted: int = 0 + + def __repr__(self) -> str: + return ( + f"CleanupStats(total={self.total_deleted}, " + f"advertisements={self.advertisements_deleted}, " + f"messages={self.messages_deleted}, " + f"telemetry={self.telemetry_deleted}, " + f"trace_paths={self.trace_paths_deleted}, " + f"event_logs={self.event_logs_deleted}, " + f"nodes={self.nodes_deleted})" + ) + + +async def cleanup_old_data( + db: AsyncSession, + retention_days: int, + dry_run: bool = False, +) -> CleanupStats: + """Delete event data older than the retention period. + + Args: + db: Database session + retention_days: Number of days to retain data + dry_run: If True, only count records without deleting + + Returns: + CleanupStats object with deletion counts + """ + stats = CleanupStats() + cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days) + + logger.info( + "Starting data cleanup (dry_run=%s, retention_days=%d, cutoff=%s)", + dry_run, + retention_days, + cutoff_date.isoformat(), + ) + + # Clean up advertisements + stats.advertisements_deleted = await _cleanup_table( + db, Advertisement, cutoff_date, "advertisements", dry_run + ) + + # Clean up messages + stats.messages_deleted = await _cleanup_table( + db, Message, cutoff_date, "messages", dry_run + ) + + # Clean up telemetry + stats.telemetry_deleted = await _cleanup_table( + db, Telemetry, cutoff_date, "telemetry", dry_run + ) + + # Clean up trace paths + stats.trace_paths_deleted = await _cleanup_table( + db, TracePath, cutoff_date, "trace_paths", dry_run + ) + + # Clean up event logs + stats.event_logs_deleted = await _cleanup_table( + db, EventLog, cutoff_date, "event_logs", dry_run + ) + + stats.total_deleted = ( + stats.advertisements_deleted + + stats.messages_deleted + + stats.telemetry_deleted + + stats.trace_paths_deleted + + stats.event_logs_deleted + ) + + if not dry_run: + await db.commit() + logger.info("Cleanup completed: %s", stats) + else: + logger.info("Cleanup dry run completed: %s", stats) + + return stats + + +async def _cleanup_table( + db: AsyncSession, + model: type, + cutoff_date: datetime, + table_name: str, + dry_run: bool, +) -> int: + """Delete old records from a specific table. + + Args: + db: Database session + model: SQLAlchemy model class + cutoff_date: Delete records older than this date + table_name: Name of table for logging + dry_run: If True, only count without deleting + + Returns: + Number of records deleted (or would be deleted in dry_run) + """ + from sqlalchemy import select + + if dry_run: + # Count records that would be deleted + stmt = ( + select(func.count()) + .select_from(model) + .where(model.created_at < cutoff_date) # type: ignore[attr-defined] + ) + result = await db.execute(stmt) + count = result.scalar() or 0 + logger.debug( + "[DRY RUN] Would delete %d records from %s older than %s", + count, + table_name, + cutoff_date.isoformat(), + ) + return count + else: + # Delete old records + result = await db.execute(delete(model).where(model.created_at < cutoff_date)) # type: ignore[attr-defined] + count = result.rowcount or 0 # type: ignore[attr-defined] + logger.debug( + "Deleted %d records from %s older than %s", + count, + table_name, + cutoff_date.isoformat(), + ) + return count + + +async def cleanup_inactive_nodes( + db: AsyncSession, + inactivity_days: int, + dry_run: bool = False, +) -> int: + """Delete nodes that haven't been seen for the specified number of days. + + Only deletes nodes where last_seen is older than the cutoff date. + Nodes with last_seen=NULL are NOT deleted (never seen on network). + + Args: + db: Database session + inactivity_days: Delete nodes not seen for this many days + dry_run: If True, only count without deleting + + Returns: + Number of nodes deleted (or would be deleted in dry_run) + """ + cutoff_date = datetime.now(timezone.utc) - timedelta(days=inactivity_days) + + logger.info( + "Starting node cleanup (dry_run=%s, inactivity_days=%d, cutoff=%s)", + dry_run, + inactivity_days, + cutoff_date.isoformat(), + ) + + if dry_run: + # Count nodes that would be deleted + # Only count nodes with last_seen < cutoff (excludes NULL last_seen) + stmt = ( + select(func.count()) + .select_from(Node) + .where(Node.last_seen < cutoff_date) + .where(Node.last_seen.isnot(None)) + ) + result = await db.execute(stmt) + count = result.scalar() or 0 + logger.info( + "[DRY RUN] Would delete %d nodes not seen since %s", + count, + cutoff_date.isoformat(), + ) + return count + else: + # Delete inactive nodes + # Only delete nodes with last_seen < cutoff (excludes NULL last_seen) + result = await db.execute( + delete(Node) + .where(Node.last_seen < cutoff_date) + .where(Node.last_seen.isnot(None)) + ) + await db.commit() + count = result.rowcount or 0 # type: ignore[attr-defined] + logger.info( + "Deleted %d nodes not seen since %s", + count, + cutoff_date.isoformat(), + ) + return count diff --git a/src/meshcore_hub/collector/cli.py b/src/meshcore_hub/collector/cli.py index 655d56f..778353d 100644 --- a/src/meshcore_hub/collector/cli.py +++ b/src/meshcore_hub/collector/cli.py @@ -228,6 +228,26 @@ def _run_collector_service( from meshcore_hub.collector.subscriber import run_collector + # Show cleanup configuration + click.echo("") + click.echo("Cleanup configuration:") + if settings.data_retention_enabled: + click.echo( + f" Event data: Enabled (retention: {settings.data_retention_days} days)" + ) + else: + click.echo(" Event data: Disabled") + + if settings.node_cleanup_enabled: + click.echo( + f" Inactive nodes: Enabled (inactivity: {settings.node_cleanup_days} days)" + ) + else: + click.echo(" Inactive nodes: Disabled") + + if settings.data_retention_enabled or settings.node_cleanup_enabled: + click.echo(f" Interval: {settings.data_retention_interval_hours} hours") + click.echo("") click.echo("Starting MQTT subscriber...") run_collector( @@ -238,6 +258,11 @@ def _run_collector_service( mqtt_prefix=prefix, database_url=database_url, webhook_dispatcher=webhook_dispatcher, + cleanup_enabled=settings.data_retention_enabled, + cleanup_retention_days=settings.data_retention_days, + cleanup_interval_hours=settings.data_retention_interval_hours, + node_cleanup_enabled=settings.node_cleanup_enabled, + node_cleanup_days=settings.node_cleanup_days, ) @@ -549,3 +574,90 @@ def import_members_cmd( click.echo(f" - {error}", err=True) db.dispose() + + +@collector.command("cleanup") +@click.option( + "--retention-days", + type=int, + default=30, + envvar="DATA_RETENTION_DAYS", + help="Number of days to retain data (default: 30)", +) +@click.option( + "--dry-run", + is_flag=True, + default=False, + help="Show what would be deleted without deleting", +) +@click.pass_context +def cleanup_cmd( + ctx: click.Context, + retention_days: int, + dry_run: bool, +) -> None: + """Manually run data cleanup to delete old events. + + Deletes event data older than the retention period: + - Advertisements + - Messages (channel and direct) + - Telemetry + - Trace paths + - Event logs + + Node records are never deleted - only event data. + + Use --dry-run to preview what would be deleted without + actually deleting anything. + """ + import asyncio + + configure_logging(level=ctx.obj["log_level"]) + + click.echo(f"Database: {ctx.obj['database_url']}") + click.echo(f"Retention: {retention_days} days") + click.echo(f"Mode: {'DRY RUN' if dry_run else 'LIVE'}") + click.echo("") + + if dry_run: + click.echo("Running in dry-run mode - no data will be deleted.") + else: + click.echo("WARNING: This will permanently delete old event data!") + if not click.confirm("Continue?"): + click.echo("Aborted.") + return + + click.echo("") + + from meshcore_hub.common.database import DatabaseManager + from meshcore_hub.collector.cleanup import cleanup_old_data + + # Initialize database + db = DatabaseManager(ctx.obj["database_url"]) + + # Run cleanup + async def run_cleanup() -> None: + async with db.async_session() as session: + stats = await cleanup_old_data( + session, + retention_days, + dry_run=dry_run, + ) + + click.echo("") + click.echo("Cleanup results:") + click.echo(f" Advertisements: {stats.advertisements_deleted}") + click.echo(f" Messages: {stats.messages_deleted}") + click.echo(f" Telemetry: {stats.telemetry_deleted}") + click.echo(f" Trace paths: {stats.trace_paths_deleted}") + click.echo(f" Event logs: {stats.event_logs_deleted}") + click.echo(f" Total: {stats.total_deleted}") + + if dry_run: + click.echo("") + click.echo("(Dry run - no data was actually deleted)") + + asyncio.run(run_cleanup()) + db.dispose() + click.echo("") + click.echo("Cleanup complete." if not dry_run else "Dry run complete.") diff --git a/src/meshcore_hub/collector/subscriber.py b/src/meshcore_hub/collector/subscriber.py index 0b4e4e2..1d29d1c 100644 --- a/src/meshcore_hub/collector/subscriber.py +++ b/src/meshcore_hub/collector/subscriber.py @@ -6,6 +6,7 @@ The subscriber: 3. Routes events to appropriate handlers 4. Persists data to database 5. Dispatches events to configured webhooks +6. Performs scheduled data cleanup if enabled """ import asyncio @@ -14,6 +15,7 @@ import signal import threading import time import uuid +from datetime import datetime, timezone from typing import Any, Callable, Optional, TYPE_CHECKING from meshcore_hub.common.database import DatabaseManager @@ -38,6 +40,11 @@ class Subscriber: mqtt_client: MQTTClient, db_manager: DatabaseManager, webhook_dispatcher: Optional["WebhookDispatcher"] = None, + cleanup_enabled: bool = False, + cleanup_retention_days: int = 30, + cleanup_interval_hours: int = 24, + node_cleanup_enabled: bool = False, + node_cleanup_days: int = 90, ): """Initialize subscriber. @@ -45,6 +52,11 @@ class Subscriber: mqtt_client: MQTT client instance db_manager: Database manager instance webhook_dispatcher: Optional webhook dispatcher for event forwarding + cleanup_enabled: Enable automatic event data cleanup + cleanup_retention_days: Number of days to retain event data + cleanup_interval_hours: Hours between cleanup runs + node_cleanup_enabled: Enable automatic cleanup of inactive nodes + node_cleanup_days: Remove nodes not seen for this many days """ self.mqtt = mqtt_client self.db = db_manager @@ -59,6 +71,14 @@ class Subscriber: self._webhook_queue: list[tuple[str, dict[str, Any], str]] = [] self._webhook_lock = threading.Lock() self._webhook_thread: Optional[threading.Thread] = None + # Data cleanup + self._cleanup_enabled = cleanup_enabled + self._cleanup_retention_days = cleanup_retention_days + self._cleanup_interval_hours = cleanup_interval_hours + self._node_cleanup_enabled = node_cleanup_enabled + self._node_cleanup_days = node_cleanup_days + self._cleanup_thread: Optional[threading.Thread] = None + self._last_cleanup: Optional[datetime] = None @property def is_healthy(self) -> bool: @@ -202,6 +222,115 @@ class Subscriber: if self._webhook_thread.is_alive(): logger.warning("Webhook processor thread did not stop cleanly") + def _start_cleanup_scheduler(self) -> None: + """Start background thread for periodic data cleanup.""" + if not self._cleanup_enabled and not self._node_cleanup_enabled: + logger.info("Data cleanup and node cleanup are both disabled") + return + + logger.info( + "Starting cleanup scheduler (interval_hours=%d)", + self._cleanup_interval_hours, + ) + if self._cleanup_enabled: + logger.info( + " Event data cleanup: ENABLED (retention_days=%d)", + self._cleanup_retention_days, + ) + else: + logger.info(" Event data cleanup: DISABLED") + + if self._node_cleanup_enabled: + logger.info( + " Node cleanup: ENABLED (inactivity_days=%d)", self._node_cleanup_days + ) + else: + logger.info(" Node cleanup: DISABLED") + + def run_cleanup_loop() -> None: + """Run async cleanup tasks in background thread.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + while self._running: + # Check if cleanup is due + now = datetime.now(timezone.utc) + should_run = False + + if self._last_cleanup is None: + # First run + should_run = True + else: + # Check if interval has passed + hours_since_last = ( + now - self._last_cleanup + ).total_seconds() / 3600 + should_run = hours_since_last >= self._cleanup_interval_hours + + if should_run: + try: + logger.info("Starting scheduled cleanup") + from meshcore_hub.collector.cleanup import ( + cleanup_old_data, + cleanup_inactive_nodes, + ) + + # Get async session and run cleanup + async def run_cleanup() -> None: + async with self.db.async_session() as session: + # Run event data cleanup if enabled + if self._cleanup_enabled: + stats = await cleanup_old_data( + session, + self._cleanup_retention_days, + dry_run=False, + ) + logger.info( + "Event cleanup completed: %s", stats + ) + + # Run node cleanup if enabled + if self._node_cleanup_enabled: + nodes_deleted = await cleanup_inactive_nodes( + session, + self._node_cleanup_days, + dry_run=False, + ) + logger.info( + "Node cleanup completed: %d nodes deleted", + nodes_deleted, + ) + + loop.run_until_complete(run_cleanup()) + self._last_cleanup = now + + except Exception as e: + logger.error(f"Cleanup error: {e}", exc_info=True) + + # Sleep for 1 hour before next check + for _ in range(3600): + if not self._running: + break + time.sleep(1) + + finally: + loop.close() + logger.info("Cleanup scheduler stopped") + + self._cleanup_thread = threading.Thread( + target=run_cleanup_loop, daemon=True, name="cleanup-scheduler" + ) + self._cleanup_thread.start() + + def _stop_cleanup_scheduler(self) -> None: + """Stop the cleanup scheduler thread.""" + if self._cleanup_thread and self._cleanup_thread.is_alive(): + # Thread will exit when self._running becomes False + self._cleanup_thread.join(timeout=5.0) + if self._cleanup_thread.is_alive(): + logger.warning("Cleanup scheduler thread did not stop cleanly") + def start(self) -> None: """Start the subscriber.""" logger.info("Starting collector subscriber") @@ -239,6 +368,9 @@ class Subscriber: # Start webhook processor if configured self._start_webhook_processor() + # Start cleanup scheduler if configured + self._start_cleanup_scheduler() + # Start health reporter for Docker health checks self._health_reporter = HealthReporter( component="collector", @@ -271,6 +403,9 @@ class Subscriber: self._running = False self._shutdown_event.set() + # Stop cleanup scheduler + self._stop_cleanup_scheduler() + # Stop webhook processor self._stop_webhook_processor() @@ -295,6 +430,11 @@ def create_subscriber( mqtt_prefix: str = "meshcore", database_url: str = "sqlite:///./meshcore.db", webhook_dispatcher: Optional["WebhookDispatcher"] = None, + cleanup_enabled: bool = False, + cleanup_retention_days: int = 30, + cleanup_interval_hours: int = 24, + node_cleanup_enabled: bool = False, + node_cleanup_days: int = 90, ) -> Subscriber: """Create a configured subscriber instance. @@ -306,6 +446,11 @@ def create_subscriber( mqtt_prefix: MQTT topic prefix database_url: Database connection URL webhook_dispatcher: Optional webhook dispatcher for event forwarding + cleanup_enabled: Enable automatic event data cleanup + cleanup_retention_days: Number of days to retain event data + cleanup_interval_hours: Hours between cleanup runs + node_cleanup_enabled: Enable automatic cleanup of inactive nodes + node_cleanup_days: Remove nodes not seen for this many days Returns: Configured Subscriber instance @@ -326,7 +471,16 @@ def create_subscriber( db_manager = DatabaseManager(database_url) # Create subscriber - subscriber = Subscriber(mqtt_client, db_manager, webhook_dispatcher) + subscriber = Subscriber( + mqtt_client, + db_manager, + webhook_dispatcher, + cleanup_enabled=cleanup_enabled, + cleanup_retention_days=cleanup_retention_days, + cleanup_interval_hours=cleanup_interval_hours, + node_cleanup_enabled=node_cleanup_enabled, + node_cleanup_days=node_cleanup_days, + ) # Register handlers from meshcore_hub.collector.handlers import register_all_handlers @@ -344,6 +498,11 @@ def run_collector( mqtt_prefix: str = "meshcore", database_url: str = "sqlite:///./meshcore.db", webhook_dispatcher: Optional["WebhookDispatcher"] = None, + cleanup_enabled: bool = False, + cleanup_retention_days: int = 30, + cleanup_interval_hours: int = 24, + node_cleanup_enabled: bool = False, + node_cleanup_days: int = 90, ) -> None: """Run the collector (blocking). @@ -355,6 +514,11 @@ def run_collector( mqtt_prefix: MQTT topic prefix database_url: Database connection URL webhook_dispatcher: Optional webhook dispatcher for event forwarding + cleanup_enabled: Enable automatic event data cleanup + cleanup_retention_days: Number of days to retain event data + cleanup_interval_hours: Hours between cleanup runs + node_cleanup_enabled: Enable automatic cleanup of inactive nodes + node_cleanup_days: Remove nodes not seen for this many days """ subscriber = create_subscriber( mqtt_host=mqtt_host, @@ -364,6 +528,11 @@ def run_collector( mqtt_prefix=mqtt_prefix, database_url=database_url, webhook_dispatcher=webhook_dispatcher, + cleanup_enabled=cleanup_enabled, + cleanup_retention_days=cleanup_retention_days, + cleanup_interval_hours=cleanup_interval_hours, + node_cleanup_enabled=node_cleanup_enabled, + node_cleanup_days=node_cleanup_days, ) # Set up signal handlers diff --git a/src/meshcore_hub/collector/tag_import.py b/src/meshcore_hub/collector/tag_import.py index 791c288..9c1d68e 100644 --- a/src/meshcore_hub/collector/tag_import.py +++ b/src/meshcore_hub/collector/tag_import.py @@ -210,7 +210,8 @@ def import_tags( node = Node( public_key=public_key, first_seen=now, - last_seen=now, + # last_seen is intentionally left unset (None) + # It will be set when the node is actually seen via events ) session.add(node) session.flush() diff --git a/src/meshcore_hub/common/config.py b/src/meshcore_hub/common/config.py index 1b936b9..b1efd13 100644 --- a/src/meshcore_hub/common/config.py +++ b/src/meshcore_hub/common/config.py @@ -126,6 +126,29 @@ class CollectorSettings(CommonSettings): default=2.0, description="Retry backoff multiplier" ) + # Data retention / cleanup settings + data_retention_enabled: bool = Field( + default=True, description="Enable automatic event data cleanup" + ) + data_retention_days: int = Field( + default=30, description="Number of days to retain event data", ge=1 + ) + data_retention_interval_hours: int = Field( + default=24, + description="Hours between automatic cleanup runs (applies to both events and nodes)", + ge=1, + ) + + # Node cleanup settings + node_cleanup_enabled: bool = Field( + default=True, description="Enable automatic cleanup of inactive nodes" + ) + node_cleanup_days: int = Field( + default=7, + description="Remove nodes not seen for this many days (last_seen)", + ge=1, + ) + @property def collector_data_dir(self) -> str: """Get the collector data directory path.""" diff --git a/src/meshcore_hub/common/database.py b/src/meshcore_hub/common/database.py index 13b2503..dee22f9 100644 --- a/src/meshcore_hub/common/database.py +++ b/src/meshcore_hub/common/database.py @@ -1,10 +1,11 @@ """Database connection and session management.""" -from contextlib import contextmanager -from typing import Generator +from contextlib import asynccontextmanager, contextmanager +from typing import AsyncGenerator, Generator from sqlalchemy import create_engine, event from sqlalchemy.engine import Engine +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import Session, sessionmaker from meshcore_hub.common.models.base import Base @@ -100,6 +101,17 @@ class DatabaseManager: self.engine = create_database_engine(database_url, echo=echo) self.session_factory = create_session_factory(self.engine) + # Create async engine for async operations + async_url = database_url.replace("sqlite://", "sqlite+aiosqlite://") + self.async_engine = create_async_engine(async_url, echo=echo) + from sqlalchemy.ext.asyncio import async_sessionmaker + + self.async_session_factory = async_sessionmaker( + self.async_engine, + class_=AsyncSession, + expire_on_commit=False, + ) + def create_tables(self) -> None: """Create all database tables.""" create_tables(self.engine) @@ -138,6 +150,21 @@ class DatabaseManager: finally: session.close() + @asynccontextmanager + async def async_session(self) -> AsyncGenerator[AsyncSession, None]: + """Provide an async session context manager. + + Yields: + AsyncSession instance + + Example: + async with db.async_session() as session: + result = await session.execute(select(Node)) + await session.commit() + """ + async with self.async_session_factory() as session: + yield session + def dispose(self) -> None: """Dispose of the database engine and connection pool.""" self.engine.dispose() diff --git a/tests/test_collector/conftest.py b/tests/test_collector/conftest.py index 7026b0a..1503d1c 100644 --- a/tests/test_collector/conftest.py +++ b/tests/test_collector/conftest.py @@ -20,3 +20,16 @@ def db_session(db_manager): session = db_manager.get_session() yield session session.close() + + +@pytest.fixture +async def async_db_session(db_manager): + """Create an async database session for testing.""" + # Create tables in async engine + async with db_manager.async_engine.begin() as conn: + await conn.run_sync(db_manager.engine.pool.echo) + # Tables already created by db_manager fixture with sync engine + # Async engine shares same database file, so tables exist + + async with db_manager.async_session() as session: + yield session diff --git a/tests/test_collector/test_cleanup.py b/tests/test_collector/test_cleanup.py new file mode 100644 index 0000000..7220e5e --- /dev/null +++ b/tests/test_collector/test_cleanup.py @@ -0,0 +1,254 @@ +"""Tests for data cleanup functionality.""" + +import pytest +from datetime import datetime, timedelta, timezone +from sqlalchemy.ext.asyncio import AsyncSession + +from meshcore_hub.collector.cleanup import cleanup_old_data, CleanupStats +from meshcore_hub.common.models import ( + Advertisement, + EventLog, + Message, + Node, + Telemetry, + TracePath, +) + + +@pytest.mark.asyncio +async def test_cleanup_old_data_dry_run(async_db_session: AsyncSession) -> None: + """Test cleanup in dry-run mode.""" + # Create test node + node = Node( + public_key="a" * 64, + name="Test Node", + ) + async_db_session.add(node) + await async_db_session.flush() + + # Create old advertisement (60 days ago) + old_date = datetime.now(timezone.utc) - timedelta(days=60) + old_adv = Advertisement( + node_id=node.id, + payload={}, + created_at=old_date, + updated_at=old_date, + ) + async_db_session.add(old_adv) + + # Create recent advertisement (10 days ago) + recent_date = datetime.now(timezone.utc) - timedelta(days=10) + recent_adv = Advertisement( + node_id=node.id, + payload={}, + created_at=recent_date, + updated_at=recent_date, + ) + async_db_session.add(recent_adv) + + await async_db_session.commit() + + # Run cleanup in dry-run mode with 30-day retention + stats = await cleanup_old_data(async_db_session, retention_days=30, dry_run=True) + + # Should report 1 advertisement would be deleted + assert stats.advertisements_deleted == 1 + assert stats.total_deleted == 1 + + # Verify no data was actually deleted + await async_db_session.rollback() # Refresh from DB + from sqlalchemy import select, func + + count = await async_db_session.scalar( + select(func.count()).select_from(Advertisement) + ) + assert count == 2 # Both still exist + + +@pytest.mark.asyncio +async def test_cleanup_old_data_live(async_db_session: AsyncSession) -> None: + """Test cleanup in live mode.""" + # Create test node + node = Node( + public_key="b" * 64, + name="Test Node", + ) + async_db_session.add(node) + await async_db_session.flush() + + # Create old records (60 days ago) + old_date = datetime.now(timezone.utc) - timedelta(days=60) + + old_adv = Advertisement( + node_id=node.id, + payload={}, + created_at=old_date, + updated_at=old_date, + ) + async_db_session.add(old_adv) + + old_msg = Message( + node_id=node.id, + direction="recv", + message_type="channel", + text="old message", + payload={}, + created_at=old_date, + updated_at=old_date, + ) + async_db_session.add(old_msg) + + old_telemetry = Telemetry( + node_id=node.id, + payload={}, + created_at=old_date, + updated_at=old_date, + ) + async_db_session.add(old_telemetry) + + old_trace = TracePath( + node_id=node.id, + destination="c" * 64, + path_hashes=[], + payload={}, + created_at=old_date, + updated_at=old_date, + ) + async_db_session.add(old_trace) + + old_event = EventLog( + node_id=node.id, + event_type="test_event", + payload={}, + created_at=old_date, + updated_at=old_date, + ) + async_db_session.add(old_event) + + # Create recent records (10 days ago) + recent_date = datetime.now(timezone.utc) - timedelta(days=10) + + recent_adv = Advertisement( + node_id=node.id, + payload={}, + created_at=recent_date, + updated_at=recent_date, + ) + async_db_session.add(recent_adv) + + await async_db_session.commit() + + # Run cleanup with 30-day retention + stats = await cleanup_old_data(async_db_session, retention_days=30, dry_run=False) + + # Verify statistics + assert stats.advertisements_deleted == 1 + assert stats.messages_deleted == 1 + assert stats.telemetry_deleted == 1 + assert stats.trace_paths_deleted == 1 + assert stats.event_logs_deleted == 1 + assert stats.total_deleted == 5 + + # Verify old data was deleted + from sqlalchemy import select, func + + adv_count = await async_db_session.scalar( + select(func.count()).select_from(Advertisement) + ) + assert adv_count == 1 # Only recent one remains + + msg_count = await async_db_session.scalar(select(func.count()).select_from(Message)) + assert msg_count == 0 # Old one deleted + + # Verify node still exists + from sqlalchemy import select + + node_result = await async_db_session.scalar(select(Node).where(Node.id == node.id)) + assert node_result is not None + + +@pytest.mark.asyncio +async def test_cleanup_respects_retention_period( + async_db_session: AsyncSession, +) -> None: + """Test that cleanup respects the retention period.""" + # Create test node + node = Node( + public_key="d" * 64, + name="Test Node", + ) + async_db_session.add(node) + await async_db_session.flush() + + # Create advertisements at different ages + now = datetime.now(timezone.utc) + + # 90 days old - should be deleted with 30-day retention + very_old = Advertisement( + node_id=node.id, + payload={}, + created_at=now - timedelta(days=90), + updated_at=now - timedelta(days=90), + ) + async_db_session.add(very_old) + + # 40 days old - should be deleted with 30-day retention + old = Advertisement( + node_id=node.id, + payload={}, + created_at=now - timedelta(days=40), + updated_at=now - timedelta(days=40), + ) + async_db_session.add(old) + + # 20 days old - should be kept + recent = Advertisement( + node_id=node.id, + payload={}, + created_at=now - timedelta(days=20), + updated_at=now - timedelta(days=20), + ) + async_db_session.add(recent) + + # 5 days old - should be kept + very_recent = Advertisement( + node_id=node.id, + payload={}, + created_at=now - timedelta(days=5), + updated_at=now - timedelta(days=5), + ) + async_db_session.add(very_recent) + + await async_db_session.commit() + + # Run cleanup with 30-day retention + stats = await cleanup_old_data(async_db_session, retention_days=30, dry_run=False) + + # Should delete the 2 old ones, keep the 2 recent ones + assert stats.advertisements_deleted == 2 + assert stats.total_deleted == 2 + + # Verify count + from sqlalchemy import select, func + + adv_count = await async_db_session.scalar( + select(func.count()).select_from(Advertisement) + ) + assert adv_count == 2 + + +@pytest.mark.asyncio +async def test_cleanup_stats_repr() -> None: + """Test CleanupStats string representation.""" + stats = CleanupStats() + stats.advertisements_deleted = 10 + stats.messages_deleted = 5 + stats.telemetry_deleted = 3 + stats.trace_paths_deleted = 2 + stats.event_logs_deleted = 1 + stats.total_deleted = 21 + + repr_str = repr(stats) + assert "total=21" in repr_str + assert "advertisements=10" in repr_str + assert "messages=5" in repr_str