mirror of
https://github.com/MarekWo/mc-webui.git
synced 2026-03-28 17:42:45 +01:00
The MeshCore community uses "companion" not "client" for type 1 nodes. Rename the CLI label to COM across all UI, API, JS, and docs to align with official terminology. Includes cache migration for old CLI entries. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
676 lines
21 KiB
Python
676 lines
21 KiB
Python
"""
|
|
Archive manager - handles message archiving and scheduling
|
|
"""
|
|
|
|
import os
|
|
import shutil
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime, time
|
|
from typing import List, Dict, Optional
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
|
|
from app.config import config, runtime_config
|
|
|
|
# Logger for this module
logger = logging.getLogger(__name__)

# The single background scheduler shared by every job in this module;
# stays None until schedule_daily_archiving() creates it
_scheduler: Optional[BackgroundScheduler] = None

# Identifiers under which the recurring jobs are registered
CLEANUP_JOB_ID = 'daily_cleanup'
RETENTION_JOB_ID = 'daily_retention'
BACKUP_JOB_ID = 'daily_backup'

# Database handle used by the retention and backup jobs
# (assigned by init_retention_schedule)
_db = None
|
|
|
|
|
|
def get_local_timezone_name() -> str:
    """
    Return a human-readable name for the local timezone.

    The TZ environment variable wins when it is set to a non-empty value;
    otherwise the name is derived from the system's local timezone.

    Returns:
        Timezone name (e.g., 'Europe/Warsaw', 'UTC', 'CET'), or 'local'
        when no name could be determined
    """
    import os
    from datetime import datetime

    # An explicit TZ setting takes precedence over detection
    configured = os.environ.get('TZ')
    if configured:
        return configured

    # Otherwise ask the system for the tzinfo of the current local time
    try:
        tzinfo = datetime.now().astimezone().tzinfo
        detected = str(tzinfo) if tzinfo else ''
        if detected and detected != 'None':
            return detected
    except Exception:
        # Detection is best-effort; fall through to the generic label
        pass

    return 'local'
|
|
|
|
|
|
def get_archive_path(archive_date: str) -> Path:
    """
    Build the archive file path for the given date.

    Args:
        archive_date: Date in YYYY-MM-DD format

    Returns:
        Path of the form <archive dir>/<device name>.<archive_date>.msgs
    """
    base_dir = config.archive_dir_path
    device_name = runtime_config.get_device_name()
    return base_dir / f"{device_name}.{archive_date}.msgs"
|
|
|
|
|
|
def archive_messages(archive_date: Optional[str] = None) -> Dict[str, object]:
    """
    Archive messages for a specific date by copying the .msgs file.

    The current messages file is copied as-is to the archive path for
    ``archive_date``; an archive that already exists is never overwritten.

    Args:
        archive_date: Date to archive in YYYY-MM-DD format.
                      If None, uses yesterday's date.

    Returns:
        Dict with a boolean 'success' key. On success it also carries
        'message' and 'archive_file', plus either 'exists' (the archive was
        already present) or 'file_size' and 'archive_date' (the archive was
        just created). On failure it carries 'error'.
    """
    try:
        # Default to yesterday when no date was requested
        if archive_date is None:
            from datetime import date, timedelta
            yesterday = date.today() - timedelta(days=1)
            archive_date = yesterday.strftime('%Y-%m-%d')

        # Reject malformed dates before touching the filesystem
        try:
            datetime.strptime(archive_date, '%Y-%m-%d')
        except ValueError:
            return {
                'success': False,
                'error': f'Invalid date format: {archive_date}. Expected YYYY-MM-DD'
            }

        # Ensure archive directory exists
        archive_dir = config.archive_dir_path
        archive_dir.mkdir(parents=True, exist_ok=True)

        # Locate the live .msgs file that will be copied
        source_file = runtime_config.get_msgs_file_path()
        if not source_file.exists():
            logger.warning(f"Source messages file not found: {source_file}")
            return {
                'success': False,
                'error': f'Messages file not found: {source_file}'
            }

        dest_file = get_archive_path(archive_date)

        # An existing archive is reported as success and left untouched
        if dest_file.exists():
            logger.info(f"Archive already exists: {dest_file}")
            return {
                'success': True,
                'message': f'Archive already exists for {archive_date}',
                'archive_file': str(dest_file),
                'exists': True
            }

        # copy2 copies the content and also attempts to preserve file metadata
        shutil.copy2(source_file, dest_file)
        file_size = dest_file.stat().st_size

        logger.info(f"Archived messages to {dest_file} ({file_size} bytes)")

        return {
            'success': True,
            'message': f'Successfully archived messages for {archive_date}',
            'archive_file': str(dest_file),
            'file_size': file_size,
            'archive_date': archive_date
        }

    except Exception as e:
        # Callers (including the scheduler job) expect a result dict, never
        # an exception, so every failure is logged and reported in-band
        logger.error(f"Error archiving messages: {e}", exc_info=True)
        return {
            'success': False,
            'error': str(e)
        }
|
|
|
|
|
|
def list_archives() -> List[Dict]:
    """
    List all available archive files with metadata.

    Only files named {device_name}.YYYY-MM-DD.msgs inside the archive
    directory are reported; files whose date part does not parse are
    skipped with a warning.

    Returns:
        List of archive info dicts with the keys 'date', 'file_size',
        'message_count' and 'file_path', sorted by date (newest first).
        Empty when the archive directory does not exist.
    """
    import glob

    archives = []

    try:
        archive_dir = config.archive_dir_path

        # Check if archive directory exists
        if not archive_dir.exists():
            logger.info(f"Archive directory does not exist: {archive_dir}")
            return []

        # Pattern: {device_name}.YYYY-MM-DD.msgs
        # The device name is read once and escaped so that glob
        # metacharacters in it ('*', '?', '[') are matched literally.
        prefix = f"{runtime_config.get_device_name()}."
        suffix = ".msgs"
        pattern = f"{glob.escape(prefix)}*{suffix}"

        for archive_file in archive_dir.glob(pattern):
            try:
                filename = archive_file.name

                # Strip exactly the leading prefix and the trailing suffix;
                # str.replace would also remove occurrences in the middle
                date_part = filename[len(prefix):len(filename) - len(suffix)]

                # Validate date format
                try:
                    datetime.strptime(date_part, '%Y-%m-%d')
                except ValueError:
                    logger.warning(f"Invalid archive filename format: {filename}")
                    continue

                archives.append({
                    'date': date_part,
                    'file_size': archive_file.stat().st_size,
                    # Counting requires reading the whole file
                    'message_count': _count_messages_in_file(archive_file),
                    'file_path': str(archive_file)
                })

            except Exception as e:
                # One unreadable archive must not hide the others
                logger.warning(f"Error processing archive file {archive_file}: {e}")
                continue

        # Sort by date, newest first
        archives.sort(key=lambda x: x['date'], reverse=True)

        logger.info(f"Found {len(archives)} archive files")

    except Exception as e:
        logger.error(f"Error listing archives: {e}", exc_info=True)

    return archives
|
|
|
|
|
|
def _count_messages_in_file(file_path: Path) -> int:
    """
    Count the Public-channel messages stored in a .msgs file.

    The file is read as UTF-8 with one JSON document per line. A line is
    counted when it is a JSON object whose 'channel_idx' is 0 (or absent)
    and whose 'type' is 'CHAN' or 'SENT_CHAN'. Blank lines, lines that are
    not valid JSON, and JSON values that are not objects are skipped.

    Args:
        file_path: Path to the .msgs file

    Returns:
        Number of matching messages. If reading fails part-way, a warning
        is logged and the count reached so far is returned.
    """
    import json

    count = 0
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    continue

                # A stray array or scalar has no .get(); skip it instead of
                # letting it abort the count for the rest of the file
                if not isinstance(data, dict):
                    continue

                # Only count Public channel messages
                if data.get('channel_idx', 0) == 0 and data.get('type') in ('CHAN', 'SENT_CHAN'):
                    count += 1
    except Exception as e:
        logger.warning(f"Error counting messages in {file_path}: {e}")

    return count
|
|
|
|
|
|
def _archive_job():
    """
    Daily scheduler callback that archives messages.

    Does nothing when archiving is disabled in the configuration; otherwise
    runs archive_messages() with its default date and logs the outcome.
    """
    logger.info("Running daily archive job...")

    if not config.MC_ARCHIVE_ENABLED:
        logger.info("Archiving is disabled, skipping")
        return

    outcome = archive_messages()

    if not outcome['success']:
        logger.error(f"Archive job failed: {outcome.get('error', 'Unknown error')}")
        return

    logger.info(f"Archive job completed: {outcome.get('message', 'Success')}")
|
|
|
|
|
|
def _cleanup_job():
    """
    Background job that runs daily to clean up contacts.

    Loads the saved cleanup settings, fetches the contacts from the device,
    filters them by the saved criteria and deletes every match, pausing
    between deletions. Nothing is raised: every failure is logged.
    """
    logger.info("Running daily cleanup job...")

    try:
        # Import here to avoid circular imports
        from app.routes.api import (
            get_cleanup_settings,
            _filter_contacts_by_criteria
        )
        from app.meshcore import cli

        settings = get_cleanup_settings()

        if not settings.get('enabled'):
            logger.info("Auto-cleanup is disabled, skipping")
            return

        # Get contacts using the same method as preview-cleanup
        logger.info("Fetching contacts from device...")
        success, contacts_detailed, error = cli.get_contacts_with_last_seen()

        # The payload may be empty/None when the call failed; never let the
        # diagnostic log line itself raise and mask the real error
        contacts_count = len(contacts_detailed) if contacts_detailed else 0
        logger.info(f"get_contacts_with_last_seen returned: success={success}, contacts_count={contacts_count}, error={error}")

        if not success:
            logger.error(f"Failed to get contacts: {error}")
            return

        # Convert to list format (same as preview-cleanup endpoint)
        type_labels = {1: 'COM', 2: 'REP', 3: 'ROOM', 4: 'SENS'}
        contacts = [
            {
                'public_key': public_key,
                'name': details.get('adv_name', ''),
                'type': details.get('type'),
                'type_label': type_labels.get(details.get('type'), 'UNKNOWN'),
                'last_advert': details.get('last_advert'),
                'lastmod': details.get('lastmod'),
                'out_path_len': details.get('out_path_len', -1),
                'out_path': details.get('out_path', ''),
                'adv_lat': details.get('adv_lat'),
                'adv_lon': details.get('adv_lon')
            }
            for public_key, details in (contacts_detailed or {}).items()
        ]

        logger.info(f"Converted {len(contacts)} contacts to list format")

        if not contacts:
            logger.info("No contacts found, nothing to clean up")
            return

        # Filter contacts using saved criteria
        criteria = {
            'types': settings.get('types', [1, 2, 3, 4]),
            'date_field': settings.get('date_field', 'last_advert'),
            'days': settings.get('days', 30),
            'name_filter': settings.get('name_filter', '')
        }
        logger.info(f"Filter criteria: types={criteria['types']}, date_field={criteria['date_field']}, days={criteria['days']}, name_filter='{criteria['name_filter']}'")

        # Filter contacts (this function internally excludes protected contacts)
        matching_contacts = _filter_contacts_by_criteria(contacts, criteria)

        if not matching_contacts:
            logger.info("No contacts match cleanup criteria")
            return

        logger.info(f"Found {len(matching_contacts)} contacts to clean up")

        # Local import: the module-level name `time` is datetime.time
        import time
        # Add delay between deletions to avoid overwhelming the bridge on slower hardware
        DELETE_DELAY = 0.5  # seconds between deletions
        MAX_RETRIES = 2     # retry failed deletions

        deleted_count = 0
        failed_contacts = []
        last_index = len(matching_contacts) - 1

        for i, contact in enumerate(matching_contacts):
            # Prefer public_key for deletion (more reliable than name)
            selector = contact.get('public_key') or contact.get('name', '')
            if not selector:
                continue

            contact_name = contact.get('name', selector)

            # Retries wait twice as long as the normal inter-deletion delay
            if _delete_contact_with_retries(cli, selector, contact_name, MAX_RETRIES, DELETE_DELAY * 2):
                deleted_count += 1
            else:
                failed_contacts.append(contact_name)

            # Delay between deletions (skip after last one)
            if i < last_index:
                time.sleep(DELETE_DELAY)

        if failed_contacts:
            logger.info(f"Cleanup job completed: deleted {deleted_count}/{len(matching_contacts)} contacts, {len(failed_contacts)} failed")
        else:
            logger.info(f"Cleanup job completed: deleted {deleted_count}/{len(matching_contacts)} contacts")

    except Exception as e:
        logger.error(f"Cleanup job failed: {e}", exc_info=True)


def _delete_contact_with_retries(cli, selector, contact_name, max_retries, retry_delay) -> bool:
    """Delete one contact via cli.delete_contact(), retrying failures; return True once deleted."""
    import time

    for attempt in range(max_retries + 1):
        try:
            success, message = cli.delete_contact(selector)
        except Exception as e:
            # Only a broken pipe is treated as transient; any other error
            # (or a broken pipe on the final attempt) gives up immediately
            if attempt < max_retries and "Broken pipe" in str(e):
                logger.debug(f"Retry {attempt + 1} for {contact_name} after Broken pipe")
                time.sleep(retry_delay)
                continue
            logger.warning(f"Error deleting contact {contact_name}: {e}")
            return False

        if success:
            logger.debug(f"Deleted contact: {contact_name}")
            return True

        if attempt < max_retries:
            logger.debug(f"Retry {attempt + 1} for {contact_name}: {message}")
            time.sleep(retry_delay)
        else:
            logger.warning(f"Failed to delete contact {contact_name}: {message}")

    return False
|
|
|
|
|
|
def schedule_cleanup(enabled: bool, hour: int = 1) -> bool:
    """
    Register or unregister the daily contact cleanup job.

    Args:
        enabled: True to (re)schedule the job, False to remove it
        hour: Hour of day (0-23, local time) at which the job runs; any
              other value falls back to 1

    Returns:
        True on success, False when the scheduler is not initialized or
        scheduling raised an error
    """
    global _scheduler

    if _scheduler is None:
        logger.warning("Scheduler not initialized, cannot schedule cleanup")
        return False

    try:
        if not enabled:
            # Removing a job that was never added is not an error
            try:
                _scheduler.remove_job(CLEANUP_JOB_ID)
                logger.info("Cleanup job removed from scheduler")
            except Exception:
                pass
            return True

        # Fall back to the default hour when the given one is unusable
        hour_is_valid = isinstance(hour, int) and 0 <= hour <= 23
        if not hour_is_valid:
            hour = 1

        # Runs on the hour, local time; an existing job is replaced
        cron = CronTrigger(hour=hour, minute=0)
        _scheduler.add_job(
            func=_cleanup_job,
            trigger=cron,
            id=CLEANUP_JOB_ID,
            name='Daily Contact Cleanup',
            replace_existing=True
        )

        zone = get_local_timezone_name()
        logger.info(f"Cleanup job scheduled - will run daily at {hour:02d}:00 ({zone})")
        return True

    except Exception as e:
        logger.error(f"Error scheduling cleanup: {e}", exc_info=True)
        return False
|
|
|
|
|
|
def init_cleanup_schedule():
    """
    Initialize cleanup schedule from saved settings.
    Called at startup after scheduler is started.

    The "enabled" message is logged only when the job was actually
    scheduled; a failed scheduling attempt is reported as a warning.
    Errors are logged, never raised.
    """
    try:
        # Import here to avoid circular imports
        from app.routes.api import get_cleanup_settings

        settings = get_cleanup_settings()

        if not settings.get('enabled'):
            logger.info("Auto-cleanup is disabled in saved settings")
            return

        hour = settings.get('hour', 1)
        # Apply the same fallback as schedule_cleanup() so the hour logged
        # below is the hour the job really runs at
        if not isinstance(hour, int) or not 0 <= hour <= 23:
            hour = 1

        if not schedule_cleanup(enabled=True, hour=hour):
            logger.warning("Auto-cleanup is enabled in saved settings but the job could not be scheduled")
            return

        tz_name = get_local_timezone_name()
        logger.info(f"Auto-cleanup enabled from saved settings (hour={hour:02d}:00 {tz_name})")

    except Exception as e:
        logger.error(f"Error initializing cleanup schedule: {e}", exc_info=True)
|
|
|
|
|
|
def _retention_job():
    """Daily scheduler callback that purges old messages from the DB."""
    logger.info("Running daily retention job...")

    try:
        from app.routes.api import get_retention_settings

        settings = get_retention_settings()

        if not settings.get('enabled'):
            logger.info("Message retention is disabled, skipping")
            return

        if _db is None:
            logger.error("Database not available for retention job")
            return

        # Defaults: keep 90 days, leave DMs and adverts alone
        max_age_days = settings.get('days', 90)
        purge_dms = settings.get('include_dms', False)
        purge_adverts = settings.get('include_adverts', False)

        deleted = _db.cleanup_old_messages(
            days=max_age_days,
            include_dms=purge_dms,
            include_adverts=purge_adverts
        )

        # The result's values are summed for the total, and the full
        # result is logged alongside it
        logger.info(f"Retention job completed: {sum(deleted.values())} rows deleted ({deleted})")

    except Exception as e:
        logger.error(f"Retention job failed: {e}", exc_info=True)
|
|
|
|
|
|
def schedule_retention(enabled: bool, hour: int = 2) -> bool:
    """
    Register or unregister the daily message retention job.

    When enabled, the job runs at half past the given hour; an hour that
    is not an int in 0-23 falls back to 2. Returns True on success and
    False when the scheduler is missing or scheduling raised an error.
    """
    global _scheduler

    if _scheduler is None:
        logger.warning("Scheduler not initialized, cannot schedule retention")
        return False

    try:
        if not enabled:
            # Removing a job that was never added is not an error
            try:
                _scheduler.remove_job(RETENTION_JOB_ID)
                logger.info("Retention job removed from scheduler")
            except Exception:
                pass
            return True

        hour_is_valid = isinstance(hour, int) and 0 <= hour <= 23
        if not hour_is_valid:
            hour = 2

        # Runs at HH:30; an existing job is replaced
        cron = CronTrigger(hour=hour, minute=30)
        _scheduler.add_job(
            func=_retention_job,
            trigger=cron,
            id=RETENTION_JOB_ID,
            name='Daily Message Retention',
            replace_existing=True
        )

        zone = get_local_timezone_name()
        logger.info(f"Retention job scheduled - will run daily at {hour:02d}:30 ({zone})")
        return True

    except Exception as e:
        logger.error(f"Error scheduling retention: {e}", exc_info=True)
        return False
|
|
|
|
|
|
def init_retention_schedule(db=None):
    """
    Initialize retention schedule from saved settings. Call at startup.

    Args:
        db: Database object to store in the module-level ``_db`` for the
            background jobs; when None the current reference is kept.

    The "enabled" message is logged only when the job was actually
    scheduled; a failed scheduling attempt (for example because the
    scheduler was never started) is reported as a warning. Errors are
    logged, never raised.
    """
    global _db

    if db is not None:
        _db = db

    try:
        from app.routes.api import get_retention_settings

        settings = get_retention_settings()

        if not settings.get('enabled'):
            logger.info("Message retention is disabled in saved settings")
            return

        hour = settings.get('hour', 2)
        # Apply the same fallback as schedule_retention() so the hour
        # logged below is the hour the job really runs at
        if not isinstance(hour, int) or not 0 <= hour <= 23:
            hour = 2

        if not schedule_retention(enabled=True, hour=hour):
            logger.warning("Message retention is enabled in saved settings but the job could not be scheduled")
            return

        tz_name = get_local_timezone_name()
        logger.info(f"Message retention enabled from saved settings (hour={hour:02d}:30 {tz_name})")

    except Exception as e:
        logger.error(f"Error initializing retention schedule: {e}", exc_info=True)
|
|
|
|
|
|
def schedule_daily_archiving():
    """
    Initialize and start the background scheduler for daily archiving.
    Runs at midnight (00:00) local time.

    Does nothing when archiving is disabled in the configuration or when a
    scheduler already exists. After the scheduler has started, the cleanup
    and backup schedules are initialized as well. Errors are logged, never
    raised.
    """
    global _scheduler

    if not config.MC_ARCHIVE_ENABLED:
        logger.info("Archiving is disabled in configuration")
        return

    if _scheduler is not None:
        logger.warning("Scheduler already initialized")
        return

    try:
        # Use local timezone (from TZ env variable or system default)
        tz_name = get_local_timezone_name()

        # No timezone specified = uses system local timezone
        scheduler = BackgroundScheduler(daemon=True)

        # Schedule job for midnight every day (local time)
        scheduler.add_job(
            func=_archive_job,
            trigger=CronTrigger(hour=0, minute=0),
            id='daily_archive',
            name='Daily Message Archive',
            replace_existing=True
        )

        scheduler.start()

        # Publish the scheduler only once it is running: if setup or start
        # raised, the global stays None and a later call can retry instead
        # of finding a dead scheduler "already initialized"
        _scheduler = scheduler

        logger.info(f"Archive scheduler started - will run daily at 00:00 ({tz_name})")

        # Initialize cleanup schedule from saved settings
        init_cleanup_schedule()

        # Initialize backup schedule
        init_backup_schedule()

    except Exception as e:
        logger.error(f"Failed to start archive scheduler: {e}", exc_info=True)
|
|
|
|
|
|
def init_backup_schedule():
    """
    Register the daily database backup job according to the config.

    Silently returns when no scheduler exists; logs and returns when
    backups are disabled. Scheduling errors are logged, never raised.
    """
    global _scheduler

    if _scheduler is None:
        return

    if not config.MC_BACKUP_ENABLED:
        logger.info("Backup is disabled in configuration")
        return

    try:
        backup_hour = config.MC_BACKUP_HOUR
        cron = CronTrigger(hour=backup_hour, minute=0)
        # Backups live in a 'backups' folder under the config directory
        target_dir = Path(config.MC_CONFIG_DIR) / 'backups'

        job_options = dict(
            func=_backup_job,
            trigger=cron,
            id=BACKUP_JOB_ID,
            name='Daily Database Backup',
            replace_existing=True,
            args=[target_dir],
        )
        _scheduler.add_job(**job_options)
        logger.info(f"Backup schedule initialized: daily at {backup_hour:02d}:00")
    except Exception as e:
        logger.error(f"Error scheduling backup: {e}", exc_info=True)
|
|
|
|
|
|
def _backup_job(backup_dir):
    """
    Scheduler callback: create a database backup, then prune old ones.

    Args:
        backup_dir: Directory passed to the database's backup methods
    """
    global _db

    if _db is None:
        logger.warning("No database reference for backup")
        return

    try:
        created = _db.create_backup(backup_dir)
        logger.info(f"Daily backup completed: {created}")

        # Prune using the configured retention period
        pruned = _db.cleanup_old_backups(backup_dir, config.MC_BACKUP_RETENTION_DAYS)
        if pruned > 0:
            logger.info(f"Cleaned up {pruned} old backup(s)")
    except Exception as e:
        logger.error(f"Backup job failed: {e}", exc_info=True)
|
|
|
|
|
|
def stop_scheduler():
    """
    Shut down the background scheduler, if one is running.

    Intended for application shutdown. The module-level scheduler
    reference is cleared even when the shutdown call fails.
    """
    global _scheduler

    if _scheduler is None:
        return

    try:
        # wait=False: do not block on jobs that are still executing
        _scheduler.shutdown(wait=False)
        logger.info("Archive scheduler stopped")
    except Exception as e:
        logger.error(f"Error stopping scheduler: {e}")
    finally:
        _scheduler = None
|