|
| 1 | +"""Programmatic migration runner for Tuttle. |
| 2 | +
|
| 3 | +This module provides functions to run Alembic migrations from within |
| 4 | +the application (e.g. at startup), without requiring the alembic CLI. |
| 5 | +
|
| 6 | +Usage: |
| 7 | + from tuttle.migrations.run import run_migrations |
| 8 | + run_migrations("sqlite:///path/to/tuttle.db") |
| 9 | +""" |
| 10 | + |
| 11 | +from pathlib import Path |
| 12 | + |
| 13 | +from alembic import command |
| 14 | +from alembic.config import Config |
| 15 | +from alembic.runtime.migration import MigrationContext |
| 16 | +from alembic.script import ScriptDirectory |
| 17 | +from loguru import logger |
| 18 | +from sqlalchemy import create_engine, inspect |
| 19 | + |
| 20 | + |
| 21 | +# Directory containing this module (= the migrations package) |
| 22 | +_MIGRATIONS_DIR = Path(__file__).parent |
| 23 | + |
| 24 | + |
| 25 | +def _get_alembic_config(db_url: str) -> Config: |
| 26 | + """Create an Alembic Config pointing at the bundled migrations.""" |
| 27 | + ini_path = _MIGRATIONS_DIR / "alembic.ini" |
| 28 | + cfg = Config(str(ini_path)) |
| 29 | + cfg.set_main_option("script_location", str(_MIGRATIONS_DIR)) |
| 30 | + cfg.set_main_option("sqlalchemy.url", db_url) |
| 31 | + return cfg |
| 32 | + |
| 33 | + |
| 34 | +def get_head_revision() -> str | None: |
| 35 | + """Return the head revision from the migration scripts.""" |
| 36 | + script = ScriptDirectory(str(_MIGRATIONS_DIR)) |
| 37 | + return script.get_current_head() |
| 38 | + |
| 39 | + |
| 40 | +def run_migrations(db_url: str) -> None: |
| 41 | + """Run all pending Alembic migrations on the database. |
| 42 | +
|
| 43 | + Handles three scenarios: |
| 44 | + 1. **New database** (no tables): Alembic creates everything via migrations. |
| 45 | + 2. **Pre-Alembic database** (tables exist, no alembic_version): |
| 46 | + Stamps the DB at the baseline revision, then applies new migrations. |
| 47 | + 3. **Migrated database** (alembic_version exists): Applies pending migrations. |
| 48 | + """ |
| 49 | + head = get_head_revision() |
| 50 | + if head is None: |
| 51 | + logger.info("No migration scripts found, skipping migrations") |
| 52 | + return |
| 53 | + |
| 54 | + cfg = _get_alembic_config(db_url) |
| 55 | + engine = create_engine(db_url) |
| 56 | + |
| 57 | + try: |
| 58 | + insp = inspect(engine) |
| 59 | + table_names = insp.get_table_names() |
| 60 | + has_version_table = "alembic_version" in table_names |
| 61 | + has_app_tables = any(t != "alembic_version" for t in table_names) |
| 62 | + |
| 63 | + # Get current revision |
| 64 | + with engine.connect() as conn: |
| 65 | + ctx = MigrationContext.configure(conn) |
| 66 | + current = ctx.get_current_revision() |
| 67 | + |
| 68 | + if has_app_tables and not has_version_table: |
| 69 | + # Pre-Alembic database: stamp at baseline so future migrations apply |
| 70 | + script = ScriptDirectory(str(_MIGRATIONS_DIR)) |
| 71 | + base = script.get_base() |
| 72 | + if base is None: |
| 73 | + logger.warning("No migration scripts found, nothing to stamp") |
| 74 | + return |
| 75 | + logger.info( |
| 76 | + "Pre-existing database detected without Alembic version table. " |
| 77 | + f"Stamping at baseline revision: {base}" |
| 78 | + ) |
| 79 | + command.stamp(cfg, base) |
| 80 | + current = base |
| 81 | + |
| 82 | + if current == head: |
| 83 | + logger.debug(f"Database is already at head revision ({head})") |
| 84 | + return |
| 85 | + |
| 86 | + logger.info(f"Running migrations: {current} → {head}") |
| 87 | + command.upgrade(cfg, "head") |
| 88 | + logger.info("Migrations completed successfully") |
| 89 | + |
| 90 | + finally: |
| 91 | + engine.dispose() |
0 commit comments