diff --git a/startdb.py b/startdb.py
index eedbc39..87cce3c 100644
--- a/startdb.py
+++ b/startdb.py
@@ -2,11 +2,12 @@ import asyncio
import json
import datetime
import logging
-import aiosqlite
+from sqlalchemy import delete
from meshview import mqtt_reader
from meshview import mqtt_database
from meshview import mqtt_store
-from meshview.config import CONFIG # <-- use your existing config.py
+from meshview import models
+from meshview.config import CONFIG
# -------------------------
# Logging for cleanup
@@ -23,7 +24,7 @@ cleanup_logger.addHandler(file_handler)
# Helper functions
# -------------------------
def get_bool(config, section, key, default=False):
- return config.get(section, {}).get(key, str(default)).lower() in ("1", "true", "yes", "on")
+ return str(config.get(section, {}).get(key, default)).lower() in ("1", "true", "yes", "on")
def get_int(config, section, key, default=0):
try:
@@ -32,16 +33,19 @@ def get_int(config, section, key, default=0):
return default
# -------------------------
-# Database cleanup using aiosqlite with batching
+# Shared DB lock
# -------------------------
-async def daily_cleanup_at(db_file: str, hour: int = 2, minute: int = 0, days_to_keep: int = 14, vacuum_db: bool = True, batch_size: int = 100):
- tables = {
- "packet": "import_time",
- "packet_seen": "import_time",
- "traceroute": "import_time",
- "node": "last_update"
- }
+db_lock = asyncio.Lock()
+# -------------------------
+# Database cleanup using ORM
+# -------------------------
+async def daily_cleanup_at(
+ hour: int = 2,
+ minute: int = 0,
+ days_to_keep: int = 14,
+ vacuum_db: bool = False
+):
while True:
now = datetime.datetime.now()
next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
@@ -51,53 +55,67 @@ async def daily_cleanup_at(db_file: str, hour: int = 2, minute: int = 0, days_to
cleanup_logger.info(f"Next cleanup scheduled at {next_run}")
await asyncio.sleep(delay)
+ cutoff = datetime.datetime.now() - datetime.timedelta(days=days_to_keep)
+ cleanup_logger.info(f"Running cleanup for records older than {cutoff}...")
+
try:
- cleanup_logger.info(f"Running cleanup for records older than {days_to_keep} days...")
+ async with db_lock: # <--- Pause ingestion during cleanup
+ cleanup_logger.info("Ingestion paused for cleanup.")
- async with aiosqlite.connect(db_file) as db:
- for table, time_column in tables.items():
- total_deleted = 0 # Initialize total counter
- while True:
- if table == "node":
- query = f"""
- DELETE FROM node
- WHERE {time_column} < datetime('now', '-{days_to_keep} day')
- OR {time_column} IS NULL
- OR {time_column} = ''
- LIMIT {batch_size};
- """
- else:
- query = f"""
- DELETE FROM {table}
- WHERE {time_column} < datetime('now', '-{days_to_keep} day')
- LIMIT {batch_size};
- """
- cursor = await db.execute(query)
- await db.commit()
- deleted = cursor.rowcount or 0
- total_deleted += deleted
- if deleted == 0:
- break
- await asyncio.sleep(0)
+ async with mqtt_database.async_session() as session:
+ # Packet
+ result = await session.execute(
+ delete(models.Packet).where(models.Packet.import_time < cutoff)
+ )
+ cleanup_logger.info(f"Deleted {result.rowcount} rows from Packet")
- cleanup_logger.info(f"Deleted a total of {total_deleted} rows from {table}")
+ # PacketSeen
+ result = await session.execute(
+ delete(models.PacketSeen).where(models.PacketSeen.import_time < cutoff)
+ )
+ cleanup_logger.info(f"Deleted {result.rowcount} rows from PacketSeen")
- if vacuum_db:
- async with aiosqlite.connect(db_file) as db:
- await db.execute("VACUUM;")
+ # Traceroute
+ result = await session.execute(
+ delete(models.Traceroute).where(models.Traceroute.import_time < cutoff)
+ )
+ cleanup_logger.info(f"Deleted {result.rowcount} rows from Traceroute")
- cleanup_logger.info("Cleanup completed successfully.")
+ # Node
+ result = await session.execute(
+ delete(models.Node).where(models.Node.last_update < cutoff)
+ )
+ cleanup_logger.info(f"Deleted {result.rowcount} rows from Node")
+
+ await session.commit()
+
+ if vacuum_db:
+ cleanup_logger.info("Running VACUUM...")
+ async with mqtt_database.engine.begin() as conn:
+ await conn.exec_driver_sql("VACUUM;")
+ cleanup_logger.info("VACUUM completed.")
+
+ cleanup_logger.info("Cleanup completed successfully.")
+ cleanup_logger.info("Ingestion resumed after cleanup.")
except Exception as e:
cleanup_logger.error(f"Error during cleanup: {e}")
-
# -------------------------
# MQTT loading
# -------------------------
-async def load_database_from_mqtt(mqtt_server: str, mqtt_port: int, topics: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None):
- async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd):
- await mqtt_store.process_envelope(topic, env)
+async def load_database_from_mqtt(
+ mqtt_server: str,
+ mqtt_port: int,
+ topics: list,
+ mqtt_user: str | None = None,
+ mqtt_passwd: str | None = None
+):
+ async for topic, env in mqtt_reader.get_topic_envelopes(
+ mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd
+ ):
+ async with db_lock: # <--- Block here if cleanup is running
+ await mqtt_store.process_envelope(topic, env)
# -------------------------
# Main function
@@ -111,22 +129,26 @@ async def main():
mqtt_passwd = CONFIG["mqtt"].get("password") or None
mqtt_topics = json.loads(CONFIG["mqtt"]["topics"])
- db_file = CONFIG["database"]["connection_string"].replace("sqlite+aiosqlite:///", "")
cleanup_enabled = get_bool(CONFIG, "cleanup", "enabled", False)
cleanup_days = get_int(CONFIG, "cleanup", "days_to_keep", 14)
vacuum_db = get_bool(CONFIG, "cleanup", "vacuum", False)
cleanup_hour = get_int(CONFIG, "cleanup", "hour", 2)
cleanup_minute = get_int(CONFIG, "cleanup", "minute", 0)
- batch_size = get_int(CONFIG, "cleanup", "batch_size", 100)
async with asyncio.TaskGroup() as tg:
tg.create_task(
- load_database_from_mqtt(CONFIG["mqtt"]["server"], int(CONFIG["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd)
+ load_database_from_mqtt(
+ CONFIG["mqtt"]["server"],
+ int(CONFIG["mqtt"]["port"]),
+ mqtt_topics,
+ mqtt_user,
+ mqtt_passwd,
+ )
)
if cleanup_enabled:
tg.create_task(
- daily_cleanup_at(db_file, cleanup_hour, cleanup_minute, cleanup_days, vacuum_db, batch_size)
+ daily_cleanup_at(cleanup_hour, cleanup_minute, cleanup_days, vacuum_db)
)
else:
cleanup_logger.info("Daily cleanup is disabled by configuration.")