From f9a6f3dff2722efa6d377b291b921ee42055d673 Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Fri, 19 Sep 2025 09:20:43 -0700 Subject: [PATCH] Work on DB cleanup tool --- startdb.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/startdb.py b/startdb.py index c56485a..568b7e2 100644 --- a/startdb.py +++ b/startdb.py @@ -44,7 +44,7 @@ async def daily_cleanup_at( hour: int = 2, minute: int = 0, days_to_keep: int = 14, - vacuum_db: bool = False + vacuum_db: bool = True ): while True: now = datetime.datetime.now() @@ -55,33 +55,42 @@ async def daily_cleanup_at( cleanup_logger.info(f"Next cleanup scheduled at {next_run}") await asyncio.sleep(delay) + # Local-time cutoff as string for SQLite DATETIME comparison cutoff = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).strftime("%Y-%m-%d %H:%M:%S") cleanup_logger.info(f"Running cleanup for records older than {cutoff}...") try: - async with db_lock: # <--- Pause ingestion during cleanup + async with db_lock: # Pause ingestion cleanup_logger.info("Ingestion paused for cleanup.") async with mqtt_database.async_session() as session: + # ------------------------- # Packet + # ------------------------- result = await session.execute( delete(models.Packet).where(models.Packet.import_time < cutoff) ) cleanup_logger.info(f"Deleted {result.rowcount} rows from Packet") + # ------------------------- # PacketSeen + # ------------------------- result = await session.execute( delete(models.PacketSeen).where(models.PacketSeen.import_time < cutoff) ) cleanup_logger.info(f"Deleted {result.rowcount} rows from PacketSeen") + # ------------------------- # Traceroute + # ------------------------- result = await session.execute( delete(models.Traceroute).where(models.Traceroute.import_time < cutoff) ) cleanup_logger.info(f"Deleted {result.rowcount} rows from Traceroute") + # ------------------------- # Node + # ------------------------- result = await session.execute( delete(models.Node).where(models.Node.last_update < cutoff) ) @@ -114,7 +123,7 @@ async def load_database_from_mqtt( async for topic, env in mqtt_reader.get_topic_envelopes( mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd ): - async with db_lock: # <--- Block here if cleanup is running + async with db_lock: # Block if cleanup is running await mqtt_store.process_envelope(topic, env) # -------------------------