diff --git a/startdb.py b/startdb.py index 793ae8a..ce81722 100644 --- a/startdb.py +++ b/startdb.py @@ -1,60 +1,51 @@ import asyncio import argparse import configparser -import json from meshview import mqtt_reader from meshview import mqtt_database from meshview import mqtt_store - -async def load_database_from_mqtt(mqtt_server, mqtt_port, topics, mqtt_user=None, mqtt_passwd=None): - async for received_topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd): - await mqtt_store.process_envelope(received_topic, env) +import json -async def run_daily(task_func, hour=0, minute=0): - """Run an async task_func once every day at the specified hour and minute.""" - while True: - from datetime import datetime, timedelta - now_dt = datetime.now() - target = now_dt.replace(hour=hour, minute=minute, second=0, microsecond=0) - if target <= now_dt: - target += timedelta(days=1) - delay = (target - now_dt).total_seconds() - await asyncio.sleep(delay) +async def load_database_from_mqtt(mqtt_server: str , mqtt_port: int, topic: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None): + async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topic, mqtt_user, mqtt_passwd): + await mqtt_store.process_envelope(topic, env) - try: - await task_func() - except Exception as e: - print(f"Error during daily task: {e}") async def main(config): mqtt_database.init_database(config["database"]["connection_string"]) + await mqtt_database.create_tables() - - mqtt_user = config["mqtt"]["username"] or None - mqtt_passwd = config["mqtt"]["password"] or None + mqtt_user = None + mqtt_passwd = None + if config["mqtt"]["username"] != "": + mqtt_user: str = config["mqtt"]["username"] + if config["mqtt"]["password"] != "": + mqtt_passwd: str = config["mqtt"]["password"] mqtt_topics = json.loads(config["mqtt"]["topics"]) - + + async with asyncio.TaskGroup() as tg: - tg.create_task(load_database_from_mqtt( - config["mqtt"]["server"], - int(config["mqtt"]["port"]), - mqtt_topics, - mqtt_user, - mqtt_passwd - )) - # Schedule cleanup - if config["database"]["cleanup"]: - tg.create_task(run_daily(mqtt_store.cleanup_old_entries, hour=int(config["database"]["cleanup_hour"]), minute=int(config["database"]["cleanup_minutes"]))) + tg.create_task( + load_database_from_mqtt(config["mqtt"]["server"], int(config["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd) + ) + def load_config(file_path): + """Load configuration from an INI-style text file.""" config_parser = configparser.ConfigParser() config_parser.read(file_path) - return {section: dict(config_parser.items(section)) for section in config_parser.sections()} + + # Convert to a dictionary for easier access + config = {section: dict(config_parser.items(section)) for section in config_parser.sections()} + return config + if __name__ == '__main__': parser = argparse.ArgumentParser("meshview") parser.add_argument("--config", help="Path to the configuration file.", default='config.ini') args = parser.parse_args() + config = load_config(args.config) + asyncio.run(main(config))