feat(retention): add message retention scheduling (Task 2.6)

- Add daily retention job that deletes old channel messages, DMs, and
  advertisements based on configurable age threshold
- Add GET/POST /api/retention-settings endpoints
- Extend cleanup_old_messages() to optionally include DMs and adverts
- Wire up APScheduler in create_app() (also enables existing archiving
  and contact cleanup schedulers that were never started in v2)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
MarekWo
2026-03-01 17:28:54 +01:00
parent d89e276054
commit b034a181ce
5 changed files with 230 additions and 5 deletions

View File

@@ -6,12 +6,16 @@ from app.archiver.manager import (
archive_messages,
list_archives,
get_archive_path,
schedule_daily_archiving
schedule_daily_archiving,
schedule_retention,
init_retention_schedule
)
__all__ = [
'archive_messages',
'list_archives',
'get_archive_path',
'schedule_daily_archiving'
'schedule_daily_archiving',
'schedule_retention',
'init_retention_schedule'
]

View File

@@ -20,6 +20,10 @@ _scheduler: Optional[BackgroundScheduler] = None
# Job IDs
CLEANUP_JOB_ID = 'daily_cleanup'
RETENTION_JOB_ID = 'daily_retention'
# Module-level db reference (set by init_retention_schedule)
_db = None
def get_local_timezone_name() -> str:
@@ -460,6 +464,103 @@ def init_cleanup_schedule():
logger.error(f"Error initializing cleanup schedule: {e}", exc_info=True)
def _retention_job():
"""Background job that runs daily to delete old messages from DB."""
logger.info("Running daily retention job...")
try:
from app.routes.api import get_retention_settings
settings = get_retention_settings()
if not settings.get('enabled'):
logger.info("Message retention is disabled, skipping")
return
if _db is None:
logger.error("Database not available for retention job")
return
days = settings.get('days', 90)
include_dms = settings.get('include_dms', False)
include_adverts = settings.get('include_adverts', False)
result = _db.cleanup_old_messages(
days=days,
include_dms=include_dms,
include_adverts=include_adverts
)
total = sum(result.values())
logger.info(f"Retention job completed: {total} rows deleted ({result})")
except Exception as e:
logger.error(f"Retention job failed: {e}", exc_info=True)
def schedule_retention(enabled: bool, hour: int = 2) -> bool:
"""Add or remove the retention job from the scheduler."""
global _scheduler
if _scheduler is None:
logger.warning("Scheduler not initialized, cannot schedule retention")
return False
try:
if enabled:
if not isinstance(hour, int) or hour < 0 or hour > 23:
hour = 2
trigger = CronTrigger(hour=hour, minute=30)
_scheduler.add_job(
func=_retention_job,
trigger=trigger,
id=RETENTION_JOB_ID,
name='Daily Message Retention',
replace_existing=True
)
tz_name = get_local_timezone_name()
logger.info(f"Retention job scheduled - will run daily at {hour:02d}:30 ({tz_name})")
else:
try:
_scheduler.remove_job(RETENTION_JOB_ID)
logger.info("Retention job removed from scheduler")
except Exception:
pass
return True
except Exception as e:
logger.error(f"Error scheduling retention: {e}", exc_info=True)
return False
def init_retention_schedule(db=None):
"""Initialize retention schedule from saved settings. Call at startup."""
global _db
if db is not None:
_db = db
try:
from app.routes.api import get_retention_settings
settings = get_retention_settings()
if settings.get('enabled'):
hour = settings.get('hour', 2)
schedule_retention(enabled=True, hour=hour)
tz_name = get_local_timezone_name()
logger.info(f"Message retention enabled from saved settings (hour={hour:02d}:30 {tz_name})")
else:
logger.info("Message retention is disabled in saved settings")
except Exception as e:
logger.error(f"Error initializing retention schedule: {e}", exc_info=True)
def schedule_daily_archiving():
"""
Initialize and start the background scheduler for daily archiving.

View File

@@ -515,14 +515,30 @@ class Database:
stats['db_size_bytes'] = self.db_path.stat().st_size if self.db_path.exists() else 0
return stats
def cleanup_old_messages(self, days: int) -> int:
"""Delete channel messages older than N days. Returns count deleted."""
def cleanup_old_messages(self, days: int, include_dms: bool = False,
include_adverts: bool = False) -> dict:
"""Delete messages older than N days. Returns counts per table."""
cutoff = int((datetime.now() - timedelta(days=days)).timestamp())
result = {}
with self._connect() as conn:
cursor = conn.execute(
"DELETE FROM channel_messages WHERE timestamp < ?", (cutoff,)
)
return cursor.rowcount
result['channel_messages'] = cursor.rowcount
if include_dms:
cursor = conn.execute(
"DELETE FROM direct_messages WHERE timestamp < ?", (cutoff,)
)
result['direct_messages'] = cursor.rowcount
if include_adverts:
cursor = conn.execute(
"DELETE FROM advertisements WHERE timestamp < ?", (cutoff,)
)
result['advertisements'] = cursor.rowcount
return result
# ================================================================
# Backup

View File

@@ -110,6 +110,11 @@ def create_app():
threading.Thread(target=_wait_for_device_name, daemon=True).start()
# Start background scheduler (archiving, contact cleanup, message retention)
from app.archiver.manager import schedule_daily_archiving, init_retention_schedule
schedule_daily_archiving()
init_retention_schedule(db=db)
logger.info(f"mc-webui v2 started — transport: {'TCP' if config.use_tcp else 'serial'}")
logger.info(f"Database: {config.db_path}")

View File

@@ -310,6 +310,56 @@ def save_cleanup_settings(cleanup_settings: dict) -> bool:
return False
def get_retention_settings() -> dict:
"""Get message retention settings from .webui_settings.json."""
from pathlib import Path
defaults = {
'enabled': False,
'days': 90,
'include_dms': False,
'include_adverts': False,
'hour': 2
}
settings_path = Path(config.MC_CONFIG_DIR) / ".webui_settings.json"
try:
if not settings_path.exists():
return defaults
with open(settings_path, 'r', encoding='utf-8') as f:
settings = json.load(f)
retention = settings.get('retention_settings', {})
return {**defaults, **retention}
except Exception as e:
logger.error(f"Failed to read retention settings: {e}")
return defaults
def save_retention_settings(retention_settings: dict) -> bool:
"""Save message retention settings to .webui_settings.json (atomic write)."""
from pathlib import Path
settings_path = Path(config.MC_CONFIG_DIR) / ".webui_settings.json"
try:
settings = {}
if settings_path.exists():
with open(settings_path, 'r', encoding='utf-8') as f:
settings = json.load(f)
settings['retention_settings'] = retention_settings
temp_file = settings_path.with_suffix('.tmp')
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(settings, f, indent=2, ensure_ascii=False)
temp_file.replace(settings_path)
return True
except Exception as e:
logger.error(f"Failed to save retention settings: {e}")
return False
@api_bp.route('/messages', methods=['GET'])
def get_messages():
"""
@@ -2696,6 +2746,55 @@ def update_device_settings_api():
}), 500
# =============================================================================
# Message Retention Settings
# =============================================================================
@api_bp.route('/retention-settings', methods=['GET'])
def get_retention_settings_api():
"""Get current message retention settings."""
try:
from app.archiver.manager import get_local_timezone_name
settings = get_retention_settings()
settings['timezone'] = get_local_timezone_name()
return jsonify({'success': True, **settings})
except Exception as e:
logger.error(f"Error getting retention settings: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
@api_bp.route('/retention-settings', methods=['POST'])
def update_retention_settings_api():
"""Update message retention settings and reschedule job."""
try:
data = request.get_json()
if not data:
return jsonify({'success': False, 'error': 'No data provided'}), 400
current = get_retention_settings()
current['enabled'] = data.get('enabled', current['enabled'])
current['days'] = max(1, min(data.get('days', current['days']), 3650))
current['include_dms'] = data.get('include_dms', current['include_dms'])
current['include_adverts'] = data.get('include_adverts', current['include_adverts'])
current['hour'] = max(0, min(data.get('hour', current['hour']), 23))
if not save_retention_settings(current):
return jsonify({'success': False, 'error': 'Failed to save settings'}), 500
from app.archiver.manager import schedule_retention
schedule_retention(enabled=current['enabled'], hour=current['hour'])
return jsonify({
'success': True,
'message': f"Retention {'enabled' if current['enabled'] else 'disabled'}",
'settings': current
})
except Exception as e:
logger.error(f"Error updating retention settings: {e}")
return jsonify({'success': False, 'error': str(e)}), 500
# =============================================================================
# Read Status (Server-side message read tracking)
# =============================================================================