diff --git a/.gitignore b/.gitignore index 0939dc1..7260145 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ env/* __pycache__/* meshview/__pycache__/* +alembic/__pycache__/* meshtastic/protobuf/* packets.db meshview-db.pid @@ -9,3 +10,5 @@ meshview-web.pid config.ini screenshots/* python/nanopb +__pycache__/ +*.pyc diff --git a/ALEMBIC_SETUP.md b/ALEMBIC_SETUP.md new file mode 100644 index 0000000..0905673 --- /dev/null +++ b/ALEMBIC_SETUP.md @@ -0,0 +1,361 @@ +# Alembic Database Migration Setup + +This document describes the automatic database migration system implemented for MeshView using Alembic. + +## Overview + +The system provides automatic database schema migrations with coordination between the writer app (startdb.py) and reader app (web.py): + +- **Writer App**: Automatically runs pending migrations on startup +- **Reader App**: Waits for migrations to complete before starting + +## Architecture + +### Key Components + +1. **`meshview/migrations.py`** - Migration management utilities + - `run_migrations()` - Runs pending migrations (writer app) + - `wait_for_migrations()` - Waits for schema to be current (reader app) + - `is_database_up_to_date()` - Checks schema version + - Migration status tracking table + +2. **`alembic/`** - Alembic migration directory + - `env.py` - Configured for async SQLAlchemy support + - `versions/` - Migration scripts directory + - `alembic.ini` - Alembic configuration + +3. **Modified Apps**: + - `startdb.py` - Writer app that runs migrations before MQTT ingestion + - `meshview/web.py` - Reader app that waits for schema updates + +## How It Works - Automatic In-Place Updates + +### ✨ Fully Automatic Operation + +**No manual migration commands needed!** The database schema updates automatically when you: +1. Deploy new code with migration files +2. Restart the applications + +### Writer App (startdb.py) Startup Sequence + +1. Initialize database connection +2. Create migration status tracking table +3. Set "migration in progress" flag +4. **šŸ”„ Automatically run any pending Alembic migrations** (synchronously) + - Detects current schema version + - Compares to latest available migration + - Runs all pending migrations in sequence + - Updates database schema in place +5. Clear "migration in progress" flag +6. Start MQTT ingestion and other tasks + +### Reader App (web.py) Startup Sequence + +1. Initialize database connection +2. **Check database schema version** +3. If not up to date: + - Wait up to 60 seconds (30 retries Ɨ 2 seconds) + - Check every 2 seconds for schema updates + - Automatically proceeds once writer completes migrations +4. Once schema is current, start web server + +### šŸŽÆ Key Point: Zero Manual Steps + +When you deploy new code with migrations: +```bash +# Just start the apps - migrations happen automatically! +./env/bin/python startdb.py # Migrations run here automatically +./env/bin/python main.py # Waits for migrations, then starts +``` + +**The database updates itself!** No need to run `alembic upgrade` manually. + +### Coordination + +The apps coordinate using: +- **Alembic version table** (`alembic_version`) - Tracks current schema version +- **Migration status table** (`migration_status`) - Optional flag for "in progress" state + +## Creating New Migrations + +### Using the helper script: + +```bash +./env/bin/python create_migration.py +``` + +### Manual creation: + +```bash +./env/bin/alembic revision --autogenerate -m "Description of changes" +``` + +This will: +1. Compare current database schema with SQLAlchemy models +2. Generate a migration script in `alembic/versions/` +3. Automatically detect most schema changes + +### Manual migration (advanced): + +```bash +./env/bin/alembic revision -m "Manual migration" +``` + +Then edit the generated file to add custom migration logic. + +## Running Migrations + +### Automatic (Recommended) + +Migrations run automatically when the writer app starts: + +```bash +./env/bin/python startdb.py +``` + +### Manual + +To run migrations manually: + +```bash +./env/bin/alembic upgrade head +``` + +To downgrade: + +```bash +./env/bin/alembic downgrade -1 # Go back one version +./env/bin/alembic downgrade base # Go back to beginning +``` + +## Checking Migration Status + +Check current database version: + +```bash +./env/bin/alembic current +``` + +View migration history: + +```bash +./env/bin/alembic history +``` + +## Benefits + +1. **Zero Manual Intervention**: Migrations run automatically on startup +2. **Safe Coordination**: Reader won't connect to incompatible schema +3. **Version Control**: All schema changes tracked in git +4. **Rollback Capability**: Can downgrade if needed +5. **Auto-generation**: Most migrations created automatically from model changes + +## Migration Workflow + +### Development Process + +1. **Modify SQLAlchemy models** in `meshview/models.py` +2. **Create migration**: + ```bash + ./env/bin/python create_migration.py + ``` +3. **Review generated migration** in `alembic/versions/` +4. **Test migration**: + - Stop all apps + - Start writer app (migrations run automatically) + - Start reader app (waits for schema to be current) +5. **Commit migration** to version control + +### Production Deployment + +1. **Deploy new code** with migration scripts +2. **Start writer app** - Migrations run automatically +3. **Start reader app** - Waits for migrations, then starts +4. **Monitor logs** for migration success + +## Troubleshooting + +### Migration fails + +Check logs in writer app for error details. To manually fix: + +```bash +./env/bin/alembic current # Check current version +./env/bin/alembic history # View available versions +./env/bin/alembic upgrade head # Try manual upgrade +``` + +### Reader app won't start (timeout) + +Check if writer app is running and has completed migrations: + +```bash +./env/bin/alembic current +``` + +### Reset to clean state + +āš ļø **Warning: This will lose all data** + +```bash +rm packets.db # Or your database file +./env/bin/alembic upgrade head # Create fresh schema +``` + +## File Structure + +``` +meshview/ +ā”œā”€ā”€ alembic.ini # Alembic configuration +ā”œā”€ā”€ alembic/ +│ ā”œā”€ā”€ env.py # Async-enabled migration runner +│ ā”œā”€ā”€ script.py.mako # Migration template +│ └── versions/ # Migration scripts +│ └── c88468b7ab0b_initial_migration.py +ā”œā”€ā”€ meshview/ +│ ā”œā”€ā”€ models.py # SQLAlchemy models (source of truth) +│ ā”œā”€ā”€ migrations.py # Migration utilities +│ ā”œā”€ā”€ mqtt_database.py # Writer database connection +│ └── database.py # Reader database connection +ā”œā”€ā”€ startdb.py # Writer app (runs migrations) +ā”œā”€ā”€ main.py # Entry point for reader app +└── create_migration.py # Helper script for creating migrations +``` + +## Configuration + +Database URL is read from `config.ini`: + +```ini +[database] +connection_string = sqlite+aiosqlite:///packets.db +``` + +Alembic automatically uses this configuration through `meshview/migrations.py`. + +## Important Notes + +1. **Always test migrations** in development before deploying to production +2. **Backup database** before running migrations in production +3. **Check for data loss** - Some migrations may require data migration logic +4. **Coordinate deployments** - Start writer before readers in multi-instance setups +5. **Monitor logs** during first startup after deployment + +## Example Migrations + +### Example 1: Generated Initial Migration + +Here's what an auto-generated migration looks like (from comparing models to database): + +```python +"""Initial migration + +Revision ID: c88468b7ab0b +Revises: +Create Date: 2025-01-26 20:56:50.123456 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers +revision = 'c88468b7ab0b' +down_revision = None +branch_labels = None +depends_on = None + +def upgrade() -> None: + # Upgrade operations + op.create_table('node', + sa.Column('id', sa.String(), nullable=False), + sa.Column('node_id', sa.BigInteger(), nullable=True), + # ... more columns + sa.PrimaryKeyConstraint('id') + ) + +def downgrade() -> None: + # Downgrade operations + op.drop_table('node') +``` + +### Example 2: Manual Migration Adding a New Table + +We've included an example migration (`1717fa5c6545_add_example_table.py`) that demonstrates how to manually create a new table: + +```python +"""Add example table + +Revision ID: 1717fa5c6545 +Revises: c88468b7ab0b +Create Date: 2025-10-26 20:59:04.347066 +""" +from typing import Sequence, Union +from alembic import op +import sqlalchemy as sa + +def upgrade() -> None: + """Create example table with sample columns.""" + op.create_table( + 'example', + sa.Column('id', sa.Integer(), nullable=False, primary_key=True, autoincrement=True), + sa.Column('name', sa.String(length=100), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('value', sa.Float(), nullable=True), + sa.Column('is_active', sa.Boolean(), nullable=False, server_default='1'), + sa.Column('created_at', sa.DateTime(), nullable=False, + server_default=sa.text('CURRENT_TIMESTAMP')), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + + # Create an index on the name column for faster lookups + op.create_index('idx_example_name', 'example', ['name']) + +def downgrade() -> None: + """Remove example table.""" + op.drop_index('idx_example_name', table_name='example') + op.drop_table('example') +``` + +**Key features demonstrated:** +- Various column types (Integer, String, Text, Float, Boolean, DateTime) +- Primary key with autoincrement +- Nullable and non-nullable columns +- Server defaults (for timestamps and booleans) +- Creating indexes +- Proper downgrade that reverses all changes + +**To test this migration:** + +```bash +# Apply the migration +./env/bin/alembic upgrade head + +# Check it was applied +./env/bin/alembic current + +# Verify table was created +sqlite3 packetsPL.db "SELECT sql FROM sqlite_master WHERE type='table' AND name='example';" + +# Roll back the migration +./env/bin/alembic downgrade -1 + +# Verify table was removed +sqlite3 packetsPL.db "SELECT name FROM sqlite_master WHERE type='table' AND name='example';" +``` + +**To remove this example migration** (after testing): + +```bash +# First make sure you're not on this revision +./env/bin/alembic downgrade c88468b7ab0b + +# Then delete the migration file +rm alembic/versions/1717fa5c6545_add_example_table.py +``` + +## References + +- [Alembic Documentation](https://alembic.sqlalchemy.org/) +- [SQLAlchemy Documentation](https://docs.sqlalchemy.org/) +- [Async SQLAlchemy](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html) \ No newline at end of file diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..895e544 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,120 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +# Use forward slashes (/) also on windows to provide an os agnostic path +script_location = alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library. +# Any required deps can installed by adding `alembic[tz]` to the pip requirements +# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to alembic/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +# version_path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +version_path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# sqlalchemy.url will be set programmatically from meshview config +# sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the exec runner, execute a binary +# hooks = ruff +# ruff.type = exec +# ruff.executable = %(here)s/.venv/bin/ruff +# ruff.options = --fix REVISION_SCRIPT_FILENAME + +# Logging configuration +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S \ No newline at end of file diff --git a/alembic/README b/alembic/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..757ec03 --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,93 @@ +import asyncio +from logging.config import fileConfig + +from sqlalchemy import pool +from sqlalchemy.engine import Connection +from sqlalchemy.ext.asyncio import async_engine_from_config + +from alembic import context + +# Import models metadata for autogenerate support +from meshview.models import Base + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# Add your model's MetaData object here for 'autogenerate' support +target_metadata = Base.metadata + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def do_run_migrations(connection: Connection) -> None: + """Run migrations with the given connection.""" + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +async def run_async_migrations() -> None: + """Run migrations in async mode.""" + # Get configuration section + configuration = config.get_section(config.config_ini_section, {}) + + # If sqlalchemy.url is not set in alembic.ini, try to get it from meshview config + if "sqlalchemy.url" not in configuration: + try: + from meshview.config import CONFIG + + configuration["sqlalchemy.url"] = CONFIG["database"]["connection_string"] + except Exception: + # Fallback to a default for initial migration creation + configuration["sqlalchemy.url"] = "sqlite+aiosqlite:///packets.db" + + connectable = async_engine_from_config( + configuration, + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations) + + await connectable.dispose() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode with async support.""" + asyncio.run(run_async_migrations()) + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/alembic/script.py.mako b/alembic/script.py.mako new file mode 100644 index 0000000..fbc4b07 --- /dev/null +++ b/alembic/script.py.mako @@ -0,0 +1,26 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} diff --git a/alembic/versions/1717fa5c6545_add_example_table.py b/alembic/versions/1717fa5c6545_add_example_table.py new file mode 100644 index 0000000..6b900e0 --- /dev/null +++ b/alembic/versions/1717fa5c6545_add_example_table.py @@ -0,0 +1,45 @@ +"""Add example table + +Revision ID: 1717fa5c6545 +Revises: c88468b7ab0b +Create Date: 2025-10-26 20:59:04.347066 + +""" + +from collections.abc import Sequence + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = '1717fa5c6545' +down_revision: str | None = 'c88468b7ab0b' +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Create example table with sample columns.""" + op.create_table( + 'example', + sa.Column('id', sa.Integer(), nullable=False, primary_key=True, autoincrement=True), + sa.Column('name', sa.String(length=100), nullable=False), + sa.Column('description', sa.Text(), nullable=True), + sa.Column('value', sa.Float(), nullable=True), + sa.Column('is_active', sa.Boolean(), nullable=False, server_default='1'), + sa.Column( + 'created_at', sa.DateTime(), nullable=False, server_default=sa.text('CURRENT_TIMESTAMP') + ), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id'), + ) + + # Create an index on the name column for faster lookups + op.create_index('idx_example_name', 'example', ['name']) + + +def downgrade() -> None: + """Remove example table.""" + op.drop_index('idx_example_name', table_name='example') + op.drop_table('example') diff --git a/alembic/versions/c88468b7ab0b_initial_migration.py b/alembic/versions/c88468b7ab0b_initial_migration.py new file mode 100644 index 0000000..b1a1851 --- /dev/null +++ b/alembic/versions/c88468b7ab0b_initial_migration.py @@ -0,0 +1,35 @@ +"""Initial migration + +Revision ID: c88468b7ab0b +Revises: +Create Date: 2025-10-26 20:56:50.285200 + +""" + +from collections.abc import Sequence + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = 'c88468b7ab0b' +down_revision: str | None = None +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('idx_packet_import_time', table_name='packet') + op.create_index('idx_packet_import_time', 'packet', [sa.text('import_time DESC')], unique=False) + op.create_index('idx_packet_seen_packet_id', 'packet_seen', ['packet_id'], unique=False) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index('idx_packet_seen_packet_id', table_name='packet_seen') + op.drop_index('idx_packet_import_time', table_name='packet') + op.create_index('idx_packet_import_time', 'packet', ['import_time'], unique=False) + # ### end Alembic commands ### diff --git a/create_example_migration.py b/create_example_migration.py new file mode 100755 index 0000000..1eb2aa9 --- /dev/null +++ b/create_example_migration.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +""" +Script to create a blank migration for manual editing. + +Usage: + ./env/bin/python create_example_migration.py + +This creates an empty migration file that you can manually edit to add +custom migration logic (data migrations, complex schema changes, etc.) + +Unlike create_migration.py which auto-generates from model changes, +this creates a blank template for you to fill in. +""" + +import os +import sys + +# Add current directory to path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from alembic.config import Config + +from alembic import command + +# Create Alembic config +alembic_cfg = Config("alembic.ini") + +# Set database URL from meshview config +try: + from meshview.config import CONFIG + + database_url = CONFIG["database"]["connection_string"] + alembic_cfg.set_main_option("sqlalchemy.url", database_url) + print(f"Using database URL from config: {database_url}") +except Exception as e: + print(f"Warning: Could not load meshview config: {e}") + print("Using default database URL") + alembic_cfg.set_main_option("sqlalchemy.url", "sqlite+aiosqlite:///packets.db") + +# Generate blank migration +try: + print("Creating blank migration for manual editing...") + command.revision(alembic_cfg, autogenerate=False, message="Manual migration") + print("āœ“ Successfully created blank migration!") + print("\nNow edit the generated file in alembic/versions/") + print("Add your custom upgrade() and downgrade() logic") +except Exception as e: + print(f"āœ— Error creating migration: {e}") + import traceback + + traceback.print_exc() + sys.exit(1) diff --git a/create_migration.py b/create_migration.py new file mode 100755 index 0000000..a242655 --- /dev/null +++ b/create_migration.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +Helper script to create Alembic migrations from SQLAlchemy model changes. + +Usage: + ./env/bin/python create_migration.py + +This will: +1. Load your current models from meshview/models.py +2. Compare them to the current database schema +3. Auto-generate a migration with the detected changes +4. Save the migration to alembic/versions/ + +After running this, review the generated migration file before committing! +""" + +import os +import sys + +# Add current directory to path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from alembic.config import Config + +from alembic import command + +# Create Alembic config +alembic_cfg = Config("alembic.ini") + +# Set database URL from meshview config +try: + from meshview.config import CONFIG + + database_url = CONFIG["database"]["connection_string"] + alembic_cfg.set_main_option("sqlalchemy.url", database_url) + print(f"Using database URL from config: {database_url}") +except Exception as e: + print(f"Warning: Could not load meshview config: {e}") + print("Using default database URL") + alembic_cfg.set_main_option("sqlalchemy.url", "sqlite+aiosqlite:///packets.db") + +# Generate migration +try: + print("\nComparing models to current database schema...") + print("Generating migration...\n") + command.revision(alembic_cfg, autogenerate=True, message="Auto-generated migration") + print("\nāœ“ Successfully created migration!") + print("\nNext steps:") + print("1. Review the generated file in alembic/versions/") + print("2. Edit the migration message/logic if needed") + print("3. Test the migration: ./env/bin/alembic upgrade head") + print("4. Commit the migration file to version control") +except Exception as e: + print(f"\nāœ— Error creating migration: {e}") + import traceback + + traceback.print_exc() + sys.exit(1) diff --git a/meshview/migrations.py b/meshview/migrations.py new file mode 100644 index 0000000..e6e3bc4 --- /dev/null +++ b/meshview/migrations.py @@ -0,0 +1,237 @@ +""" +Database migration management for MeshView. + +This module provides utilities for: +- Running Alembic migrations programmatically +- Checking database schema versions +- Coordinating migrations between writer and reader apps +""" + +import asyncio +import logging +from pathlib import Path + +from alembic.config import Config +from alembic.runtime.migration import MigrationContext +from alembic.script import ScriptDirectory +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncEngine + +from alembic import command + +logger = logging.getLogger(__name__) + + +def get_alembic_config(database_url: str) -> Config: + """ + Get Alembic configuration with the database URL set. + + Args: + database_url: SQLAlchemy database connection string + + Returns: + Configured Alembic Config object + """ + # Get the alembic.ini path (in project root) + alembic_ini = Path(__file__).parent.parent / "alembic.ini" + + config = Config(str(alembic_ini)) + config.set_main_option("sqlalchemy.url", database_url) + + return config + + +async def get_current_revision(engine: AsyncEngine) -> str | None: + """ + Get the current database schema revision. + + Args: + engine: Async SQLAlchemy engine + + Returns: + Current revision string, or None if no migrations applied + """ + async with engine.connect() as connection: + + def _get_revision(conn): + context = MigrationContext.configure(conn) + return context.get_current_revision() + + revision = await connection.run_sync(_get_revision) + return revision + + +async def get_head_revision(database_url: str) -> str | None: + """ + Get the head (latest) revision from migration scripts. + + Args: + database_url: Database connection string + + Returns: + Head revision string, or None if no migrations exist + """ + config = get_alembic_config(database_url) + script_dir = ScriptDirectory.from_config(config) + + head = script_dir.get_current_head() + return head + + +async def is_database_up_to_date(engine: AsyncEngine, database_url: str) -> bool: + """ + Check if database is at the latest schema version. + + Args: + engine: Async SQLAlchemy engine + database_url: Database connection string + + Returns: + True if database is up to date, False otherwise + """ + current = await get_current_revision(engine) + head = await get_head_revision(database_url) + + # If there are no migrations yet, consider it up to date + if head is None: + return True + + return current == head + + +def run_migrations(database_url: str) -> None: + """ + Run all pending migrations to bring database up to date. + + This is a synchronous operation that runs Alembic migrations. + Should be called by the writer app on startup. + + Args: + database_url: Database connection string + """ + logger.info("Running database migrations...") + + config = get_alembic_config(database_url) + + try: + # Run migrations to head + command.upgrade(config, "head") + logger.info("Database migrations completed successfully") + except Exception as e: + logger.error(f"Error running migrations: {e}") + raise + + +async def wait_for_migrations( + engine: AsyncEngine, database_url: str, max_retries: int = 30, retry_delay: int = 2 +) -> bool: + """ + Wait for database migrations to complete. + + This should be called by the reader app to wait until + the database schema is up to date before proceeding. + + Args: + engine: Async SQLAlchemy engine + database_url: Database connection string + max_retries: Maximum number of retry attempts + retry_delay: Seconds to wait between retries + + Returns: + True if database is up to date, False if max retries exceeded + """ + for attempt in range(max_retries): + try: + if await is_database_up_to_date(engine, database_url): + logger.info("Database schema is up to date") + return True + + current = await get_current_revision(engine) + head = await get_head_revision(database_url) + + logger.info( + f"Database schema not up to date (current: {current}, head: {head}). " + f"Waiting... (attempt {attempt + 1}/{max_retries})" + ) + + await asyncio.sleep(retry_delay) + + except Exception as e: + logger.warning( + f"Error checking database version (attempt {attempt + 1}/{max_retries}): {e}" + ) + await asyncio.sleep(retry_delay) + + logger.error(f"Database schema not up to date after {max_retries} attempts") + return False + + +async def create_migration_status_table(engine: AsyncEngine) -> None: + """ + Create a simple status table for migration coordination. + + This table can be used to signal when migrations are in progress. + + Args: + engine: Async SQLAlchemy engine + """ + async with engine.begin() as conn: + await conn.execute( + text(""" + CREATE TABLE IF NOT EXISTS migration_status ( + id INTEGER PRIMARY KEY CHECK (id = 1), + in_progress BOOLEAN NOT NULL DEFAULT 0, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + ) + + # Insert initial row if not exists + await conn.execute( + text(""" + INSERT OR IGNORE INTO migration_status (id, in_progress) + VALUES (1, 0) + """) + ) + + +async def set_migration_in_progress(engine: AsyncEngine, in_progress: bool) -> None: + """ + Set the migration in-progress flag. + + Args: + engine: Async SQLAlchemy engine + in_progress: True if migration is in progress, False otherwise + """ + async with engine.begin() as conn: + await conn.execute( + text(""" + UPDATE migration_status + SET in_progress = :in_progress, + updated_at = CURRENT_TIMESTAMP + WHERE id = 1 + """), + {"in_progress": in_progress}, + ) + + +async def is_migration_in_progress(engine: AsyncEngine) -> bool: + """ + Check if a migration is currently in progress. + + Args: + engine: Async SQLAlchemy engine + + Returns: + True if migration is in progress, False otherwise + """ + try: + async with engine.connect() as conn: + result = await conn.execute( + text("SELECT in_progress FROM migration_status WHERE id = 1") + ) + row = result.fetchone() + return bool(row[0]) if row else False + except Exception: + # If table doesn't exist or query fails, assume no migration in progress + return False diff --git a/meshview/web.py b/meshview/web.py index ea145f5..bc17051 100644 --- a/meshview/web.py +++ b/meshview/web.py @@ -20,7 +20,7 @@ from markupsafe import Markup from pandas import DataFrame from meshtastic.protobuf.portnums_pb2 import PortNum -from meshview import config, database, decode_payload, models, store +from meshview import config, database, decode_payload, migrations, models, store logging.basicConfig( level=logging.INFO, @@ -1768,6 +1768,21 @@ async def serve_page(request): async def run_server(): + # Wait for database migrations to complete before starting web server + logger.info("Checking database schema status...") + database_url = CONFIG["database"]["connection_string"] + + # Wait for migrations to complete (writer app responsibility) + migration_ready = await migrations.wait_for_migrations( + database.engine, database_url, max_retries=30, retry_delay=2 + ) + + if not migration_ready: + logger.error("Database schema is not up to date. Cannot start web server.") + raise RuntimeError("Database schema version mismatch - migrations not complete") + + logger.info("Database schema verified - starting web server") + app = web.Application() app.add_routes(routes) diff --git a/requirements.txt b/requirements.txt index 4ce4803..0654f6a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,7 @@ aiosqlite~=0.21.0 # Database + ORM sqlalchemy[asyncio]~=2.0.38 +alembic~=1.14.0 # Serialization / security protobuf~=5.29.3 @@ -41,4 +42,4 @@ pillow~=11.1.0 # Debugging / profiling psutil~=7.0.0 -objgraph~=3.6.2 +objgraph~=3.6.2 \ No newline at end of file diff --git a/startdb.py b/startdb.py index 06ecd25..01e3e63 100644 --- a/startdb.py +++ b/startdb.py @@ -5,7 +5,7 @@ import logging from sqlalchemy import delete -from meshview import models, mqtt_database, mqtt_reader, mqtt_store +from meshview import migrations, models, mqtt_database, mqtt_reader, mqtt_store from meshview.config import CONFIG # ------------------------- @@ -134,9 +134,32 @@ async def load_database_from_mqtt( # Main function # ------------------------- async def main(): + logger = logging.getLogger(__name__) + # Initialize database - mqtt_database.init_database(CONFIG["database"]["connection_string"]) - await mqtt_database.create_tables() + database_url = CONFIG["database"]["connection_string"] + mqtt_database.init_database(database_url) + + # Create migration status table + await migrations.create_migration_status_table(mqtt_database.engine) + + # Set migration in progress flag + await migrations.set_migration_in_progress(mqtt_database.engine, True) + logger.info("Migration status set to 'in progress'") + + try: + # Run any pending migrations (synchronous operation) + logger.info("Checking for pending database migrations...") + migrations.run_migrations(database_url) + logger.info("Database migrations check complete") + + # Create tables if needed (for backwards compatibility) + await mqtt_database.create_tables() + + finally: + # Clear migration in progress flag + await migrations.set_migration_in_progress(mqtt_database.engine, False) + logger.info("Migration status cleared - database ready") mqtt_user = CONFIG["mqtt"].get("username") or None mqtt_passwd = CONFIG["mqtt"].get("password") or None @@ -148,6 +171,7 @@ async def main(): cleanup_hour = get_int(CONFIG, "cleanup", "hour", 2) cleanup_minute = get_int(CONFIG, "cleanup", "minute", 0) + logger.info("Starting MQTT ingestion and cleanup tasks...") async with asyncio.TaskGroup() as tg: tg.create_task( load_database_from_mqtt(