Add alembic DB schema management (#86)

* Use alembic
* add creation helper
* example migration tool
Joel Krauska
2025-11-03 12:53:34 -08:00
committed by GitHub
parent 60ae77772d
commit 8ec44ad552
14 changed files with 1076 additions and 5 deletions

3
.gitignore vendored

@@ -1,6 +1,7 @@
 env/*
 __pycache__/*
 meshview/__pycache__/*
+alembic/__pycache__/*
 meshtastic/protobuf/*
 packets.db
 meshview-db.pid
@@ -9,3 +10,5 @@ meshview-web.pid
 config.ini
 screenshots/*
 python/nanopb
+__pycache__/
+*.pyc

361
ALEMBIC_SETUP.md Normal file

@@ -0,0 +1,361 @@
# Alembic Database Migration Setup
This document describes the automatic database migration system implemented for MeshView using Alembic.
## Overview
The system provides automatic database schema migrations with coordination between the writer app (startdb.py) and reader app (web.py):
- **Writer App**: Automatically runs pending migrations on startup
- **Reader App**: Waits for migrations to complete before starting
## Architecture
### Key Components
1. **`meshview/migrations.py`** - Migration management utilities
- `run_migrations()` - Runs pending migrations (writer app)
- `wait_for_migrations()` - Waits for schema to be current (reader app)
- `is_database_up_to_date()` - Checks schema version
- Migration status tracking table
2. **`alembic/`** - Alembic migration directory
- `env.py` - Configured for async SQLAlchemy support
- `versions/` - Migration scripts directory
- `alembic.ini` - Alembic configuration
3. **Modified Apps**:
- `startdb.py` - Writer app that runs migrations before MQTT ingestion
- `meshview/web.py` - Reader app that waits for schema updates
## How It Works - Automatic In-Place Updates
### ✨ Fully Automatic Operation
**No manual migration commands needed!** The database schema updates automatically when you:
1. Deploy new code with migration files
2. Restart the applications
### Writer App (startdb.py) Startup Sequence
1. Initialize database connection
2. Create migration status tracking table
3. Set "migration in progress" flag
4. **🔄 Automatically run any pending Alembic migrations** (synchronously)
- Detects current schema version
- Compares to latest available migration
- Runs all pending migrations in sequence
- Updates database schema in place
5. Clear "migration in progress" flag
6. Start MQTT ingestion and other tasks
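The sequence above can be sketched as a small coroutine. This is a simplified illustration, not the actual `startdb.py` code; `set_flag`, `run_migrations`, and `start_ingest` are hypothetical stand-ins for the helpers in `meshview/migrations.py` and the MQTT startup:

```python
import asyncio

async def writer_startup(set_flag, run_migrations, start_ingest):
    """Sketch of the writer startup sequence described above."""
    await set_flag(True)           # signal "migration in progress"
    try:
        run_migrations()           # synchronous Alembic upgrade to head
    finally:
        await set_flag(False)      # always clear the flag, even on failure
    await start_ingest()           # only then begin MQTT ingestion
```

The `try`/`finally` matters: a failed migration must still clear the in-progress flag so readers don't wait forever on a stale signal.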
### Reader App (web.py) Startup Sequence
1. Initialize database connection
2. **Check database schema version**
3. If not up to date:
- Wait up to 60 seconds (30 retries × 2 seconds)
- Check every 2 seconds for schema updates
- Automatically proceeds once writer completes migrations
4. Once schema is current, start web server
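A minimal sketch of that polling loop, with a generic `is_current` callable standing in for the real schema-version query in `meshview/migrations.py`:

```python
import asyncio

async def wait_for_schema(is_current, max_retries=30, retry_delay=2.0):
    """Poll until the schema check passes, or give up after max_retries."""
    for _ in range(max_retries):
        if await is_current():
            return True                    # schema is at head; safe to start
        await asyncio.sleep(retry_delay)   # writer may still be migrating
    return False                           # caller should refuse to start
```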
### 🎯 Key Point: Zero Manual Steps
When you deploy new code with migrations:
```bash
# Just start the apps - migrations happen automatically!
./env/bin/python startdb.py # Migrations run here automatically
./env/bin/python main.py # Waits for migrations, then starts
```
**The database updates itself!** No need to run `alembic upgrade` manually.
### Coordination
The apps coordinate using:
- **Alembic version table** (`alembic_version`) - Tracks current schema version
- **Migration status table** (`migration_status`) - Optional flag for "in progress" state
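The `migration_status` table is plain SQL. Here is the flag lifecycle against an in-memory SQLite database, using the same DDL as `meshview/migrations.py` (minus the async engine wrapper):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS migration_status (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        in_progress BOOLEAN NOT NULL DEFAULT 0,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")
# Seed the single coordination row, then toggle the flag like the writer does
conn.execute("INSERT OR IGNORE INTO migration_status (id, in_progress) VALUES (1, 0)")
conn.execute("UPDATE migration_status SET in_progress = 1 WHERE id = 1")
in_progress = conn.execute(
    "SELECT in_progress FROM migration_status WHERE id = 1"
).fetchone()[0]
```

The `CHECK (id = 1)` constraint guarantees the table can only ever hold one row, so both apps always read the same flag.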
## Creating New Migrations
### Using the helper script:
```bash
./env/bin/python create_migration.py
```
### Manual creation:
```bash
./env/bin/alembic revision --autogenerate -m "Description of changes"
```
This will:
1. Compare current database schema with SQLAlchemy models
2. Generate a migration script in `alembic/versions/`
3. Automatically detect most schema changes
### Manual migration (advanced):
```bash
./env/bin/alembic revision -m "Manual migration"
```
Then edit the generated file to add custom migration logic.
## Running Migrations
### Automatic (Recommended)
Migrations run automatically when the writer app starts:
```bash
./env/bin/python startdb.py
```
### Manual
To run migrations manually:
```bash
./env/bin/alembic upgrade head
```
To downgrade:
```bash
./env/bin/alembic downgrade -1 # Go back one version
./env/bin/alembic downgrade base # Go back to beginning
```
## Checking Migration Status
Check current database version:
```bash
./env/bin/alembic current
```
View migration history:
```bash
./env/bin/alembic history
```
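`alembic current` ultimately reads the single-row `alembic_version` table that Alembic maintains in the database. You can inspect it directly; a sketch against an in-memory stand-in for `packets.db` (Alembic creates and updates this table itself, it is shown here only for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for packets.db
# Alembic maintains this table; mimicked here to show what "current" reads
conn.execute("CREATE TABLE alembic_version (version_num VARCHAR(32) NOT NULL PRIMARY KEY)")
conn.execute("INSERT INTO alembic_version VALUES ('c88468b7ab0b')")
current = conn.execute("SELECT version_num FROM alembic_version").fetchone()[0]
```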
## Benefits
1. **Zero Manual Intervention**: Migrations run automatically on startup
2. **Safe Coordination**: Reader won't connect to incompatible schema
3. **Version Control**: All schema changes tracked in git
4. **Rollback Capability**: Can downgrade if needed
5. **Auto-generation**: Most migrations created automatically from model changes
## Migration Workflow
### Development Process
1. **Modify SQLAlchemy models** in `meshview/models.py`
2. **Create migration**:
```bash
./env/bin/python create_migration.py
```
3. **Review generated migration** in `alembic/versions/`
4. **Test migration**:
- Stop all apps
- Start writer app (migrations run automatically)
- Start reader app (waits for schema to be current)
5. **Commit migration** to version control
### Production Deployment
1. **Deploy new code** with migration scripts
2. **Start writer app** - Migrations run automatically
3. **Start reader app** - Waits for migrations, then starts
4. **Monitor logs** for migration success
## Troubleshooting
### Migration fails
Check logs in writer app for error details. To manually fix:
```bash
./env/bin/alembic current # Check current version
./env/bin/alembic history # View available versions
./env/bin/alembic upgrade head # Try manual upgrade
```
### Reader app won't start (timeout)
Check if writer app is running and has completed migrations:
```bash
./env/bin/alembic current
```
### Reset to clean state
⚠️ **Warning: This will lose all data**
```bash
rm packets.db # Or your database file
./env/bin/alembic upgrade head # Create fresh schema
```
## File Structure
```
meshview/
├── alembic.ini # Alembic configuration
├── alembic/
│ ├── env.py # Async-enabled migration runner
│ ├── script.py.mako # Migration template
│ └── versions/ # Migration scripts
│ └── c88468b7ab0b_initial_migration.py
├── meshview/
│ ├── models.py # SQLAlchemy models (source of truth)
│ ├── migrations.py # Migration utilities
│ ├── mqtt_database.py # Writer database connection
│ └── database.py # Reader database connection
├── startdb.py # Writer app (runs migrations)
├── main.py # Entry point for reader app
└── create_migration.py # Helper script for creating migrations
```
## Configuration
Database URL is read from `config.ini`:
```ini
[database]
connection_string = sqlite+aiosqlite:///packets.db
```
Alembic automatically uses this configuration through `meshview/migrations.py`.
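For reference, reading that value with the standard-library `configparser` looks like this (the ini content is inlined here for illustration; `meshview.config` presumably does the equivalent against the real `config.ini`):

```python
import configparser

# Parse the same [database] section shown above (inlined for illustration)
cfg = configparser.ConfigParser()
cfg.read_string("""
[database]
connection_string = sqlite+aiosqlite:///packets.db
""")
database_url = cfg["database"]["connection_string"]
```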
## Important Notes
1. **Always test migrations** in development before deploying to production
2. **Backup database** before running migrations in production
3. **Check for data loss** - Some migrations may require data migration logic
4. **Coordinate deployments** - Start writer before readers in multi-instance setups
5. **Monitor logs** during first startup after deployment
## Example Migrations
### Example 1: Generated Initial Migration
Here's what an auto-generated migration looks like (from comparing models to database):
```python
"""Initial migration

Revision ID: c88468b7ab0b
Revises:
Create Date: 2025-10-26 20:56:50.285200

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers
revision = 'c88468b7ab0b'
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
    # Upgrade operations
    op.create_table('node',
        sa.Column('id', sa.String(), nullable=False),
        sa.Column('node_id', sa.BigInteger(), nullable=True),
        # ... more columns
        sa.PrimaryKeyConstraint('id')
    )


def downgrade() -> None:
    # Downgrade operations
    op.drop_table('node')
```
### Example 2: Manual Migration Adding a New Table
We've included an example migration (`1717fa5c6545_add_example_table.py`) that demonstrates how to manually create a new table:
```python
"""Add example table

Revision ID: 1717fa5c6545
Revises: c88468b7ab0b
Create Date: 2025-10-26 20:59:04.347066

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


def upgrade() -> None:
    """Create example table with sample columns."""
    op.create_table(
        'example',
        sa.Column('id', sa.Integer(), nullable=False, primary_key=True, autoincrement=True),
        sa.Column('name', sa.String(length=100), nullable=False),
        sa.Column('description', sa.Text(), nullable=True),
        sa.Column('value', sa.Float(), nullable=True),
        sa.Column('is_active', sa.Boolean(), nullable=False, server_default='1'),
        sa.Column('created_at', sa.DateTime(), nullable=False,
                  server_default=sa.text('CURRENT_TIMESTAMP')),
        sa.Column('updated_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
    # Create an index on the name column for faster lookups
    op.create_index('idx_example_name', 'example', ['name'])


def downgrade() -> None:
    """Remove example table."""
    op.drop_index('idx_example_name', table_name='example')
    op.drop_table('example')
```
**Key features demonstrated:**
- Various column types (Integer, String, Text, Float, Boolean, DateTime)
- Primary key with autoincrement
- Nullable and non-nullable columns
- Server defaults (for timestamps and booleans)
- Creating indexes
- Proper downgrade that reverses all changes
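On SQLite, the `upgrade()` above produces DDL roughly equivalent to the following (approximate; run the migration and inspect `sqlite_master` to see the exact statement Alembic emits). Inserting a row with only `name` shows the server defaults in action:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Approximate SQLite rendering of the migration's create_table()
conn.execute("""
    CREATE TABLE example (
        id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
        name VARCHAR(100) NOT NULL,
        description TEXT,
        value FLOAT,
        is_active BOOLEAN NOT NULL DEFAULT '1',
        created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
        updated_at DATETIME
    )
""")
conn.execute("CREATE INDEX idx_example_name ON example (name)")
# Server defaults fill in the columns the INSERT omits
conn.execute("INSERT INTO example (name) VALUES ('demo')")
row = conn.execute("SELECT is_active, created_at FROM example").fetchone()
```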
**To test this migration:**
```bash
# Apply the migration
./env/bin/alembic upgrade head
# Check it was applied
./env/bin/alembic current
# Verify table was created
sqlite3 packets.db "SELECT sql FROM sqlite_master WHERE type='table' AND name='example';"
# Roll back the migration
./env/bin/alembic downgrade -1
# Verify table was removed
sqlite3 packets.db "SELECT name FROM sqlite_master WHERE type='table' AND name='example';"
```
**To remove this example migration** (after testing):
```bash
# First make sure you're not on this revision
./env/bin/alembic downgrade c88468b7ab0b
# Then delete the migration file
rm alembic/versions/1717fa5c6545_add_example_table.py
```
## References
- [Alembic Documentation](https://alembic.sqlalchemy.org/)
- [SQLAlchemy Documentation](https://docs.sqlalchemy.org/)
- [Async SQLAlchemy](https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html)

120
alembic.ini Normal file

@@ -0,0 +1,120 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
# version_path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
version_path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# sqlalchemy.url will be set programmatically from meshview config
# sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
alembic/README Normal file

@@ -0,0 +1 @@
Generic single-database configuration.

93
alembic/env.py Normal file

@@ -0,0 +1,93 @@
import asyncio
from logging.config import fileConfig

from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config

from alembic import context

# Import models metadata for autogenerate support
from meshview.models import Base

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Add your model's MetaData object here for 'autogenerate' support
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """Run migrations with the given connection."""
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """Run migrations in async mode."""
    # Get configuration section
    configuration = config.get_section(config.config_ini_section, {})

    # If sqlalchemy.url is not set in alembic.ini, try to get it from meshview config
    if "sqlalchemy.url" not in configuration:
        try:
            from meshview.config import CONFIG
            configuration["sqlalchemy.url"] = CONFIG["database"]["connection_string"]
        except Exception:
            # Fallback to a default for initial migration creation
            configuration["sqlalchemy.url"] = "sqlite+aiosqlite:///packets.db"

    connectable = async_engine_from_config(
        configuration,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode with async support."""
    asyncio.run(run_async_migrations())


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

26
alembic/script.py.mako Normal file

@@ -0,0 +1,26 @@
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}


def upgrade() -> None:
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    ${downgrades if downgrades else "pass"}


@@ -0,0 +1,45 @@
"""Add example table

Revision ID: 1717fa5c6545
Revises: c88468b7ab0b
Create Date: 2025-10-26 20:59:04.347066

"""
from collections.abc import Sequence

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = '1717fa5c6545'
down_revision: str | None = 'c88468b7ab0b'
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    """Create example table with sample columns."""
    op.create_table(
        'example',
        sa.Column('id', sa.Integer(), nullable=False, primary_key=True, autoincrement=True),
        sa.Column('name', sa.String(length=100), nullable=False),
        sa.Column('description', sa.Text(), nullable=True),
        sa.Column('value', sa.Float(), nullable=True),
        sa.Column('is_active', sa.Boolean(), nullable=False, server_default='1'),
        sa.Column(
            'created_at', sa.DateTime(), nullable=False, server_default=sa.text('CURRENT_TIMESTAMP')
        ),
        sa.Column('updated_at', sa.DateTime(), nullable=True),
        sa.PrimaryKeyConstraint('id'),
    )
    # Create an index on the name column for faster lookups
    op.create_index('idx_example_name', 'example', ['name'])


def downgrade() -> None:
    """Remove example table."""
    op.drop_index('idx_example_name', table_name='example')
    op.drop_table('example')


@@ -0,0 +1,35 @@
"""Initial migration

Revision ID: c88468b7ab0b
Revises:
Create Date: 2025-10-26 20:56:50.285200

"""
from collections.abc import Sequence

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
revision: str = 'c88468b7ab0b'
down_revision: str | None = None
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index('idx_packet_import_time', table_name='packet')
    op.create_index('idx_packet_import_time', 'packet', [sa.text('import_time DESC')], unique=False)
    op.create_index('idx_packet_seen_packet_id', 'packet_seen', ['packet_id'], unique=False)
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index('idx_packet_seen_packet_id', table_name='packet_seen')
    op.drop_index('idx_packet_import_time', table_name='packet')
    op.create_index('idx_packet_import_time', 'packet', ['import_time'], unique=False)
    # ### end Alembic commands ###

52
create_example_migration.py Executable file

@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""
Script to create a blank migration for manual editing.

Usage:
    ./env/bin/python create_example_migration.py

This creates an empty migration file that you can manually edit to add
custom migration logic (data migrations, complex schema changes, etc.)

Unlike create_migration.py which auto-generates from model changes,
this creates a blank template for you to fill in.
"""
import os
import sys

# Add current directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from alembic.config import Config
from alembic import command

# Create Alembic config
alembic_cfg = Config("alembic.ini")

# Set database URL from meshview config
try:
    from meshview.config import CONFIG
    database_url = CONFIG["database"]["connection_string"]
    alembic_cfg.set_main_option("sqlalchemy.url", database_url)
    print(f"Using database URL from config: {database_url}")
except Exception as e:
    print(f"Warning: Could not load meshview config: {e}")
    print("Using default database URL")
    alembic_cfg.set_main_option("sqlalchemy.url", "sqlite+aiosqlite:///packets.db")

# Generate blank migration
try:
    print("Creating blank migration for manual editing...")
    command.revision(alembic_cfg, autogenerate=False, message="Manual migration")
    print("✓ Successfully created blank migration!")
    print("\nNow edit the generated file in alembic/versions/")
    print("Add your custom upgrade() and downgrade() logic")
except Exception as e:
    print(f"✗ Error creating migration: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)

58
create_migration.py Executable file

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Helper script to create Alembic migrations from SQLAlchemy model changes.

Usage:
    ./env/bin/python create_migration.py

This will:
1. Load your current models from meshview/models.py
2. Compare them to the current database schema
3. Auto-generate a migration with the detected changes
4. Save the migration to alembic/versions/

After running this, review the generated migration file before committing!
"""
import os
import sys

# Add current directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from alembic.config import Config
from alembic import command

# Create Alembic config
alembic_cfg = Config("alembic.ini")

# Set database URL from meshview config
try:
    from meshview.config import CONFIG
    database_url = CONFIG["database"]["connection_string"]
    alembic_cfg.set_main_option("sqlalchemy.url", database_url)
    print(f"Using database URL from config: {database_url}")
except Exception as e:
    print(f"Warning: Could not load meshview config: {e}")
    print("Using default database URL")
    alembic_cfg.set_main_option("sqlalchemy.url", "sqlite+aiosqlite:///packets.db")

# Generate migration
try:
    print("\nComparing models to current database schema...")
    print("Generating migration...\n")
    command.revision(alembic_cfg, autogenerate=True, message="Auto-generated migration")
    print("\n✓ Successfully created migration!")
    print("\nNext steps:")
    print("1. Review the generated file in alembic/versions/")
    print("2. Edit the migration message/logic if needed")
    print("3. Test the migration: ./env/bin/alembic upgrade head")
    print("4. Commit the migration file to version control")
except Exception as e:
    print(f"\n✗ Error creating migration: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)

237
meshview/migrations.py Normal file

@@ -0,0 +1,237 @@
"""
Database migration management for MeshView.

This module provides utilities for:
- Running Alembic migrations programmatically
- Checking database schema versions
- Coordinating migrations between writer and reader apps
"""
import asyncio
import logging
from pathlib import Path

from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine

from alembic import command

logger = logging.getLogger(__name__)


def get_alembic_config(database_url: str) -> Config:
    """
    Get Alembic configuration with the database URL set.

    Args:
        database_url: SQLAlchemy database connection string

    Returns:
        Configured Alembic Config object
    """
    # Get the alembic.ini path (in project root)
    alembic_ini = Path(__file__).parent.parent / "alembic.ini"
    config = Config(str(alembic_ini))
    config.set_main_option("sqlalchemy.url", database_url)
    return config


async def get_current_revision(engine: AsyncEngine) -> str | None:
    """
    Get the current database schema revision.

    Args:
        engine: Async SQLAlchemy engine

    Returns:
        Current revision string, or None if no migrations applied
    """
    async with engine.connect() as connection:
        def _get_revision(conn):
            context = MigrationContext.configure(conn)
            return context.get_current_revision()

        revision = await connection.run_sync(_get_revision)
        return revision


async def get_head_revision(database_url: str) -> str | None:
    """
    Get the head (latest) revision from migration scripts.

    Args:
        database_url: Database connection string

    Returns:
        Head revision string, or None if no migrations exist
    """
    config = get_alembic_config(database_url)
    script_dir = ScriptDirectory.from_config(config)
    head = script_dir.get_current_head()
    return head


async def is_database_up_to_date(engine: AsyncEngine, database_url: str) -> bool:
    """
    Check if database is at the latest schema version.

    Args:
        engine: Async SQLAlchemy engine
        database_url: Database connection string

    Returns:
        True if database is up to date, False otherwise
    """
    current = await get_current_revision(engine)
    head = await get_head_revision(database_url)

    # If there are no migrations yet, consider it up to date
    if head is None:
        return True

    return current == head


def run_migrations(database_url: str) -> None:
    """
    Run all pending migrations to bring database up to date.

    This is a synchronous operation that runs Alembic migrations.
    Should be called by the writer app on startup.

    Args:
        database_url: Database connection string
    """
    logger.info("Running database migrations...")
    config = get_alembic_config(database_url)

    try:
        # Run migrations to head
        command.upgrade(config, "head")
        logger.info("Database migrations completed successfully")
    except Exception as e:
        logger.error(f"Error running migrations: {e}")
        raise


async def wait_for_migrations(
    engine: AsyncEngine, database_url: str, max_retries: int = 30, retry_delay: int = 2
) -> bool:
    """
    Wait for database migrations to complete.

    This should be called by the reader app to wait until
    the database schema is up to date before proceeding.

    Args:
        engine: Async SQLAlchemy engine
        database_url: Database connection string
        max_retries: Maximum number of retry attempts
        retry_delay: Seconds to wait between retries

    Returns:
        True if database is up to date, False if max retries exceeded
    """
    for attempt in range(max_retries):
        try:
            if await is_database_up_to_date(engine, database_url):
                logger.info("Database schema is up to date")
                return True

            current = await get_current_revision(engine)
            head = await get_head_revision(database_url)
            logger.info(
                f"Database schema not up to date (current: {current}, head: {head}). "
                f"Waiting... (attempt {attempt + 1}/{max_retries})"
            )
            await asyncio.sleep(retry_delay)
        except Exception as e:
            logger.warning(
                f"Error checking database version (attempt {attempt + 1}/{max_retries}): {e}"
            )
            await asyncio.sleep(retry_delay)

    logger.error(f"Database schema not up to date after {max_retries} attempts")
    return False


async def create_migration_status_table(engine: AsyncEngine) -> None:
    """
    Create a simple status table for migration coordination.

    This table can be used to signal when migrations are in progress.

    Args:
        engine: Async SQLAlchemy engine
    """
    async with engine.begin() as conn:
        await conn.execute(
            text("""
                CREATE TABLE IF NOT EXISTS migration_status (
                    id INTEGER PRIMARY KEY CHECK (id = 1),
                    in_progress BOOLEAN NOT NULL DEFAULT 0,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
        )
        # Insert initial row if not exists
        await conn.execute(
            text("""
                INSERT OR IGNORE INTO migration_status (id, in_progress)
                VALUES (1, 0)
            """)
        )


async def set_migration_in_progress(engine: AsyncEngine, in_progress: bool) -> None:
    """
    Set the migration in-progress flag.

    Args:
        engine: Async SQLAlchemy engine
        in_progress: True if migration is in progress, False otherwise
    """
    async with engine.begin() as conn:
        await conn.execute(
            text("""
                UPDATE migration_status
                SET in_progress = :in_progress,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = 1
            """),
            {"in_progress": in_progress},
        )


async def is_migration_in_progress(engine: AsyncEngine) -> bool:
    """
    Check if a migration is currently in progress.

    Args:
        engine: Async SQLAlchemy engine

    Returns:
        True if migration is in progress, False otherwise
    """
    try:
        async with engine.connect() as conn:
            result = await conn.execute(
                text("SELECT in_progress FROM migration_status WHERE id = 1")
            )
            row = result.fetchone()
            return bool(row[0]) if row else False
    except Exception:
        # If table doesn't exist or query fails, assume no migration in progress
        return False


@@ -20,7 +20,7 @@ from markupsafe import Markup
 from pandas import DataFrame
 from meshtastic.protobuf.portnums_pb2 import PortNum
-from meshview import config, database, decode_payload, models, store
+from meshview import config, database, decode_payload, migrations, models, store
 
 logging.basicConfig(
     level=logging.INFO,
@@ -1768,6 +1768,21 @@ async def serve_page(request):
 async def run_server():
+    # Wait for database migrations to complete before starting web server
+    logger.info("Checking database schema status...")
+    database_url = CONFIG["database"]["connection_string"]
+
+    # Wait for migrations to complete (writer app responsibility)
+    migration_ready = await migrations.wait_for_migrations(
+        database.engine, database_url, max_retries=30, retry_delay=2
+    )
+
+    if not migration_ready:
+        logger.error("Database schema is not up to date. Cannot start web server.")
+        raise RuntimeError("Database schema version mismatch - migrations not complete")
+
+    logger.info("Database schema verified - starting web server")
+
     app = web.Application()
     app.add_routes(routes)


@@ -12,6 +12,7 @@ aiosqlite~=0.21.0
 
 # Database + ORM
 sqlalchemy[asyncio]~=2.0.38
+alembic~=1.14.0
 
 # Serialization / security
 protobuf~=5.29.3
@@ -41,4 +42,4 @@ pillow~=11.1.0
 
 # Debugging / profiling
 psutil~=7.0.0
-objgraph~=3.6.2
+objgraph~=3.6.2


@@ -5,7 +5,7 @@ import logging
 from sqlalchemy import delete
 
-from meshview import models, mqtt_database, mqtt_reader, mqtt_store
+from meshview import migrations, models, mqtt_database, mqtt_reader, mqtt_store
 from meshview.config import CONFIG
 
 # -------------------------
@@ -134,9 +134,32 @@
 # Main function
 # -------------------------
 async def main():
+    logger = logging.getLogger(__name__)
+
     # Initialize database
-    mqtt_database.init_database(CONFIG["database"]["connection_string"])
-    await mqtt_database.create_tables()
+    database_url = CONFIG["database"]["connection_string"]
+    mqtt_database.init_database(database_url)
+
+    # Create migration status table
+    await migrations.create_migration_status_table(mqtt_database.engine)
+
+    # Set migration in progress flag
+    await migrations.set_migration_in_progress(mqtt_database.engine, True)
+    logger.info("Migration status set to 'in progress'")
+
+    try:
+        # Run any pending migrations (synchronous operation)
+        logger.info("Checking for pending database migrations...")
+        migrations.run_migrations(database_url)
+        logger.info("Database migrations check complete")
+
+        # Create tables if needed (for backwards compatibility)
+        await mqtt_database.create_tables()
+    finally:
+        # Clear migration in progress flag
+        await migrations.set_migration_in_progress(mqtt_database.engine, False)
+        logger.info("Migration status cleared - database ready")
 
     mqtt_user = CONFIG["mqtt"].get("username") or None
     mqtt_passwd = CONFIG["mqtt"].get("password") or None
@@ -148,6 +171,7 @@ async def main():
     cleanup_hour = get_int(CONFIG, "cleanup", "hour", 2)
     cleanup_minute = get_int(CONFIG, "cleanup", "minute", 0)
 
+    logger.info("Starting MQTT ingestion and cleanup tasks...")
     async with asyncio.TaskGroup() as tg:
         tg.create_task(
             load_database_from_mqtt(