Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,18 @@ func (m *Migrator) MigrateTo(ctx context.Context, targetVersion int32) (err erro
return err
}

// Fast path: if the database is already at targetVersion, return without
// acquiring the advisory lock. This avoids contention for callers that
// re-run Migrate after deploys with no schema change — a common case in
// CI/CD pipelines where every deploy invokes the migrator regardless of
// whether new migration files were added.
//
// Conservative by design: any error or out-of-range version falls through
// to the locked path so error semantics remain identical to before.
if atTarget, _ := m.atTargetVersion(ctx, targetVersion); atTarget {
return nil
}

err = acquireAdvisoryLock(ctx, m.conn)
if err != nil {
return err
Expand Down Expand Up @@ -484,6 +496,28 @@ func (m *Migrator) GetCurrentVersion(ctx context.Context) (v int32, err error) {
return v, err
}

// atTargetVersion reports whether the database is already at targetVersion,
// without acquiring the advisory lock. Used by MigrateTo as a fast path so
// schema-stable deploys do not contend on the lock.
//
// Returns false (not an error) when targetVersion is out of range or when
// the version stored in the schema_version table is out of range. The caller
// falls through to the locked path, which produces the existing
// BadVersionError, so user-visible error semantics do not change.
func (m *Migrator) atTargetVersion(ctx context.Context, targetVersion int32) (bool, error) {
if targetVersion < 0 || int32(len(m.Migrations)) < targetVersion {
return false, nil
}
currentVersion, err := m.GetCurrentVersion(ctx)
if err != nil {
return false, err
}
if currentVersion < 0 || int32(len(m.Migrations)) < currentVersion {
return false, nil
}
return currentVersion == targetVersion, nil
}

// SetVersion sets the current migration version without running any migrations.
// This is useful for baselining an existing database when adopting tern.
func (m *Migrator) SetVersion(ctx context.Context, version int32) (err error) {
Expand Down
46 changes: 46 additions & 0 deletions migrate/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/exec"
"testing"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
Expand Down Expand Up @@ -345,6 +346,51 @@ func TestMigrate(t *testing.T) {
assert.EqualValues(t, 3, currentVersion)
}

// TestMigrateToSkipsAdvisoryLockWhenAtTarget proves that calling MigrateTo
// against a database already at the target version does not contend on the
// migrations advisory lock. The mechanism: hold the advisory lock from a
// separate connection and call MigrateTo with a bounded context — without
// the fast path it would block on pg_advisory_lock until the deadline; with
// it the call returns immediately.
func TestMigrateToSkipsAdvisoryLockWhenAtTarget(t *testing.T) {
conn := connectConn(t)
defer conn.Close(context.Background())
m := createSampleMigrator(t, conn)

// Bring the schema to the highest available version through the regular path.
require.NoError(t, m.Migrate(context.Background()))

// Hold tern's advisory lock from a second session. The lock id is
// the same private constant tern uses internally; if MigrateTo took
// the lock again, it would block on this until the test times out.
connStr, ok := os.LookupEnv("MIGRATE_TEST_CONN_STRING")
require.True(t, ok, "MIGRATE_TEST_CONN_STRING must be set")
lockConn, err := pgx.Connect(context.Background(), connStr)
require.NoError(t, err)
defer lockConn.Close(context.Background())
const ternLockNum = int64(9628173550095224)
_, err = lockConn.Exec(context.Background(), "select pg_advisory_lock($1)", ternLockNum)
require.NoError(t, err)
defer func() {
_, _ = lockConn.Exec(context.Background(), "select pg_advisory_unlock($1)", ternLockNum)
}()

// Bound the call so a regression manifests as a clear failure rather than a hang.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
start := time.Now()
err = m.MigrateTo(ctx, 3)
require.NoError(t, err)
require.Less(t, time.Since(start), 2*time.Second,
"MigrateTo at target should not contend on the advisory lock")

// Migrate should behave identically (it delegates to MigrateTo).
start = time.Now()
require.NoError(t, m.Migrate(ctx))
require.Less(t, time.Since(start), 2*time.Second,
"Migrate at target should not contend on the advisory lock")
}

func TestMigrateUsingGoFunctions(t *testing.T) {
conn := connectConn(t)
defer conn.Close(context.Background())
Expand Down