From 64169787b370db50b0bec276a75002f2ac79cbae Mon Sep 17 00:00:00 2001 From: Joel Krauska Date: Mon, 3 Nov 2025 14:11:42 -0800 Subject: [PATCH] modify alembic to support cleaner migrations --- alembic/env.py | 11 +- .../1717fa5c6545_add_example_table.py | 2 +- .../versions/add_import_time_us_columns.py | 62 +++++++++++ .../c88468b7ab0b_initial_migration.py | 105 +++++++++++++++++- ...0c470_add_microsecond_timestamp_columns.py | 92 --------------- 5 files changed, 174 insertions(+), 98 deletions(-) create mode 100644 alembic/versions/add_import_time_us_columns.py delete mode 100644 alembic/versions/fb5781f0c470_add_microsecond_timestamp_columns.py diff --git a/alembic/env.py b/alembic/env.py index 757ec03..e7a505c 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -84,7 +84,16 @@ async def run_async_migrations() -> None: def run_migrations_online() -> None: """Run migrations in 'online' mode with async support.""" - asyncio.run(run_async_migrations()) + try: + # Try to get existing event loop + loop = asyncio.get_running_loop() + # Event loop is already running, schedule and run the coroutine + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as pool: + pool.submit(lambda: asyncio.run(run_async_migrations())).result() + except RuntimeError: + # No event loop running, create one + asyncio.run(run_async_migrations()) if context.is_offline_mode(): diff --git a/alembic/versions/1717fa5c6545_add_example_table.py b/alembic/versions/1717fa5c6545_add_example_table.py index 7d49c12..4418a3c 100644 --- a/alembic/versions/1717fa5c6545_add_example_table.py +++ b/alembic/versions/1717fa5c6545_add_example_table.py @@ -14,7 +14,7 @@ from alembic import op # revision identifiers, used by Alembic. revision: str = '1717fa5c6545' -down_revision: Union[str, None] = 'c88468b7ab0b' +down_revision: Union[str, None] = 'add_time_us_cols' branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None diff --git a/alembic/versions/add_import_time_us_columns.py b/alembic/versions/add_import_time_us_columns.py new file mode 100644 index 0000000..cfeb0ef --- /dev/null +++ b/alembic/versions/add_import_time_us_columns.py @@ -0,0 +1,62 @@ +"""add import_time_us columns + +Revision ID: add_time_us_cols +Revises: c88468b7ab0b +Create Date: 2025-11-03 14:10:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'add_time_us_cols' +down_revision: Union[str, None] = 'c88468b7ab0b' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Check if columns already exist, add them if they don't + conn = op.get_bind() + inspector = sa.inspect(conn) + + # Add import_time_us to packet table + packet_columns = [col['name'] for col in inspector.get_columns('packet')] + if 'import_time_us' not in packet_columns: + with op.batch_alter_table('packet', schema=None) as batch_op: + batch_op.add_column(sa.Column('import_time_us', sa.BigInteger(), nullable=True)) + op.create_index('idx_packet_import_time_us', 'packet', [sa.text('import_time_us DESC')], unique=False) + op.create_index('idx_packet_from_node_time_us', 'packet', ['from_node_id', sa.text('import_time_us DESC')], unique=False) + + # Add import_time_us to packet_seen table + packet_seen_columns = [col['name'] for col in inspector.get_columns('packet_seen')] + if 'import_time_us' not in packet_seen_columns: + with op.batch_alter_table('packet_seen', schema=None) as batch_op: + batch_op.add_column(sa.Column('import_time_us', sa.BigInteger(), nullable=True)) + op.create_index('idx_packet_seen_import_time_us', 'packet_seen', ['import_time_us'], unique=False) + + # Add import_time_us to traceroute table + traceroute_columns = [col['name'] for col in inspector.get_columns('traceroute')] + if 'import_time_us' not in traceroute_columns: + with op.batch_alter_table('traceroute', schema=None) as batch_op: + batch_op.add_column(sa.Column('import_time_us', sa.BigInteger(), nullable=True)) + op.create_index('idx_traceroute_import_time_us', 'traceroute', ['import_time_us'], unique=False) + + +def downgrade() -> None: + # Drop indexes and columns + op.drop_index('idx_traceroute_import_time_us', table_name='traceroute') + with op.batch_alter_table('traceroute', schema=None) as batch_op: + batch_op.drop_column('import_time_us') + + op.drop_index('idx_packet_seen_import_time_us', table_name='packet_seen') + with op.batch_alter_table('packet_seen', schema=None) as batch_op: + batch_op.drop_column('import_time_us') + + op.drop_index('idx_packet_from_node_time_us', table_name='packet') + op.drop_index('idx_packet_import_time_us', table_name='packet') + with op.batch_alter_table('packet', schema=None) as batch_op: + batch_op.drop_column('import_time_us') diff --git a/alembic/versions/c88468b7ab0b_initial_migration.py b/alembic/versions/c88468b7ab0b_initial_migration.py index caf55b2..a307d6e 100644 --- a/alembic/versions/c88468b7ab0b_initial_migration.py +++ b/alembic/versions/c88468b7ab0b_initial_migration.py @@ -21,15 +21,112 @@ depends_on: Union[str, Sequence[str], None] = None def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### - op.drop_index('idx_packet_import_time', table_name='packet') - op.create_index('idx_packet_import_time', 'packet', [sa.text('import_time DESC')], unique=False) - op.create_index('idx_packet_seen_packet_id', 'packet_seen', ['packet_id'], unique=False) + # Get connection and inspector to check what exists + conn = op.get_bind() + inspector = sa.inspect(conn) + existing_tables = inspector.get_table_names() + + # Create node table if it doesn't exist + if 'node' not in existing_tables: + op.create_table('node', + sa.Column('id', sa.String(), nullable=False), + sa.Column('node_id', sa.BigInteger(), nullable=True), + sa.Column('long_name', sa.String(), nullable=True), + sa.Column('short_name', sa.String(), nullable=True), + sa.Column('hw_model', sa.String(), nullable=True), + sa.Column('firmware', sa.String(), nullable=True), + sa.Column('role', sa.String(), nullable=True), + sa.Column('last_lat', sa.BigInteger(), nullable=True), + sa.Column('last_long', sa.BigInteger(), nullable=True), + sa.Column('channel', sa.String(), nullable=True), + sa.Column('last_update', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('node_id') + ) + op.create_index('idx_node_node_id', 'node', ['node_id'], unique=False) + + # Create packet table if it doesn't exist + if 'packet' not in existing_tables: + op.create_table('packet', + sa.Column('id', sa.BigInteger(), nullable=False), + sa.Column('portnum', sa.Integer(), nullable=True), + sa.Column('from_node_id', sa.BigInteger(), nullable=True), + sa.Column('to_node_id', sa.BigInteger(), nullable=True), + sa.Column('payload', sa.LargeBinary(), nullable=True), + sa.Column('import_time', sa.DateTime(), nullable=True), + sa.Column('import_time_us', sa.BigInteger(), nullable=True), + sa.Column('channel', sa.String(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_index('idx_packet_from_node_id', 'packet', ['from_node_id'], unique=False) + op.create_index('idx_packet_to_node_id', 'packet', ['to_node_id'], unique=False) + op.create_index('idx_packet_import_time', 'packet', [sa.text('import_time DESC')], unique=False) + op.create_index('idx_packet_import_time_us', 'packet', [sa.text('import_time_us DESC')], unique=False) + op.create_index('idx_packet_from_node_time', 'packet', ['from_node_id', sa.text('import_time DESC')], unique=False) + op.create_index('idx_packet_from_node_time_us', 'packet', ['from_node_id', sa.text('import_time_us DESC')], unique=False) + + # Create packet_seen table if it doesn't exist + if 'packet_seen' not in existing_tables: + op.create_table('packet_seen', + sa.Column('packet_id', sa.BigInteger(), nullable=False), + sa.Column('node_id', sa.BigInteger(), nullable=False), + sa.Column('rx_time', sa.BigInteger(), nullable=False), + sa.Column('hop_limit', sa.Integer(), nullable=True), + sa.Column('hop_start', sa.Integer(), nullable=True), + sa.Column('channel', sa.String(), nullable=True), + sa.Column('rx_snr', sa.Float(), nullable=True), + sa.Column('rx_rssi', sa.Integer(), nullable=True), + sa.Column('topic', sa.String(), nullable=True), + sa.Column('import_time', sa.DateTime(), nullable=True), + sa.Column('import_time_us', sa.BigInteger(), nullable=True), + sa.ForeignKeyConstraint(['packet_id'], ['packet.id'], ), + sa.PrimaryKeyConstraint('packet_id', 'node_id', 'rx_time') + ) + op.create_index('idx_packet_seen_node_id', 'packet_seen', ['node_id'], unique=False) + op.create_index('idx_packet_seen_packet_id', 'packet_seen', ['packet_id'], unique=False) + op.create_index('idx_packet_seen_import_time_us', 'packet_seen', ['import_time_us'], unique=False) + + # Create traceroute table if it doesn't exist + if 'traceroute' not in existing_tables: + op.create_table('traceroute', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('packet_id', sa.BigInteger(), nullable=True), + sa.Column('gateway_node_id', sa.BigInteger(), nullable=True), + sa.Column('done', sa.Boolean(), nullable=True), + sa.Column('route', sa.LargeBinary(), nullable=True), + sa.Column('import_time', sa.DateTime(), nullable=True), + sa.Column('import_time_us', sa.BigInteger(), nullable=True), + sa.ForeignKeyConstraint(['packet_id'], ['packet.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index('idx_traceroute_import_time', 'traceroute', ['import_time'], unique=False) + op.create_index('idx_traceroute_import_time_us', 'traceroute', ['import_time_us'], unique=False) # ### end Alembic commands ### def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### + # Drop traceroute table and indexes + op.drop_index('idx_traceroute_import_time_us', table_name='traceroute') + op.drop_index('idx_traceroute_import_time', table_name='traceroute') + op.drop_table('traceroute') + + # Drop packet_seen table and indexes + op.drop_index('idx_packet_seen_import_time_us', table_name='packet_seen') op.drop_index('idx_packet_seen_packet_id', table_name='packet_seen') + op.drop_index('idx_packet_seen_node_id', table_name='packet_seen') + op.drop_table('packet_seen') + + # Drop packet table and indexes + op.drop_index('idx_packet_from_node_time_us', table_name='packet') + op.drop_index('idx_packet_from_node_time', table_name='packet') + op.drop_index('idx_packet_import_time_us', table_name='packet') op.drop_index('idx_packet_import_time', table_name='packet') - op.create_index('idx_packet_import_time', 'packet', ['import_time'], unique=False) + op.drop_index('idx_packet_to_node_id', table_name='packet') + op.drop_index('idx_packet_from_node_id', table_name='packet') + op.drop_table('packet') + + # Drop node table and indexes + op.drop_index('idx_node_node_id', table_name='node') + op.drop_table('node') # ### end Alembic commands ### diff --git a/alembic/versions/fb5781f0c470_add_microsecond_timestamp_columns.py b/alembic/versions/fb5781f0c470_add_microsecond_timestamp_columns.py deleted file mode 100644 index de0af9d..0000000 --- a/alembic/versions/fb5781f0c470_add_microsecond_timestamp_columns.py +++ /dev/null @@ -1,92 +0,0 @@ -"""add_microsecond_timestamp_columns - -Adds import_time_us INTEGER columns to packet, packet_seen, and traceroute tables. -This implements the changes described in GitHub issue #55: -- Adds import_time_us INTEGER columns to track microsecond-precision timestamps -- Populates new columns from existing import_time datetime values -- Creates indexes on the new columns for performance - -Revision ID: fb5781f0c470 -Revises: 1717fa5c6545 -Create Date: 2025-11-03 12:59:03.202458 - -""" -from typing import Sequence, Union - -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision: str = 'fb5781f0c470' -down_revision: Union[str, None] = '1717fa5c6545' -branch_labels: Union[str, Sequence[str], None] = None -depends_on: Union[str, Sequence[str], None] = None - - -def upgrade() -> None: - """Add import_time_us columns and populate them from existing data.""" - - # Add import_time_us column to packet table - with op.batch_alter_table('packet', schema=None) as batch_op: - batch_op.add_column(sa.Column('import_time_us', sa.Integer(), nullable=True)) - - # Add import_time_us column to packet_seen table - with op.batch_alter_table('packet_seen', schema=None) as batch_op: - batch_op.add_column(sa.Column('import_time_us', sa.Integer(), nullable=True)) - - # Add import_time_us column to traceroute table - with op.batch_alter_table('traceroute', schema=None) as batch_op: - batch_op.add_column(sa.Column('import_time_us', sa.Integer(), nullable=True)) - - # Populate packet.import_time_us from existing import_time data - # Note: import_time is stored as local time text, but we convert to UTC timestamp - # strftime('%s', ...) interprets the datetime as UTC - op.execute(""" - UPDATE packet - SET import_time_us = - CAST((strftime('%s', import_time) || substr(import_time, 21, 6)) AS INTEGER) - WHERE import_time IS NOT NULL - """) - - # Populate packet_seen.import_time_us - op.execute(""" - UPDATE packet_seen - SET import_time_us = - CAST((strftime('%s', import_time) || substr(import_time, 21, 6)) AS INTEGER) - WHERE import_time IS NOT NULL - """) - - # Populate traceroute.import_time_us - op.execute(""" - UPDATE traceroute - SET import_time_us = - CAST((strftime('%s', import_time) || substr(import_time, 21, 6)) AS INTEGER) - WHERE import_time IS NOT NULL - """) - - # Create indexes on the new columns - op.create_index('idx_packet_import_time_us', 'packet', [sa.text('import_time_us DESC')]) - op.create_index('idx_packet_from_node_time_us', 'packet', ['from_node_id', sa.text('import_time_us DESC')]) - op.create_index('idx_packet_seen_import_time_us', 'packet_seen', ['import_time_us']) - op.create_index('idx_traceroute_import_time_us', 'traceroute', ['import_time_us']) - - -def downgrade() -> None: - """Remove import_time_us columns and their indexes.""" - - # Drop indexes - op.drop_index('idx_traceroute_import_time_us', table_name='traceroute') - op.drop_index('idx_packet_seen_import_time_us', table_name='packet_seen') - op.drop_index('idx_packet_from_node_time_us', table_name='packet') - op.drop_index('idx_packet_import_time_us', table_name='packet') - - # Drop columns - with op.batch_alter_table('traceroute', schema=None) as batch_op: - batch_op.drop_column('import_time_us') - - with op.batch_alter_table('packet_seen', schema=None) as batch_op: - batch_op.drop_column('import_time_us') - - with op.batch_alter_table('packet', schema=None) as batch_op: - batch_op.drop_column('import_time_us')