This commit is contained in:
Louis King
2025-12-06 12:53:29 +00:00
parent 6e3b86a1ad
commit 2ea04deb7e
8 changed files with 59 additions and 35 deletions
+8 -11
View File
@@ -180,24 +180,21 @@ cd meshcore-hub
cp .env.example .env
# Edit .env with your settings (API keys, serial port, network info)
# Option 1: Start core services (mqtt, collector, api, web)
# Create database schema
docker compose --profile migrate run --rm db-migrate
# Seed the database
docker compose --profile seed run --rm seed
# Start core services (mqtt, collector, api, web)
docker compose up -d
# Option 2: Start with mock device for testing
docker compose --profile mock up -d
# Option 3: Start with real MeshCore device
# Start sender/receiver interface
docker compose --profile interface-receiver up -d
# View logs
docker compose logs -f
# Run database migrations (one-time)
docker compose --profile migrate up
# Import seed data manually (also runs on collector startup)
docker compose --profile seed up
# Stop services
docker compose down
```
+34
View File
@@ -174,6 +174,40 @@ def db_history() -> None:
command.history(alembic_cfg)
@db.command("stamp")
@click.option(
"--revision",
type=str,
default="head",
help="Target revision to stamp (default: head)",
)
@click.option(
"--database-url",
type=str,
default=None,
envvar="DATABASE_URL",
help="Database connection URL",
)
def db_stamp(revision: str, database_url: str | None) -> None:
"""Stamp database with revision without running migrations.
Use this to mark an existing database as up-to-date when the schema
was created before Alembic migrations were introduced.
"""
import os
from alembic import command
from alembic.config import Config
click.echo(f"Stamping database with revision: {revision}")
alembic_cfg = Config("alembic.ini")
if database_url:
os.environ["DATABASE_URL"] = database_url
command.stamp(alembic_cfg, revision)
click.echo("Database stamped successfully.")
# Health check commands for Docker HEALTHCHECK
@cli.group()
def health() -> None:
+1 -2
View File
@@ -32,10 +32,9 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# Get database URL from app state
database_url = getattr(app.state, "database_url", "sqlite:///./meshcore.db")
# Initialize database
# Initialize database (schema managed by Alembic migrations)
logger.info(f"Initializing database: {database_url}")
_db_manager = DatabaseManager(database_url)
_db_manager.create_tables()
yield
+4 -8
View File
@@ -182,11 +182,10 @@ def _run_collector_service(
click.echo(f"MQTT: {mqtt_host}:{mqtt_port} (prefix: {prefix})")
click.echo(f"Database: {database_url}")
# Initialize database and run seed import on startup
# Initialize database (schema managed by Alembic migrations)
from meshcore_hub.common.database import DatabaseManager
db = DatabaseManager(database_url)
db.create_tables()
# Auto-seed from seed files on startup
click.echo("")
@@ -294,9 +293,8 @@ def seed_cmd(
from meshcore_hub.common.database import DatabaseManager
# Initialize database
# Initialize database (schema managed by Alembic migrations)
db = DatabaseManager(ctx.obj["database_url"])
db.create_tables()
# Run seed import
imported_any = _run_seed_import(
@@ -448,9 +446,8 @@ def import_tags_cmd(
from meshcore_hub.common.database import DatabaseManager
from meshcore_hub.collector.tag_import import import_tags
# Initialize database
# Initialize database (schema managed by Alembic migrations)
db = DatabaseManager(ctx.obj["database_url"])
db.create_tables()
# Import tags
stats = import_tags(
@@ -529,9 +526,8 @@ def import_members_cmd(
from meshcore_hub.common.database import DatabaseManager
from meshcore_hub.collector.member_import import import_members
# Initialize database
# Initialize database (schema managed by Alembic migrations)
db = DatabaseManager(ctx.obj["database_url"])
db.create_tables()
# Import members
stats = import_members(
@@ -41,14 +41,13 @@ def handle_advertisement(
flags = payload.get("flags")
now = datetime.now(timezone.utc)
# Compute event hash for deduplication (5-minute time bucket)
# Compute event hash for deduplication (30-second time bucket)
event_hash = compute_advertisement_hash(
public_key=adv_public_key,
name=name,
adv_type=adv_type,
flags=flags,
received_at=now,
bucket_minutes=5,
)
with db.session_scope() as session:
@@ -50,12 +50,11 @@ def handle_telemetry(
except ValueError:
lpp_bytes = lpp_data.encode()
# Compute event hash for deduplication (5-minute time bucket)
# Compute event hash for deduplication (30-second time bucket)
event_hash = compute_telemetry_hash(
node_public_key=node_public_key,
parsed_data=parsed_data,
received_at=now,
bucket_minutes=5,
)
with db.session_scope() as session:
+6 -4
View File
@@ -206,14 +206,16 @@ class Subscriber:
"""Start the subscriber."""
logger.info("Starting collector subscriber")
# Create database tables if needed
# Verify database connection (schema managed by Alembic migrations)
try:
self.db.create_tables()
# Test connection by getting a session
session = self.db.get_session()
session.close()
self._db_connected = True
logger.info("Database initialized")
logger.info("Database connection verified")
except Exception as e:
self._db_connected = False
logger.error(f"Failed to initialize database: {e}")
logger.error(f"Failed to connect to database: {e}")
raise
# Connect to MQTT broker
+4 -6
View File
@@ -49,7 +49,7 @@ def compute_advertisement_hash(
adv_type: Optional[str] = None,
flags: Optional[int] = None,
received_at: Optional[datetime] = None,
bucket_minutes: int = 5,
bucket_seconds: int = 30,
) -> str:
"""Compute a deterministic hash for an advertisement.
@@ -62,7 +62,7 @@ def compute_advertisement_hash(
adv_type: Node type
flags: Capability flags
received_at: When received (used for time bucketing)
bucket_minutes: Time bucket size in minutes (default 5)
bucket_seconds: Time bucket size in seconds (default 30)
Returns:
32-character hex hash string
@@ -71,7 +71,6 @@ def compute_advertisement_hash(
time_bucket = ""
if received_at:
# Round down to nearest bucket
bucket_seconds = bucket_minutes * 60
epoch = int(received_at.timestamp())
bucket_epoch = (epoch // bucket_seconds) * bucket_seconds
time_bucket = str(bucket_epoch)
@@ -105,7 +104,7 @@ def compute_telemetry_hash(
node_public_key: str,
parsed_data: Optional[dict] = None,
received_at: Optional[datetime] = None,
bucket_minutes: int = 5,
bucket_seconds: int = 30,
) -> str:
"""Compute a deterministic hash for a telemetry record.
@@ -115,7 +114,7 @@ def compute_telemetry_hash(
node_public_key: Reporting node's public key
parsed_data: Decoded sensor readings
received_at: When received (used for time bucketing)
bucket_minutes: Time bucket size in minutes (default 5)
bucket_seconds: Time bucket size in seconds (default 30)
Returns:
32-character hex hash string
@@ -123,7 +122,6 @@ def compute_telemetry_hash(
# Bucket the time
time_bucket = ""
if received_at:
bucket_seconds = bucket_minutes * 60
epoch = int(received_at.timestamp())
bucket_epoch = (epoch // bucket_seconds) * bucket_seconds
time_bucket = str(bucket_epoch)