Files
mc-webui/app/archiver/manager.py
MarekWo 33a71bed17 refactor(ui): rename contact type label CLI to COM (companion)
The MeshCore community uses "companion" not "client" for type 1 nodes.
Rename the CLI label to COM across all UI, API, JS, and docs to align
with official terminology. Includes cache migration for old CLI entries.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 14:37:30 +01:00

676 lines
21 KiB
Python

"""
Archive manager - handles message archiving and scheduling
"""
import logging
import os
import shutil
from datetime import datetime, time
from pathlib import Path
from typing import Any, Dict, List, Optional

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

from app.config import config, runtime_config
# Module-level logger.
logger = logging.getLogger(__name__)
# Global scheduler instance: created by schedule_daily_archiving() and reset
# to None by stop_scheduler(). The schedule_* / init_backup_schedule helpers
# add their jobs to it and refuse to do anything while it is None.
_scheduler: Optional[BackgroundScheduler] = None
# Job IDs. Jobs are added with replace_existing=True, so re-scheduling under
# the same id replaces the previous job instead of duplicating it.
# (The archive job uses the literal id 'daily_archive'.)
CLEANUP_JOB_ID = 'daily_cleanup'
RETENTION_JOB_ID = 'daily_retention'
BACKUP_JOB_ID = 'daily_backup'
# Module-level db reference (set by init_retention_schedule). The retention
# and backup jobs log a message and skip their work while it is None.
_db = None
def get_local_timezone_name() -> str:
    """
    Return the local timezone name for display (log messages, UI).

    Resolution order:
      1. the TZ environment variable, when it is set to a non-empty value;
      2. str() of the tzinfo attached by ``datetime.now().astimezone()``,
         unless that is empty or the literal string 'None';
      3. the fallback string 'local' (also used if detection raises).

    Returns:
        Timezone name (e.g., 'Europe/Warsaw', 'UTC', 'CET') or 'local'.
    """
    from_env = os.environ.get('TZ')
    if from_env:
        return from_env

    detected = None
    try:
        zone = datetime.now().astimezone().tzinfo
        detected = str(zone) if zone else None
    except Exception:
        pass  # detection is best-effort; fall through to the default below
    return detected if detected and detected != 'None' else 'local'
def get_archive_path(archive_date: str) -> Path:
    """
    Build the archive file path for one day.

    The file lives in ``config.archive_dir_path`` and is named
    ``<device name>.<archive_date>.msgs``.

    Args:
        archive_date: Date string, expected as YYYY-MM-DD (not validated here).

    Returns:
        Path of the archive file (which may or may not exist yet).
    """
    base_dir = config.archive_dir_path
    archive_name = "{}.{}.msgs".format(runtime_config.get_device_name(), archive_date)
    return base_dir / archive_name
def archive_messages(archive_date: Optional[str] = None) -> Dict[str, Any]:
    """
    Archive messages for a specific date by copying the current .msgs file.

    The whole live messages file (``runtime_config.get_msgs_file_path()``) is
    copied to ``get_archive_path(archive_date)``. The copy is first written to
    a temporary sibling file and then renamed into place, so an interrupted
    copy never leaves a truncated file that a later run would mistake for a
    finished archive.

    Args:
        archive_date: Date to archive in YYYY-MM-DD format.
            If None, uses yesterday's date. Non-padded input such as
            '2024-1-5' is accepted and normalized to '2024-01-05'.

    Returns:
        Dict with a boolean 'success' key.
        - On success it also has 'message', 'archive_file' and
          'archive_date', plus either 'exists': True (the archive was already
          present) or 'file_size'.
        - On failure it has 'error'.
    """
    try:
        # Determine date to archive
        if archive_date is None:
            from datetime import date, timedelta
            yesterday = date.today() - timedelta(days=1)
            archive_date = yesterday.strftime('%Y-%m-%d')

        # Validate the date and normalize it to zero-padded YYYY-MM-DD.
        # strptime also accepts e.g. '2024-1-5', which would otherwise create
        # a second, differently named archive file for the same day.
        try:
            archive_date = datetime.strptime(archive_date, '%Y-%m-%d').date().isoformat()
        except ValueError:
            return {
                'success': False,
                'error': f'Invalid date format: {archive_date}. Expected YYYY-MM-DD'
            }

        # Ensure archive directory exists
        archive_dir = config.archive_dir_path
        archive_dir.mkdir(parents=True, exist_ok=True)

        # Get source .msgs file
        source_file = runtime_config.get_msgs_file_path()
        if not source_file.exists():
            logger.warning(f"Source messages file not found: {source_file}")
            return {
                'success': False,
                'error': f'Messages file not found: {source_file}'
            }

        dest_file = get_archive_path(archive_date)
        if dest_file.exists():
            logger.info(f"Archive already exists: {dest_file}")
            return {
                'success': True,
                'message': f'Archive already exists for {archive_date}',
                'archive_file': str(dest_file),
                'exists': True,
                'archive_date': archive_date
            }

        # Copy to a temporary sibling and rename it into place. An interrupted
        # copy then cannot leave a truncated file that the exists() check above
        # would accept as complete. The '.tmp' suffix also keeps the file out of
        # '*.msgs' glob matches.
        tmp_file = dest_file.with_name(dest_file.name + '.tmp')
        try:
            shutil.copy2(source_file, tmp_file)
            os.replace(tmp_file, dest_file)
        except Exception:
            # Best-effort removal of the partial copy; the original error is re-raised.
            try:
                tmp_file.unlink()
            except OSError:
                pass
            raise

        file_size = dest_file.stat().st_size
        logger.info(f"Archived messages to {dest_file} ({file_size} bytes)")
        return {
            'success': True,
            'message': f'Successfully archived messages for {archive_date}',
            'archive_file': str(dest_file),
            'file_size': file_size,
            'archive_date': archive_date
        }
    except Exception as e:
        logger.error(f"Error archiving messages: {e}", exc_info=True)
        return {
            'success': False,
            'error': str(e)
        }
def list_archives() -> List[Dict]:
    """
    List all available archive files with metadata.

    Scans ``config.archive_dir_path`` for files named
    ``{device_name}.YYYY-MM-DD.msgs``, where ``device_name`` is the current
    ``runtime_config.get_device_name()``. Files whose middle part is not a
    valid date are skipped with a warning.

    Returns:
        List of dicts with 'date', 'file_size', 'message_count' and
        'file_path', sorted by date (newest first).
        - Returns [] when the archive directory does not exist.
        - On an unexpected error, the error is logged and the entries
          gathered so far are returned.
    """
    import glob

    archives = []
    try:
        archive_dir = config.archive_dir_path
        if not archive_dir.exists():
            logger.info(f"Archive directory does not exist: {archive_dir}")
            return []

        # Read the device name once so the glob pattern and the prefix agree.
        device_name = runtime_config.get_device_name()
        prefix = f"{device_name}."
        suffix = ".msgs"
        # Escape glob metacharacters ('*', '?', '[') a device name may contain.
        pattern = f"{glob.escape(device_name)}.*{suffix}"

        for archive_file in archive_dir.glob(pattern):
            try:
                if not archive_file.is_file():
                    continue
                filename = archive_file.name
                # Strip exactly one leading prefix and one trailing suffix.
                # str.replace() would also delete matches inside the date:
                # with device "1", "1.2024-01-01.msgs".replace("1.", "") also
                # removes the "1." of "01.msgs".
                if not (filename.startswith(prefix) and filename.endswith(suffix)):
                    continue
                date_part = filename[len(prefix):len(filename) - len(suffix)]

                try:
                    datetime.strptime(date_part, '%Y-%m-%d')
                except ValueError:
                    logger.warning(f"Invalid archive filename format: {filename}")
                    continue

                archives.append({
                    'date': date_part,
                    'file_size': archive_file.stat().st_size,
                    'message_count': _count_messages_in_file(archive_file),
                    'file_path': str(archive_file)
                })
            except Exception as e:
                logger.warning(f"Error processing archive file {archive_file}: {e}")
                continue

        # ISO dates sort chronologically as strings; newest first.
        archives.sort(key=lambda x: x['date'], reverse=True)
        logger.info(f"Found {len(archives)} archive files")
    except Exception as e:
        logger.error(f"Error listing archives: {e}", exc_info=True)
    return archives
def _count_messages_in_file(file_path: Path) -> int:
    """
    Count Public-channel messages stored in a JSON-lines .msgs file.

    A line is counted when it is a JSON object whose 'type' is 'CHAN' or
    'SENT_CHAN' and whose 'channel_idx' is 0 (a missing 'channel_idx' counts
    as 0). The following are skipped without stopping the scan:
    - blank lines,
    - lines that are not valid JSON,
    - JSON values that are not objects,
    - undecodable bytes (replaced with U+FFFD).

    Args:
        file_path: Path to the .msgs file

    Returns:
        Number of matching messages. If the file cannot be read, the count
        accumulated so far is returned and a warning is logged.
    """
    import json

    channel_types = ('CHAN', 'SENT_CHAN')
    count = 0
    try:
        # errors='replace': one bad byte must not abort counting the whole file.
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A list/number/string line has no .get(); skip it instead of
                # letting AttributeError abort the whole scan.
                if not isinstance(data, dict):
                    continue
                # Only count Public channel messages
                if data.get('channel_idx', 0) == 0 and data.get('type') in channel_types:
                    count += 1
    except Exception as e:
        logger.warning(f"Error counting messages in {file_path}: {e}")
    return count
def _archive_job():
    """
    Scheduler callback that archives the previous day's messages.

    Exits after logging when ``config.MC_ARCHIVE_ENABLED`` is false.
    Otherwise it calls ``archive_messages()`` with no date (i.e. yesterday)
    and logs the outcome reported in the returned dict.
    """
    logger.info("Running daily archive job...")

    if not config.MC_ARCHIVE_ENABLED:
        logger.info("Archiving is disabled, skipping")
        return

    outcome = archive_messages()
    if not outcome['success']:
        logger.error(f"Archive job failed: {outcome.get('error', 'Unknown error')}")
        return
    logger.info(f"Archive job completed: {outcome.get('message', 'Success')}")
def _cleanup_contacts_to_list(contacts_detailed):
    """
    Convert the {public_key: details} mapping returned by
    ``cli.get_contacts_with_last_seen()`` into a list of contact dicts
    (same list format as the preview-cleanup endpoint).

    Args:
        contacts_detailed: Mapping of public key -> contact details dict.

    Returns:
        List of contact dicts, each with a human-readable 'type_label'.
    """
    # Type 1 nodes are "companions" (COM) in MeshCore terminology.
    type_labels = {1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}
    return [
        {
            'public_key': public_key,
            'name': details.get('adv_name', ''),
            'type': details.get('type'),
            'type_label': type_labels.get(details.get('type'), 'UNKNOWN'),
            'last_advert': details.get('last_advert'),
            'lastmod': details.get('lastmod'),
            'out_path_len': details.get('out_path_len', -1),
            'out_path': details.get('out_path', ''),
            'adv_lat': details.get('adv_lat'),
            'adv_lon': details.get('adv_lon'),
        }
        for public_key, details in contacts_detailed.items()
    ]


def _cleanup_delete_contacts(cli_api, contacts, delete_delay=0.5, max_retries=2):
    """
    Delete contacts one at a time via ``cli_api.delete_contact``.

    Consecutive deletions are separated by ``delete_delay`` seconds to avoid
    overwhelming the bridge on slower hardware. Retry rules:
    - A deletion that reports failure, or raises an error whose text
      contains "Broken pipe", is retried up to ``max_retries`` times after
      a ``2 * delete_delay`` pause.
    - Any other exception fails that contact immediately.

    Args:
        cli_api: Object with ``delete_contact(selector) -> (ok, message)``.
        contacts: Contact dicts carrying 'public_key' and/or 'name'.
        delete_delay: Seconds to wait between deletions.
        max_retries: Extra attempts per contact after the first one.

    Returns:
        Tuple ``(deleted_count, failed_names)``.
    """
    # Aliased: at module level the name ``time`` is datetime.time.
    import time as time_mod

    deleted_count = 0
    failed_names = []
    last_index = len(contacts) - 1
    for i, contact in enumerate(contacts):
        # Prefer public_key for deletion (more reliable than name)
        selector = contact.get('public_key') or contact.get('name', '')
        if not selector:
            continue
        # Fall back to the selector when the name is missing OR empty.
        contact_name = contact.get('name') or selector

        for attempt in range(max_retries + 1):
            can_retry = attempt < max_retries
            try:
                ok, message = cli_api.delete_contact(selector)
            except Exception as e:
                if can_retry and "Broken pipe" in str(e):
                    logger.debug(f"Retry {attempt + 1} for {contact_name} after Broken pipe")
                    time_mod.sleep(delete_delay * 2)  # longer delay before retry
                    continue
                logger.warning(f"Error deleting contact {contact_name}: {e}")
                failed_names.append(contact_name)
                break
            if ok:
                deleted_count += 1
                logger.debug(f"Deleted contact: {contact_name}")
                break
            if can_retry:
                logger.debug(f"Retry {attempt + 1} for {contact_name}: {message}")
                time_mod.sleep(delete_delay * 2)  # longer delay before retry
            else:
                logger.warning(f"Failed to delete contact {contact_name}: {message}")
                failed_names.append(contact_name)

        # Delay between deletions (skip after last one)
        if i < last_index:
            time_mod.sleep(delete_delay)
    return deleted_count, failed_names


def _cleanup_job():
    """
    Background job that runs daily to clean up contacts.

    Steps:
    1. Load the saved cleanup settings.
    2. Fetch contacts from the device.
    3. Select the contacts matching the saved criteria via
       ``_filter_contacts_by_criteria``.
    4. Delete the selected contacts one by one.

    Every failure is logged; nothing is raised to the scheduler.
    """
    logger.info("Running daily cleanup job...")
    try:
        # Import here to avoid circular imports
        from app.routes.api import (
            get_cleanup_settings,
            _filter_contacts_by_criteria
        )
        from app.meshcore import cli

        settings = get_cleanup_settings()
        if not settings.get('enabled'):
            logger.info("Auto-cleanup is disabled, skipping")
            return

        logger.info("Fetching contacts from device...")
        success, contacts_detailed, error = cli.get_contacts_with_last_seen()
        # Check success before touching the payload. On failure it may not be a
        # mapping, and len() would then mask the real error with a TypeError.
        if not success:
            logger.error(f"Failed to get contacts: {error}")
            return
        contacts_detailed = contacts_detailed or {}
        logger.info(f"get_contacts_with_last_seen returned: success={success}, contacts_count={len(contacts_detailed)}, error={error}")

        contacts = _cleanup_contacts_to_list(contacts_detailed)
        logger.info(f"Converted {len(contacts)} contacts to list format")
        if not contacts:
            logger.info("No contacts found, nothing to clean up")
            return

        # Filter contacts using saved criteria
        criteria = {
            'types': settings.get('types', [1, 2, 3, 4]),
            'date_field': settings.get('date_field', 'last_advert'),
            'days': settings.get('days', 30),
            'name_filter': settings.get('name_filter', '')
        }
        logger.info(f"Filter criteria: types={criteria['types']}, date_field={criteria['date_field']}, days={criteria['days']}, name_filter='{criteria['name_filter']}'")

        # Filter contacts (this function internally excludes protected contacts)
        matching_contacts = _filter_contacts_by_criteria(contacts, criteria)
        if not matching_contacts:
            logger.info("No contacts match cleanup criteria")
            return

        logger.info(f"Found {len(matching_contacts)} contacts to clean up")
        deleted_count, failed_contacts = _cleanup_delete_contacts(cli, matching_contacts)

        if failed_contacts:
            logger.info(f"Cleanup job completed: deleted {deleted_count}/{len(matching_contacts)} contacts, {len(failed_contacts)} failed")
            logger.warning(f"Contacts not deleted: {', '.join(str(n) for n in failed_contacts)}")
        else:
            logger.info(f"Cleanup job completed: deleted {deleted_count}/{len(matching_contacts)} contacts")
    except Exception as e:
        logger.error(f"Cleanup job failed: {e}", exc_info=True)
def schedule_cleanup(enabled: bool, hour: int = 1) -> bool:
    """
    Add or remove the daily contact-cleanup job on the shared scheduler.

    Args:
        enabled: True to (re)register the cleanup job, False to remove it.
        hour: Hour (0-23, local time) at which to run the cleanup job.
            A value that is not an int in that range (bools included) is
            replaced by 1, and a warning is logged.

    Returns:
        True if the scheduler was updated (or there was no job to remove).
        False if the scheduler is not initialized or an error occurred.
    """
    if _scheduler is None:
        logger.warning("Scheduler not initialized, cannot schedule cleanup")
        return False

    try:
        if enabled:
            # bool is a subclass of int, so reject it explicitly.
            if isinstance(hour, bool) or not isinstance(hour, int) or not 0 <= hour <= 23:
                logger.warning(f"Invalid cleanup hour {hour!r}, using 01:00 instead")
                hour = 1
            # Add cleanup job at specified hour (local time)
            _scheduler.add_job(
                func=_cleanup_job,
                trigger=CronTrigger(hour=hour, minute=0),
                id=CLEANUP_JOB_ID,
                name='Daily Contact Cleanup',
                replace_existing=True
            )
            tz_name = get_local_timezone_name()
            logger.info(f"Cleanup job scheduled - will run daily at {hour:02d}:00 ({tz_name})")
        elif _scheduler.get_job(CLEANUP_JOB_ID) is not None:
            # Remove only a job that exists. Any other scheduler error now
            # reaches the handler below instead of being silently swallowed.
            _scheduler.remove_job(CLEANUP_JOB_ID)
            logger.info("Cleanup job removed from scheduler")
        return True
    except Exception as e:
        logger.error(f"Error scheduling cleanup: {e}", exc_info=True)
        return False
def init_cleanup_schedule():
    """
    Initialize the cleanup schedule from saved settings.

    Called at startup after the scheduler is started. If the saved settings
    enable auto-cleanup, the job is registered through ``schedule_cleanup``
    and its boolean result decides whether success or a warning is logged.
    Errors are logged, not raised.
    """
    try:
        # Import here to avoid circular imports
        from app.routes.api import get_cleanup_settings

        settings = get_cleanup_settings()
        if not settings.get('enabled'):
            logger.info("Auto-cleanup is disabled in saved settings")
            return

        hour = settings.get('hour', 1)
        if not schedule_cleanup(enabled=True, hour=hour):
            logger.warning("Auto-cleanup is enabled in saved settings but the job could not be scheduled")
            return

        tz_name = get_local_timezone_name()
        # Use :02d only for real ints. A malformed saved value must not raise
        # here after the job has already been scheduled.
        if isinstance(hour, int) and not isinstance(hour, bool):
            hour_text = f"{hour:02d}:00"
        else:
            hour_text = repr(hour)
        logger.info(f"Auto-cleanup enabled from saved settings (hour={hour_text} {tz_name})")
    except Exception as e:
        logger.error(f"Error initializing cleanup schedule: {e}", exc_info=True)
def _retention_job():
    """
    Scheduler callback that deletes old messages from the database.

    Reads the retention settings via ``get_retention_settings()``. It does
    nothing beyond logging when retention is disabled or no database handle
    has been registered in ``_db``. Otherwise it calls
    ``_db.cleanup_old_messages`` with the configured age and category flags,
    then logs the sum of the returned mapping's values. Exceptions are
    logged, not propagated.
    """
    logger.info("Running daily retention job...")
    try:
        from app.routes.api import get_retention_settings

        settings = get_retention_settings()
        if not settings.get('enabled'):
            logger.info("Message retention is disabled, skipping")
            return
        if _db is None:
            logger.error("Database not available for retention job")
            return

        deleted = _db.cleanup_old_messages(
            days=settings.get('days', 90),
            include_dms=settings.get('include_dms', False),
            include_adverts=settings.get('include_adverts', False),
        )
        deleted_total = sum(deleted.values())
        logger.info(f"Retention job completed: {deleted_total} rows deleted ({deleted})")
    except Exception as e:
        logger.error(f"Retention job failed: {e}", exc_info=True)
def schedule_retention(enabled: bool, hour: int = 2) -> bool:
    """
    Add or remove the daily message-retention job on the shared scheduler.

    Args:
        enabled: True to (re)register the retention job, False to remove it.
        hour: Hour (0-23, local time); the job runs at HH:30. A value that is
            not an int in that range (bools included) is replaced by 2, and a
            warning is logged.

    Returns:
        True if the scheduler was updated (or there was no job to remove).
        False if the scheduler is not initialized or an error occurred.
    """
    if _scheduler is None:
        logger.warning("Scheduler not initialized, cannot schedule retention")
        return False

    try:
        if enabled:
            # bool is a subclass of int, so reject it explicitly.
            if isinstance(hour, bool) or not isinstance(hour, int) or not 0 <= hour <= 23:
                logger.warning(f"Invalid retention hour {hour!r}, using 02:30 instead")
                hour = 2
            _scheduler.add_job(
                func=_retention_job,
                trigger=CronTrigger(hour=hour, minute=30),
                id=RETENTION_JOB_ID,
                name='Daily Message Retention',
                replace_existing=True
            )
            tz_name = get_local_timezone_name()
            logger.info(f"Retention job scheduled - will run daily at {hour:02d}:30 ({tz_name})")
        elif _scheduler.get_job(RETENTION_JOB_ID) is not None:
            # Remove only a job that exists. Any other scheduler error now
            # reaches the handler below instead of being silently swallowed.
            _scheduler.remove_job(RETENTION_JOB_ID)
            logger.info("Retention job removed from scheduler")
        return True
    except Exception as e:
        logger.error(f"Error scheduling retention: {e}", exc_info=True)
        return False
def init_retention_schedule(db=None):
    """
    Initialize the retention schedule from saved settings. Call at startup.

    Args:
        db: Database object to store in the module-level ``_db``, which the
            retention and backup jobs use. None keeps the current reference.

    If the saved settings enable retention, the job is registered through
    ``schedule_retention`` and its boolean result decides whether success or
    a warning is logged. Errors are logged, not raised.
    """
    global _db
    if db is not None:
        _db = db
    try:
        from app.routes.api import get_retention_settings

        settings = get_retention_settings()
        if not settings.get('enabled'):
            logger.info("Message retention is disabled in saved settings")
            return

        hour = settings.get('hour', 2)
        if not schedule_retention(enabled=True, hour=hour):
            logger.warning("Message retention is enabled in saved settings but the job could not be scheduled")
            return

        tz_name = get_local_timezone_name()
        # Use :02d only for real ints. A malformed saved value must not raise
        # here after the job has already been scheduled.
        if isinstance(hour, int) and not isinstance(hour, bool):
            hour_text = f"{hour:02d}:30"
        else:
            hour_text = repr(hour)
        logger.info(f"Message retention enabled from saved settings (hour={hour_text} {tz_name})")
    except Exception as e:
        logger.error(f"Error initializing retention schedule: {e}", exc_info=True)
def schedule_daily_archiving():
    """
    Create and start the shared background scheduler and register its jobs.

    The scheduler is started regardless of ``config.MC_ARCHIVE_ENABLED``;
    that flag only decides whether the daily archive job (00:00 local time)
    is added. Contact cleanup and database backup have their own enable
    switches and are initialized in both cases. Later schedule_* calls
    (e.g. for retention) also need the scheduler to exist.

    Calling this while a scheduler already exists only logs a warning.
    """
    global _scheduler

    if _scheduler is not None:
        logger.warning("Scheduler already initialized")
        return

    archive_enabled = bool(config.MC_ARCHIVE_ENABLED)
    if not archive_enabled:
        logger.info("Archiving is disabled in configuration")

    try:
        # Use local timezone (from TZ env variable or system default)
        tz_name = get_local_timezone_name()
        # No timezone specified = uses system local timezone
        scheduler = BackgroundScheduler(daemon=True)

        if archive_enabled:
            # Schedule job for midnight every day (local time)
            scheduler.add_job(
                func=_archive_job,
                trigger=CronTrigger(hour=0, minute=0),
                id='daily_archive',
                name='Daily Message Archive',
                replace_existing=True
            )

        scheduler.start()
        # Publish the scheduler only after start() succeeded. A failed start
        # then leaves _scheduler as None instead of a stopped instance that
        # would make every later call hit "Scheduler already initialized".
        _scheduler = scheduler

        if archive_enabled:
            logger.info(f"Archive scheduler started - will run daily at 00:00 ({tz_name})")
        else:
            logger.info(f"Background scheduler started without archive job ({tz_name})")

        # Initialize cleanup schedule from saved settings
        init_cleanup_schedule()
        # Initialize backup schedule
        init_backup_schedule()
    except Exception as e:
        logger.error(f"Failed to start archive scheduler: {e}", exc_info=True)
def init_backup_schedule():
    """
    Register the daily database-backup job on the shared scheduler.

    Does nothing when the scheduler has not been created, and only logs when
    ``config.MC_BACKUP_ENABLED`` is false. Otherwise ``_backup_job`` is
    scheduled at ``config.MC_BACKUP_HOUR``:00 and receives
    ``<MC_CONFIG_DIR>/backups`` as its target directory. Errors are logged,
    not raised.
    """
    if _scheduler is None:
        return
    if not config.MC_BACKUP_ENABLED:
        logger.info("Backup is disabled in configuration")
        return

    try:
        hour = config.MC_BACKUP_HOUR
        daily_trigger = CronTrigger(hour=hour, minute=0)
        target_dir = Path(config.MC_CONFIG_DIR) / 'backups'
        _scheduler.add_job(
            func=_backup_job,
            trigger=daily_trigger,
            id=BACKUP_JOB_ID,
            name='Daily Database Backup',
            replace_existing=True,
            args=[target_dir],
        )
        # NOTE(review): ':02d' assumes MC_BACKUP_HOUR is an int — confirm. If it
        # can be a str, this line raises after add_job has already run and is
        # reported as "Error scheduling backup".
        logger.info(f"Backup schedule initialized: daily at {hour:02d}:00")
    except Exception as e:
        logger.error(f"Error scheduling backup: {e}", exc_info=True)
def _backup_job(backup_dir):
    """
    Scheduler callback that backs up the database and prunes old backups.

    Skips with a warning while no database handle is registered in ``_db``.
    Otherwise it calls ``_db.create_backup(backup_dir)``, then
    ``_db.cleanup_old_backups(backup_dir, config.MC_BACKUP_RETENTION_DAYS)``,
    and logs how many old backups were removed. Exceptions are logged, not
    propagated.

    Args:
        backup_dir: Directory handed to both database backup methods.
    """
    if _db is None:
        logger.warning("No database reference for backup")
        return

    try:
        created = _db.create_backup(backup_dir)
        logger.info(f"Daily backup completed: {created}")

        pruned = _db.cleanup_old_backups(backup_dir, config.MC_BACKUP_RETENTION_DAYS)
        if pruned > 0:
            logger.info(f"Cleaned up {pruned} old backup(s)")
    except Exception as e:
        logger.error(f"Backup job failed: {e}", exc_info=True)
def stop_scheduler():
    """
    Shut down the background scheduler, if one exists.

    Meant for application shutdown. It does not wait for running jobs
    (``wait=False``). The module-level reference is cleared even when
    shutdown raises, so ``schedule_daily_archiving()`` can create a new one
    afterwards.
    """
    global _scheduler
    if _scheduler is None:
        return

    try:
        _scheduler.shutdown(wait=False)
        logger.info("Archive scheduler stopped")
    except Exception as e:
        logger.error(f"Error stopping scheduler: {e}")
    finally:
        _scheduler = None