modify alembic to support cleaner migrations

This commit is contained in:
Joel Krauska
2025-11-03 14:11:42 -08:00
parent fa28f6b63f
commit 64169787b3
5 changed files with 174 additions and 98 deletions

View File

@@ -84,7 +84,16 @@ async def run_async_migrations() -> None:
def run_migrations_online() -> None:
    """Run migrations in 'online' mode with async support.

    Works whether or not an asyncio event loop is already running in the
    calling thread: ``asyncio.run()`` raises ``RuntimeError`` when nested,
    so if a loop is detected the migrations are executed on a fresh loop
    in a worker thread instead.
    """
    try:
        # Probe only -- keep the try body minimal so a RuntimeError raised
        # by the migration work itself is not mistaken for "no event loop".
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop running: safe to create one directly.
        asyncio.run(run_async_migrations())
    else:
        # Event loop is already running; run the coroutine on its own
        # loop in a worker thread and block until it completes.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            pool.submit(lambda: asyncio.run(run_async_migrations())).result()
if context.is_offline_mode():

View File

@@ -14,7 +14,7 @@ from alembic import op
# revision identifiers, used by Alembic.
revision: str = '1717fa5c6545'
down_revision: Union[str, None] = 'c88468b7ab0b'
down_revision: Union[str, None] = 'add_time_us_cols'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

View File

@@ -0,0 +1,62 @@
"""add import_time_us columns
Revision ID: add_time_us_cols
Revises: c88468b7ab0b
Create Date: 2025-11-03 14:10:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'add_time_us_cols'
down_revision: Union[str, None] = 'c88468b7ab0b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Add an ``import_time_us`` BIGINT column plus supporting indexes to the
    packet, packet_seen, and traceroute tables.

    Written defensively: the column is added only when missing, and each
    index is created only when missing, so the revision can be applied to
    databases that were already partially patched (previously the indexes
    were only created together with the column, so a database with the
    column but no indexes could never be repaired).
    """
    conn = op.get_bind()
    inspector = sa.inspect(conn)

    def _ensure_column(table: str) -> None:
        # Add import_time_us to *table* unless a prior run already did.
        existing_columns = {col['name'] for col in inspector.get_columns(table)}
        if 'import_time_us' not in existing_columns:
            with op.batch_alter_table(table, schema=None) as batch_op:
                batch_op.add_column(sa.Column('import_time_us', sa.BigInteger(), nullable=True))

    def _ensure_index(name: str, table: str, columns) -> None:
        # Create the index only if absent, independent of the column check.
        existing_indexes = {idx['name'] for idx in inspector.get_indexes(table)}
        if name not in existing_indexes:
            op.create_index(name, table, columns, unique=False)

    # packet: DESC orderings support "latest first" queries.
    _ensure_column('packet')
    _ensure_index('idx_packet_import_time_us', 'packet', [sa.text('import_time_us DESC')])
    _ensure_index('idx_packet_from_node_time_us', 'packet',
                  ['from_node_id', sa.text('import_time_us DESC')])

    _ensure_column('packet_seen')
    _ensure_index('idx_packet_seen_import_time_us', 'packet_seen', ['import_time_us'])

    _ensure_column('traceroute')
    _ensure_index('idx_traceroute_import_time_us', 'traceroute', ['import_time_us'])
def downgrade() -> None:
    """Drop the import_time_us indexes and columns added by this revision."""
    # Undo in reverse order of upgrade(): traceroute, packet_seen, packet.
    per_table = [
        ('traceroute', ['idx_traceroute_import_time_us']),
        ('packet_seen', ['idx_packet_seen_import_time_us']),
        ('packet', ['idx_packet_from_node_time_us', 'idx_packet_import_time_us']),
    ]
    for table, index_names in per_table:
        for index_name in index_names:
            op.drop_index(index_name, table_name=table)
        with op.batch_alter_table(table, schema=None) as batch_op:
            batch_op.drop_column('import_time_us')

View File

@@ -21,15 +21,112 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create the base schema: node, packet, packet_seen and traceroute.

    Each table (and its indexes) is created only when the table is not
    already present, so the migration applies cleanly both to fresh
    databases and to databases whose schema predates Alembic.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): removed the stray leading drop_index/create_index
    # operations left over from the previous version of this migration --
    # they duplicated (unguarded) the index creation below.
    conn = op.get_bind()
    inspector = sa.inspect(conn)
    existing_tables = inspector.get_table_names()

    # Create node table if it doesn't exist.
    if 'node' not in existing_tables:
        op.create_table(
            'node',
            sa.Column('id', sa.String(), nullable=False),
            sa.Column('node_id', sa.BigInteger(), nullable=True),
            sa.Column('long_name', sa.String(), nullable=True),
            sa.Column('short_name', sa.String(), nullable=True),
            sa.Column('hw_model', sa.String(), nullable=True),
            sa.Column('firmware', sa.String(), nullable=True),
            sa.Column('role', sa.String(), nullable=True),
            sa.Column('last_lat', sa.BigInteger(), nullable=True),
            sa.Column('last_long', sa.BigInteger(), nullable=True),
            sa.Column('channel', sa.String(), nullable=True),
            sa.Column('last_update', sa.DateTime(), nullable=True),
            sa.PrimaryKeyConstraint('id'),
            sa.UniqueConstraint('node_id'),
        )
        op.create_index('idx_node_node_id', 'node', ['node_id'], unique=False)

    # Create packet table if it doesn't exist.
    if 'packet' not in existing_tables:
        op.create_table(
            'packet',
            sa.Column('id', sa.BigInteger(), nullable=False),
            sa.Column('portnum', sa.Integer(), nullable=True),
            sa.Column('from_node_id', sa.BigInteger(), nullable=True),
            sa.Column('to_node_id', sa.BigInteger(), nullable=True),
            sa.Column('payload', sa.LargeBinary(), nullable=True),
            sa.Column('import_time', sa.DateTime(), nullable=True),
            sa.Column('import_time_us', sa.BigInteger(), nullable=True),
            sa.Column('channel', sa.String(), nullable=True),
            sa.PrimaryKeyConstraint('id'),
        )
        op.create_index('idx_packet_from_node_id', 'packet', ['from_node_id'], unique=False)
        op.create_index('idx_packet_to_node_id', 'packet', ['to_node_id'], unique=False)
        # DESC orderings support "latest first" queries on import time.
        op.create_index('idx_packet_import_time', 'packet',
                        [sa.text('import_time DESC')], unique=False)
        op.create_index('idx_packet_import_time_us', 'packet',
                        [sa.text('import_time_us DESC')], unique=False)
        op.create_index('idx_packet_from_node_time', 'packet',
                        ['from_node_id', sa.text('import_time DESC')], unique=False)
        op.create_index('idx_packet_from_node_time_us', 'packet',
                        ['from_node_id', sa.text('import_time_us DESC')], unique=False)

    # Create packet_seen table if it doesn't exist.
    if 'packet_seen' not in existing_tables:
        op.create_table(
            'packet_seen',
            sa.Column('packet_id', sa.BigInteger(), nullable=False),
            sa.Column('node_id', sa.BigInteger(), nullable=False),
            sa.Column('rx_time', sa.BigInteger(), nullable=False),
            sa.Column('hop_limit', sa.Integer(), nullable=True),
            sa.Column('hop_start', sa.Integer(), nullable=True),
            sa.Column('channel', sa.String(), nullable=True),
            sa.Column('rx_snr', sa.Float(), nullable=True),
            sa.Column('rx_rssi', sa.Integer(), nullable=True),
            sa.Column('topic', sa.String(), nullable=True),
            sa.Column('import_time', sa.DateTime(), nullable=True),
            sa.Column('import_time_us', sa.BigInteger(), nullable=True),
            sa.ForeignKeyConstraint(['packet_id'], ['packet.id'], ),
            sa.PrimaryKeyConstraint('packet_id', 'node_id', 'rx_time'),
        )
        op.create_index('idx_packet_seen_node_id', 'packet_seen', ['node_id'], unique=False)
        op.create_index('idx_packet_seen_packet_id', 'packet_seen', ['packet_id'], unique=False)
        op.create_index('idx_packet_seen_import_time_us', 'packet_seen',
                        ['import_time_us'], unique=False)

    # Create traceroute table if it doesn't exist.
    if 'traceroute' not in existing_tables:
        op.create_table(
            'traceroute',
            sa.Column('id', sa.Integer(), nullable=False),
            sa.Column('packet_id', sa.BigInteger(), nullable=True),
            sa.Column('gateway_node_id', sa.BigInteger(), nullable=True),
            sa.Column('done', sa.Boolean(), nullable=True),
            sa.Column('route', sa.LargeBinary(), nullable=True),
            sa.Column('import_time', sa.DateTime(), nullable=True),
            sa.Column('import_time_us', sa.BigInteger(), nullable=True),
            sa.ForeignKeyConstraint(['packet_id'], ['packet.id'], ),
            sa.PrimaryKeyConstraint('id'),
        )
        op.create_index('idx_traceroute_import_time', 'traceroute',
                        ['import_time'], unique=False)
        op.create_index('idx_traceroute_import_time_us', 'traceroute',
                        ['import_time_us'], unique=False)
    # ### end Alembic commands ###
def downgrade() -> None:
    """Drop every table and index created by upgrade(), dependents first.

    Order matters: packet_seen and traceroute reference packet.id via
    foreign keys, so they are removed before packet.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): removed the stray op.create_index('idx_packet_import_time',
    # ...) left over from the previous version of this migration -- it
    # recreated an index on a table that is dropped immediately afterwards.
    # Drop traceroute table and indexes.
    op.drop_index('idx_traceroute_import_time_us', table_name='traceroute')
    op.drop_index('idx_traceroute_import_time', table_name='traceroute')
    op.drop_table('traceroute')
    # Drop packet_seen table and indexes.
    op.drop_index('idx_packet_seen_import_time_us', table_name='packet_seen')
    op.drop_index('idx_packet_seen_packet_id', table_name='packet_seen')
    op.drop_index('idx_packet_seen_node_id', table_name='packet_seen')
    op.drop_table('packet_seen')
    # Drop packet table and indexes.
    op.drop_index('idx_packet_from_node_time_us', table_name='packet')
    op.drop_index('idx_packet_from_node_time', table_name='packet')
    op.drop_index('idx_packet_import_time_us', table_name='packet')
    op.drop_index('idx_packet_import_time', table_name='packet')
    op.drop_index('idx_packet_to_node_id', table_name='packet')
    op.drop_index('idx_packet_from_node_id', table_name='packet')
    op.drop_table('packet')
    # Drop node table and index.
    op.drop_index('idx_node_node_id', table_name='node')
    op.drop_table('node')
    # ### end Alembic commands ###

View File

@@ -1,92 +0,0 @@
"""add_microsecond_timestamp_columns
Adds import_time_us INTEGER columns to packet, packet_seen, and traceroute tables.
This implements the changes described in GitHub issue #55:
- Adds import_time_us INTEGER columns to track microsecond-precision timestamps
- Populates new columns from existing import_time datetime values
- Creates indexes on the new columns for performance
Revision ID: fb5781f0c470
Revises: 1717fa5c6545
Create Date: 2025-11-03 12:59:03.202458
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'fb5781f0c470'
# NOTE(review): must name an existing revision in the version history;
# this migration chains after 1717fa5c6545.
down_revision: Union[str, None] = '1717fa5c6545'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Add import_time_us columns and populate them from existing data.

    Adds a nullable ``import_time_us`` column to packet, packet_seen and
    traceroute, backfills it from the ``import_time`` text, and indexes it.
    """
    # BigInteger, not Integer: microsecond epoch timestamps exceed the
    # 32-bit range (and match the column type used elsewhere in the schema).
    for table in ('packet', 'packet_seen', 'traceroute'):
        with op.batch_alter_table(table, schema=None) as batch_op:
            batch_op.add_column(sa.Column('import_time_us', sa.BigInteger(), nullable=True))

    # Backfill import_time_us from the import_time text.
    # strftime('%s', ...) yields whole epoch seconds (interpreting the
    # datetime text as UTC); substr(import_time, 21, 6) is the fractional
    # part of 'YYYY-MM-DD HH:MM:SS.ffffff' (position 21 is the first digit
    # after the dot). The fraction is right-padded to exactly six digits so
    # rows with a missing or short fractional part (e.g. '.5', or none at
    # all) still convert correctly -- naive string concatenation of seconds
    # and fraction silently produces wrong magnitudes for those rows.
    for table in ('packet', 'packet_seen', 'traceroute'):
        op.execute(f"""
            UPDATE {table}
            SET import_time_us =
                CAST(strftime('%s', import_time) AS INTEGER) * 1000000
                + CAST(substr(substr(import_time, 21, 6) || '000000', 1, 6) AS INTEGER)
            WHERE import_time IS NOT NULL
        """)

    # Create indexes on the new columns (DESC on packet for latest-first queries).
    op.create_index('idx_packet_import_time_us', 'packet', [sa.text('import_time_us DESC')])
    op.create_index('idx_packet_from_node_time_us', 'packet',
                    ['from_node_id', sa.text('import_time_us DESC')])
    op.create_index('idx_packet_seen_import_time_us', 'packet_seen', ['import_time_us'])
    op.create_index('idx_traceroute_import_time_us', 'traceroute', ['import_time_us'])
def downgrade() -> None:
    """Remove import_time_us columns and their indexes."""
    # Indexes first, then columns, reversing the order used in upgrade().
    for index_name, table in (
        ('idx_traceroute_import_time_us', 'traceroute'),
        ('idx_packet_seen_import_time_us', 'packet_seen'),
        ('idx_packet_from_node_time_us', 'packet'),
        ('idx_packet_import_time_us', 'packet'),
    ):
        op.drop_index(index_name, table_name=table)
    for table in ('traceroute', 'packet_seen', 'packet'):
        with op.batch_alter_table(table, schema=None) as batch_op:
            batch_op.drop_column('import_time_us')