Added Database cleanup feature to startdb.py

This commit is contained in:
Pablo Revilla
2025-09-17 22:12:04 -07:00
parent d5272c1aec
commit a69d1a5729
2 changed files with 140 additions and 36 deletions

View File

@@ -85,3 +85,20 @@ password = large4cats
[database]
# SQLAlchemy connection string. This one uses SQLite with asyncio support.
connection_string = sqlite+aiosqlite:///packets.db
# -------------------------
# Database Cleanup Configuration
# -------------------------
[cleanup]
# Enable or disable daily cleanup
enabled = True
# Number of days to keep records in the database
days_to_keep = 14
# Time to run daily cleanup (24-hour format)
hour = 2
minute = 00
# Number of rows to delete per batch
batch_size = 100
# Run VACUUM after cleanup
vacuum = True

View File

@@ -1,51 +1,138 @@
import asyncio
import argparse
import configparser
import json
import datetime
import logging
import aiosqlite
from meshview import mqtt_reader
from meshview import mqtt_database
from meshview import mqtt_store
import json
from meshview.config import CONFIG # <-- use your existing config.py
# -------------------------
# Logging for cleanup
# -------------------------
cleanup_logger = logging.getLogger("dbcleanup")
cleanup_logger.setLevel(logging.INFO)
file_handler = logging.FileHandler("dbcleanup.log")
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')
file_handler.setFormatter(formatter)
cleanup_logger.addHandler(file_handler)
# -------------------------
# Helper functions
# -------------------------
def get_bool(config, section, key, default=False):
return config.get(section, {}).get(key, str(default)).lower() in ("1", "true", "yes", "on")
def get_int(config, section, key, default=0):
try:
return int(config.get(section, {}).get(key, default))
except ValueError:
return default
# -------------------------
# Database cleanup using aiosqlite with batching
# -------------------------
async def daily_cleanup_at(db_file: str, hour: int = 2, minute: int = 0, days_to_keep: int = 14, vacuum_db: bool = True, batch_size: int = 100):
tables = {
"packet": "import_time",
"packet_seen": "import_time",
"traceroute": "import_time",
"node": "last_update"
}
while True:
now = datetime.datetime.now()
next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if next_run <= now:
next_run += datetime.timedelta(days=1)
delay = (next_run - now).total_seconds()
cleanup_logger.info(f"Next cleanup scheduled at {next_run}")
await asyncio.sleep(delay)
try:
cleanup_logger.info(f"Running cleanup for records older than {days_to_keep} days...")
async with aiosqlite.connect(db_file) as db:
for table, time_column in tables.items():
total_deleted = 0 # Initialize total counter
while True:
if table == "node":
query = f"""
DELETE FROM node
WHERE {time_column} < datetime('now', '-{days_to_keep} day')
OR {time_column} IS NULL
OR {time_column} = ''
LIMIT {batch_size};
"""
else:
query = f"""
DELETE FROM {table}
WHERE {time_column} < datetime('now', '-{days_to_keep} day')
LIMIT {batch_size};
"""
cursor = await db.execute(query)
await db.commit()
deleted = cursor.rowcount or 0
total_deleted += deleted
if deleted == 0:
break
await asyncio.sleep(0) # yield to event loop
cleanup_logger.info(f"Deleted a total of {total_deleted} rows from {table}")
if vacuum_db:
async with aiosqlite.connect(db_file) as db:
await db.execute("VACUUM;")
cleanup_logger.info("Cleanup completed successfully.")
except Exception as e:
cleanup_logger.error(f"Error during cleanup: {e}")
async def load_database_from_mqtt(mqtt_server: str , mqtt_port: int, topic: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None):
async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topic, mqtt_user, mqtt_passwd):
# -------------------------
# MQTT loading
# -------------------------
async def load_database_from_mqtt(mqtt_server: str, mqtt_port: int, topics: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None):
async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd):
await mqtt_store.process_envelope(topic, env)
async def main(config):
mqtt_database.init_database(config["database"]["connection_string"])
# -------------------------
# Main function
# -------------------------
async def main():
# Initialize database
mqtt_database.init_database(CONFIG["database"]["connection_string"])
await mqtt_database.create_tables()
mqtt_user = None
mqtt_passwd = None
if config["mqtt"]["username"] != "":
mqtt_user: str = config["mqtt"]["username"]
if config["mqtt"]["password"] != "":
mqtt_passwd: str = config["mqtt"]["password"]
mqtt_topics = json.loads(config["mqtt"]["topics"])
mqtt_user = CONFIG["mqtt"].get("username") or None
mqtt_passwd = CONFIG["mqtt"].get("password") or None
mqtt_topics = json.loads(CONFIG["mqtt"]["topics"])
db_file = CONFIG["database"]["connection_string"].replace("sqlite+aiosqlite:///", "")
cleanup_enabled = get_bool(CONFIG, "cleanup", "enabled", True)
cleanup_days = get_int(CONFIG, "cleanup", "days_to_keep", 14)
vacuum_db = get_bool(CONFIG, "cleanup", "vacuum", True)
cleanup_hour = get_int(CONFIG, "cleanup", "hour", 2)
cleanup_minute = get_int(CONFIG, "cleanup", "minute", 0)
batch_size = get_int(CONFIG, "cleanup", "batch_size", 100)
async with asyncio.TaskGroup() as tg:
tg.create_task(
load_database_from_mqtt(config["mqtt"]["server"], int(config["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd)
load_database_from_mqtt(CONFIG["mqtt"]["server"], int(CONFIG["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd)
)
def load_config(file_path):
"""Load configuration from an INI-style text file."""
config_parser = configparser.ConfigParser()
config_parser.read(file_path)
# Convert to a dictionary for easier access
config = {section: dict(config_parser.items(section)) for section in config_parser.sections()}
return config
if cleanup_enabled:
tg.create_task(
daily_cleanup_at(db_file, cleanup_hour, cleanup_minute, cleanup_days, vacuum_db, batch_size)
)
else:
cleanup_logger.info("Daily cleanup is disabled by configuration.")
# -------------------------
# Entry point
# -------------------------
if __name__ == '__main__':
parser = argparse.ArgumentParser("meshview")
parser.add_argument("--config", help="Path to the configuration file.", default='config.ini')
args = parser.parse_args()
config = load_config(args.config)
asyncio.run(main(config))
asyncio.run(main())