mirror of
https://github.com/pablorevilla-meshtastic/meshview.git
synced 2026-03-04 23:27:46 +01:00
139 lines
5.3 KiB
Python
139 lines
5.3 KiB
Python
import asyncio
|
|
import json
|
|
import datetime
|
|
import logging
|
|
import aiosqlite
|
|
from meshview import mqtt_reader
|
|
from meshview import mqtt_database
|
|
from meshview import mqtt_store
|
|
from meshview.config import CONFIG # <-- use your existing config.py
|
|
|
|
# -------------------------
|
|
# Logging for cleanup
|
|
# -------------------------
|
|
cleanup_logger = logging.getLogger("dbcleanup")
|
|
cleanup_logger.setLevel(logging.INFO)
|
|
file_handler = logging.FileHandler("dbcleanup.log")
|
|
file_handler.setLevel(logging.INFO)
|
|
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
|
|
file_handler.setFormatter(formatter)
|
|
cleanup_logger.addHandler(file_handler)
|
|
|
|
# -------------------------
|
|
# Helper functions
|
|
# -------------------------
|
|
def get_bool(config, section, key, default=False):
|
|
return config.get(section, {}).get(key, str(default)).lower() in ("1", "true", "yes", "on")
|
|
|
|
def get_int(config, section, key, default=0):
|
|
try:
|
|
return int(config.get(section, {}).get(key, default))
|
|
except ValueError:
|
|
return default
|
|
|
|
# -------------------------
|
|
# Database cleanup using aiosqlite with batching
|
|
# -------------------------
|
|
async def daily_cleanup_at(db_file: str, hour: int = 2, minute: int = 0, days_to_keep: int = 14, vacuum_db: bool = True, batch_size: int = 100):
|
|
tables = {
|
|
"packet": "import_time",
|
|
"packet_seen": "import_time",
|
|
"traceroute": "import_time",
|
|
"node": "last_update"
|
|
}
|
|
|
|
while True:
|
|
now = datetime.datetime.now()
|
|
next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
|
if next_run <= now:
|
|
next_run += datetime.timedelta(days=1)
|
|
delay = (next_run - now).total_seconds()
|
|
cleanup_logger.info(f"Next cleanup scheduled at {next_run}")
|
|
await asyncio.sleep(delay)
|
|
|
|
try:
|
|
cleanup_logger.info(f"Running cleanup for records older than {days_to_keep} days...")
|
|
|
|
async with aiosqlite.connect(db_file) as db:
|
|
for table, time_column in tables.items():
|
|
total_deleted = 0 # Initialize total counter
|
|
while True:
|
|
if table == "node":
|
|
query = f"""
|
|
DELETE FROM node
|
|
WHERE {time_column} < datetime('now', '-{days_to_keep} day')
|
|
OR {time_column} IS NULL
|
|
OR {time_column} = ''
|
|
LIMIT {batch_size};
|
|
"""
|
|
else:
|
|
query = f"""
|
|
DELETE FROM {table}
|
|
WHERE {time_column} < datetime('now', '-{days_to_keep} day')
|
|
LIMIT {batch_size};
|
|
"""
|
|
cursor = await db.execute(query)
|
|
await db.commit()
|
|
deleted = cursor.rowcount or 0
|
|
total_deleted += deleted
|
|
if deleted == 0:
|
|
break
|
|
await asyncio.sleep(0) # yield to event loop
|
|
|
|
cleanup_logger.info(f"Deleted a total of {total_deleted} rows from {table}")
|
|
|
|
if vacuum_db:
|
|
async with aiosqlite.connect(db_file) as db:
|
|
await db.execute("VACUUM;")
|
|
|
|
cleanup_logger.info("Cleanup completed successfully.")
|
|
|
|
except Exception as e:
|
|
cleanup_logger.error(f"Error during cleanup: {e}")
|
|
|
|
|
|
# -------------------------
|
|
# MQTT loading
|
|
# -------------------------
|
|
async def load_database_from_mqtt(mqtt_server: str, mqtt_port: int, topics: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None):
|
|
async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd):
|
|
await mqtt_store.process_envelope(topic, env)
|
|
|
|
# -------------------------
|
|
# Main function
|
|
# -------------------------
|
|
async def main():
|
|
# Initialize database
|
|
mqtt_database.init_database(CONFIG["database"]["connection_string"])
|
|
await mqtt_database.create_tables()
|
|
|
|
mqtt_user = CONFIG["mqtt"].get("username") or None
|
|
mqtt_passwd = CONFIG["mqtt"].get("password") or None
|
|
mqtt_topics = json.loads(CONFIG["mqtt"]["topics"])
|
|
|
|
db_file = CONFIG["database"]["connection_string"].replace("sqlite+aiosqlite:///", "")
|
|
cleanup_enabled = get_bool(CONFIG, "cleanup", "enabled", True)
|
|
cleanup_days = get_int(CONFIG, "cleanup", "days_to_keep", 14)
|
|
vacuum_db = get_bool(CONFIG, "cleanup", "vacuum", True)
|
|
cleanup_hour = get_int(CONFIG, "cleanup", "hour", 2)
|
|
cleanup_minute = get_int(CONFIG, "cleanup", "minute", 0)
|
|
batch_size = get_int(CONFIG, "cleanup", "batch_size", 100)
|
|
|
|
async with asyncio.TaskGroup() as tg:
|
|
tg.create_task(
|
|
load_database_from_mqtt(CONFIG["mqtt"]["server"], int(CONFIG["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd)
|
|
)
|
|
|
|
if cleanup_enabled:
|
|
tg.create_task(
|
|
daily_cleanup_at(db_file, cleanup_hour, cleanup_minute, cleanup_days, vacuum_db, batch_size)
|
|
)
|
|
else:
|
|
cleanup_logger.info("Daily cleanup is disabled by configuration.")
|
|
|
|
# -------------------------
|
|
# Entry point
|
|
# -------------------------
|
|
if __name__ == '__main__':
|
|
asyncio.run(main())
|