From 2ea04deb7e63042c3bddd769cfab7f4260bfcf4d Mon Sep 17 00:00:00 2001 From: Louis King Date: Sat, 6 Dec 2025 12:53:29 +0000 Subject: [PATCH] Updates --- README.md | 19 +++++------ src/meshcore_hub/__main__.py | 34 +++++++++++++++++++ src/meshcore_hub/api/app.py | 3 +- src/meshcore_hub/collector/cli.py | 12 +++---- .../collector/handlers/advertisement.py | 3 +- .../collector/handlers/telemetry.py | 3 +- src/meshcore_hub/collector/subscriber.py | 10 +++--- src/meshcore_hub/common/hash_utils.py | 10 +++--- 8 files changed, 59 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 60cc1bb..f8e4d29 100644 --- a/README.md +++ b/README.md @@ -180,24 +180,21 @@ cd meshcore-hub cp .env.example .env # Edit .env with your settings (API keys, serial port, network info) -# Option 1: Start core services (mqtt, collector, api, web) +# Create database schema +docker compose --profile migrate run --rm db-migrate + +# Seed the database +docker compose --profile seed run --rm seed + +# Start core services (mqtt, collector, api, web) docker compose up -d -# Option 2: Start with mock device for testing -docker compose --profile mock up -d - -# Option 3: Start with real MeshCore device +# Start sender/receiver interface docker compose --profile interface-receiver up -d # View logs docker compose logs -f -# Run database migrations (one-time) -docker compose --profile migrate up - -# Import seed data manually (also runs on collector startup) -docker compose --profile seed up - # Stop services docker compose down ``` diff --git a/src/meshcore_hub/__main__.py b/src/meshcore_hub/__main__.py index 96d8fc9..871615b 100644 --- a/src/meshcore_hub/__main__.py +++ b/src/meshcore_hub/__main__.py @@ -174,6 +174,40 @@ def db_history() -> None: command.history(alembic_cfg) +@db.command("stamp") +@click.option( + "--revision", + type=str, + default="head", + help="Target revision to stamp (default: head)", +) +@click.option( + "--database-url", + type=str, + default=None, + envvar="DATABASE_URL", + help="Database connection URL", +) +def db_stamp(revision: str, database_url: str | None) -> None: + """Stamp database with revision without running migrations. + + Use this to mark an existing database as up-to-date when the schema + was created before Alembic migrations were introduced. + """ + import os + from alembic import command + from alembic.config import Config + + click.echo(f"Stamping database with revision: {revision}") + + alembic_cfg = Config("alembic.ini") + if database_url: + os.environ["DATABASE_URL"] = database_url + + command.stamp(alembic_cfg, revision) + click.echo("Database stamped successfully.") + + # Health check commands for Docker HEALTHCHECK @cli.group() def health() -> None: diff --git a/src/meshcore_hub/api/app.py b/src/meshcore_hub/api/app.py index 4a1efd1..fa9191d 100644 --- a/src/meshcore_hub/api/app.py +++ b/src/meshcore_hub/api/app.py @@ -32,10 +32,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # Get database URL from app state database_url = getattr(app.state, "database_url", "sqlite:///./meshcore.db") - # Initialize database + # Initialize database (schema managed by Alembic migrations) logger.info(f"Initializing database: {database_url}") _db_manager = DatabaseManager(database_url) - _db_manager.create_tables() yield diff --git a/src/meshcore_hub/collector/cli.py b/src/meshcore_hub/collector/cli.py index c501de9..655d56f 100644 --- a/src/meshcore_hub/collector/cli.py +++ b/src/meshcore_hub/collector/cli.py @@ -182,11 +182,10 @@ def _run_collector_service( click.echo(f"MQTT: {mqtt_host}:{mqtt_port} (prefix: {prefix})") click.echo(f"Database: {database_url}") - # Initialize database and run seed import on startup + # Initialize database (schema managed by Alembic migrations) from meshcore_hub.common.database import DatabaseManager db = DatabaseManager(database_url) - db.create_tables() # Auto-seed from seed files on startup click.echo("") @@ -294,9 +293,8 @@ def seed_cmd( from meshcore_hub.common.database import DatabaseManager - # Initialize database + # Initialize database (schema managed by Alembic migrations) db = DatabaseManager(ctx.obj["database_url"]) - db.create_tables() # Run seed import imported_any = _run_seed_import( @@ -448,9 +446,8 @@ def import_tags_cmd( from meshcore_hub.common.database import DatabaseManager from meshcore_hub.collector.tag_import import import_tags - # Initialize database + # Initialize database (schema managed by Alembic migrations) db = DatabaseManager(ctx.obj["database_url"]) - db.create_tables() # Import tags stats = import_tags( @@ -529,9 +526,8 @@ def import_members_cmd( from meshcore_hub.common.database import DatabaseManager from meshcore_hub.collector.member_import import import_members - # Initialize database + # Initialize database (schema managed by Alembic migrations) db = DatabaseManager(ctx.obj["database_url"]) - db.create_tables() # Import members stats = import_members( diff --git a/src/meshcore_hub/collector/handlers/advertisement.py b/src/meshcore_hub/collector/handlers/advertisement.py index b9d6fbf..8bb04f9 100644 --- a/src/meshcore_hub/collector/handlers/advertisement.py +++ b/src/meshcore_hub/collector/handlers/advertisement.py @@ -41,14 +41,13 @@ def handle_advertisement( flags = payload.get("flags") now = datetime.now(timezone.utc) - # Compute event hash for deduplication (5-minute time bucket) + # Compute event hash for deduplication (30-second time bucket) event_hash = compute_advertisement_hash( public_key=adv_public_key, name=name, adv_type=adv_type, flags=flags, received_at=now, - bucket_minutes=5, ) with db.session_scope() as session: diff --git a/src/meshcore_hub/collector/handlers/telemetry.py b/src/meshcore_hub/collector/handlers/telemetry.py index 6e0bfe9..61ebbf4 100644 --- a/src/meshcore_hub/collector/handlers/telemetry.py +++ b/src/meshcore_hub/collector/handlers/telemetry.py @@ -50,12 +50,11 @@ def handle_telemetry( except ValueError: lpp_bytes = lpp_data.encode() - # Compute event hash for deduplication (5-minute time bucket) + # Compute event hash for deduplication (30-second time bucket) event_hash = compute_telemetry_hash( node_public_key=node_public_key, parsed_data=parsed_data, received_at=now, - bucket_minutes=5, ) with db.session_scope() as session: diff --git a/src/meshcore_hub/collector/subscriber.py b/src/meshcore_hub/collector/subscriber.py index 57d7ccf..0b4e4e2 100644 --- a/src/meshcore_hub/collector/subscriber.py +++ b/src/meshcore_hub/collector/subscriber.py @@ -206,14 +206,16 @@ class Subscriber: """Start the subscriber.""" logger.info("Starting collector subscriber") - # Create database tables if needed + # Verify database connection (schema managed by Alembic migrations) try: - self.db.create_tables() + # Test connection by getting a session + session = self.db.get_session() + session.close() self._db_connected = True - logger.info("Database initialized") + logger.info("Database connection verified") except Exception as e: self._db_connected = False - logger.error(f"Failed to initialize database: {e}") + logger.error(f"Failed to connect to database: {e}") raise # Connect to MQTT broker diff --git a/src/meshcore_hub/common/hash_utils.py b/src/meshcore_hub/common/hash_utils.py index 5be1825..5e75a9a 100644 --- a/src/meshcore_hub/common/hash_utils.py +++ b/src/meshcore_hub/common/hash_utils.py @@ -49,7 +49,7 @@ def compute_advertisement_hash( adv_type: Optional[str] = None, flags: Optional[int] = None, received_at: Optional[datetime] = None, - bucket_minutes: int = 5, + bucket_seconds: int = 30, ) -> str: """Compute a deterministic hash for an advertisement. @@ -62,7 +62,7 @@ def compute_advertisement_hash( adv_type: Node type flags: Capability flags received_at: When received (used for time bucketing) - bucket_minutes: Time bucket size in minutes (default 5) + bucket_seconds: Time bucket size in seconds (default 30) Returns: 32-character hex hash string @@ -71,7 +71,6 @@ def compute_advertisement_hash( time_bucket = "" if received_at: # Round down to nearest bucket - bucket_seconds = bucket_minutes * 60 epoch = int(received_at.timestamp()) bucket_epoch = (epoch // bucket_seconds) * bucket_seconds time_bucket = str(bucket_epoch) @@ -105,7 +104,7 @@ def compute_telemetry_hash( node_public_key: str, parsed_data: Optional[dict] = None, received_at: Optional[datetime] = None, - bucket_minutes: int = 5, + bucket_seconds: int = 30, ) -> str: """Compute a deterministic hash for a telemetry record. @@ -115,7 +114,7 @@ def compute_telemetry_hash( node_public_key: Reporting node's public key parsed_data: Decoded sensor readings received_at: When received (used for time bucketing) - bucket_minutes: Time bucket size in minutes (default 5) + bucket_seconds: Time bucket size in seconds (default 30) Returns: 32-character hex hash string @@ -123,7 +122,6 @@ def compute_telemetry_hash( # Bucket the time time_bucket = "" if received_at: - bucket_seconds = bucket_minutes * 60 epoch = int(received_at.timestamp()) bucket_epoch = (epoch // bucket_seconds) * bucket_seconds time_bucket = str(bucket_epoch)