diff --git a/sample.config.ini b/sample.config.ini index a654aab..d4a96ef 100644 --- a/sample.config.ini +++ b/sample.config.ini @@ -85,3 +85,20 @@ password = large4cats [database] # SQLAlchemy connection string. This one uses SQLite with asyncio support. connection_string = sqlite+aiosqlite:///packets.db + + +# ------------------------- +# Database Cleanup Configuration +# ------------------------- +[cleanup] +# Enable or disable daily cleanup +enabled = True +# Number of days to keep records in the database +days_to_keep = 14 +# Time to run daily cleanup (24-hour format) +hour = 2 +minute = 00 +# Number of rows to delete per batch +batch_size = 100 +# Run VACUUM after cleanup +vacuum = True \ No newline at end of file diff --git a/startdb.py b/startdb.py index ce81722..be19a85 100644 --- a/startdb.py +++ b/startdb.py @@ -1,51 +1,138 @@ import asyncio -import argparse -import configparser +import json +import datetime +import logging +import aiosqlite from meshview import mqtt_reader from meshview import mqtt_database from meshview import mqtt_store -import json +from meshview.config import CONFIG # <-- use your existing config.py + +# ------------------------- +# Logging for cleanup +# ------------------------- +cleanup_logger = logging.getLogger("dbcleanup") +cleanup_logger.setLevel(logging.INFO) +file_handler = logging.FileHandler("dbcleanup.log") +file_handler.setLevel(logging.INFO) +formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') +file_handler.setFormatter(formatter) +cleanup_logger.addHandler(file_handler) + +# ------------------------- +# Helper functions +# ------------------------- +def get_bool(config, section, key, default=False): + return config.get(section, {}).get(key, str(default)).lower() in ("1", "true", "yes", "on") + +def get_int(config, section, key, default=0): + try: + return int(config.get(section, {}).get(key, default)) + except ValueError: + return default + +# ------------------------- +# Database cleanup using aiosqlite with batching +# ------------------------- +async def daily_cleanup_at(db_file: str, hour: int = 2, minute: int = 0, days_to_keep: int = 14, vacuum_db: bool = True, batch_size: int = 100): + tables = { + "packet": "import_time", + "packet_seen": "import_time", + "traceroute": "import_time", + "node": "last_update" + } + + while True: + now = datetime.datetime.now() + next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0) + if next_run <= now: + next_run += datetime.timedelta(days=1) + delay = (next_run - now).total_seconds() + cleanup_logger.info(f"Next cleanup scheduled at {next_run}") + await asyncio.sleep(delay) + + try: + cleanup_logger.info(f"Running cleanup for records older than {days_to_keep} days...") + + async with aiosqlite.connect(db_file) as db: + for table, time_column in tables.items(): + total_deleted = 0 # Initialize total counter + while True: + if table == "node": + query = f""" + DELETE FROM node + WHERE {time_column} < datetime('now', '-{days_to_keep} day') + OR {time_column} IS NULL + OR {time_column} = '' + LIMIT {batch_size}; + """ + else: + query = f""" + DELETE FROM {table} + WHERE {time_column} < datetime('now', '-{days_to_keep} day') + LIMIT {batch_size}; + """ + cursor = await db.execute(query) + await db.commit() + deleted = cursor.rowcount or 0 + total_deleted += deleted + if deleted == 0: + break + await asyncio.sleep(0) # yield to event loop + + cleanup_logger.info(f"Deleted a total of {total_deleted} rows from {table}") + + if vacuum_db: + async with aiosqlite.connect(db_file) as db: + await db.execute("VACUUM;") + + cleanup_logger.info("Cleanup completed successfully.") + + except Exception as e: + cleanup_logger.error(f"Error during cleanup: {e}") -async def load_database_from_mqtt(mqtt_server: str , mqtt_port: int, topic: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None): - async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topic, mqtt_user, mqtt_passwd): +# ------------------------- +# MQTT loading +# ------------------------- +async def load_database_from_mqtt(mqtt_server: str, mqtt_port: int, topics: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None): + async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd): await mqtt_store.process_envelope(topic, env) - -async def main(config): - mqtt_database.init_database(config["database"]["connection_string"]) - +# ------------------------- +# Main function +# ------------------------- +async def main(): + # Initialize database + mqtt_database.init_database(CONFIG["database"]["connection_string"]) await mqtt_database.create_tables() - mqtt_user = None - mqtt_passwd = None - if config["mqtt"]["username"] != "": - mqtt_user: str = config["mqtt"]["username"] - if config["mqtt"]["password"] != "": - mqtt_passwd: str = config["mqtt"]["password"] - mqtt_topics = json.loads(config["mqtt"]["topics"]) - - + + mqtt_user = CONFIG["mqtt"].get("username") or None + mqtt_passwd = CONFIG["mqtt"].get("password") or None + mqtt_topics = json.loads(CONFIG["mqtt"]["topics"]) + + db_file = CONFIG["database"]["connection_string"].replace("sqlite+aiosqlite:///", "") + cleanup_enabled = get_bool(CONFIG, "cleanup", "enabled", True) + cleanup_days = get_int(CONFIG, "cleanup", "days_to_keep", 14) + vacuum_db = get_bool(CONFIG, "cleanup", "vacuum", True) + cleanup_hour = get_int(CONFIG, "cleanup", "hour", 2) + cleanup_minute = get_int(CONFIG, "cleanup", "minute", 0) + batch_size = get_int(CONFIG, "cleanup", "batch_size", 100) + async with asyncio.TaskGroup() as tg: tg.create_task( - load_database_from_mqtt(config["mqtt"]["server"], int(config["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd) + load_database_from_mqtt(CONFIG["mqtt"]["server"], int(CONFIG["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd) ) - - -def load_config(file_path): - """Load configuration from an INI-style text file.""" - config_parser = configparser.ConfigParser() - config_parser.read(file_path) - - # Convert to a dictionary for easier access - config = {section: dict(config_parser.items(section)) for section in config_parser.sections()} - return config + if cleanup_enabled: + tg.create_task( + daily_cleanup_at(db_file, cleanup_hour, cleanup_minute, cleanup_days, vacuum_db, batch_size) + ) + else: + cleanup_logger.info("Daily cleanup is disabled by configuration.") +# ------------------------- +# Entry point +# ------------------------- if __name__ == '__main__': - parser = argparse.ArgumentParser("meshview") - parser.add_argument("--config", help="Path to the configuration file.", default='config.ini') - args = parser.parse_args() - - config = load_config(args.config) - - asyncio.run(main(config)) + asyncio.run(main())