Work on DB cleanup tool

This commit is contained in:
Pablo Revilla
2025-09-19 09:20:43 -07:00
parent 0da2ef841c
commit f9a6f3dff2

View File

@@ -44,7 +44,7 @@ async def daily_cleanup_at(
hour: int = 2,
minute: int = 0,
days_to_keep: int = 14,
vacuum_db: bool = False
vacuum_db: bool = True
):
while True:
now = datetime.datetime.now()
@@ -55,33 +55,42 @@ async def daily_cleanup_at(
cleanup_logger.info(f"Next cleanup scheduled at {next_run}")
await asyncio.sleep(delay)
# Local-time cutoff as string for SQLite DATETIME comparison
cutoff = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).strftime("%Y-%m-%d %H:%M:%S")
cleanup_logger.info(f"Running cleanup for records older than {cutoff}...")
try:
async with db_lock: # <--- Pause ingestion during cleanup
async with db_lock: # Pause ingestion
cleanup_logger.info("Ingestion paused for cleanup.")
async with mqtt_database.async_session() as session:
# -------------------------
# Packet
# -------------------------
result = await session.execute(
delete(models.Packet).where(models.Packet.import_time < cutoff)
)
cleanup_logger.info(f"Deleted {result.rowcount} rows from Packet")
# -------------------------
# PacketSeen
# -------------------------
result = await session.execute(
delete(models.PacketSeen).where(models.PacketSeen.import_time < cutoff)
)
cleanup_logger.info(f"Deleted {result.rowcount} rows from PacketSeen")
# -------------------------
# Traceroute
# -------------------------
result = await session.execute(
delete(models.Traceroute).where(models.Traceroute.import_time < cutoff)
)
cleanup_logger.info(f"Deleted {result.rowcount} rows from Traceroute")
# -------------------------
# Node
# -------------------------
result = await session.execute(
delete(models.Node).where(models.Node.last_update < cutoff)
)
@@ -114,7 +123,7 @@ async def load_database_from_mqtt(
async for topic, env in mqtt_reader.get_topic_envelopes(
mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd
):
async with db_lock: # <--- Block here if cleanup is running
async with db_lock: # Block if cleanup is running
await mqtt_store.process_envelope(topic, env)
# -------------------------