diff --git a/build.sh b/build.sh index 1a5af7036..8c5bf47d8 100755 --- a/build.sh +++ b/build.sh @@ -24,7 +24,7 @@ function build { echo "Building ${osname}-${GOARCH} binary" export GOOS export GOARCH - go build -ldflags "$ldflags" -o $buildpath/$target go/cmd/gh-ost/main.go + CGO_ENABLED="${CGO_ENABLED:-0}" go build -ldflags "$ldflags" -o "$buildpath/$target" go/cmd/gh-ost/main.go if [ $? -ne 0 ]; then echo "Build failed for ${osname} ${GOARCH}." diff --git a/go.sum b/go.sum index 69b51b296..e39b587e8 100644 --- a/go.sum +++ b/go.sum @@ -201,11 +201,13 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -224,6 +226,7 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/go/logic/applier.go b/go/logic/applier.go index 3d88acc0a..3191e5d76 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -6,7 +6,9 @@ package logic import ( + "crypto/sha1" gosql "database/sql" + "encoding/hex" "fmt" "reflect" "regexp" @@ -84,6 +86,12 @@ type Applier struct { dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder + + migrationLockConn *gosql.Conn + migrationLockDB *gosql.DB + migrationLockName string + migrationLockStop chan struct{} + migrationLockDone chan struct{} } func NewApplier(migrationContext *base.MigrationContext) *Applier { @@ -146,6 +154,149 @@ func (apl *Applier) InitDBConnections() (err error) { return nil } +// buildMigrationLockName returns a deterministic MySQL user-level lock name +// for the given database and table, hashed if longer than MySQL's 64-char limit. +func buildMigrationLockName(db, table string) string { + name := fmt.Sprintf("gh-ost::%s.%s", db, table) + if len(name) <= 64 { + return name + } + sum := sha1.Sum([]byte(name)) + return "gh-ost::" + hex.EncodeToString(sum[:]) +} + +// AcquireMigrationLock takes a user-level lock on a pinned connection, +// preventing two gh-ost processes from migrating the same table concurrently +// on the same MySQL server. +func (apl *Applier) AcquireMigrationLock(ctx context.Context) error { + lockName := buildMigrationLockName(apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName) + + // Use a dedicated *sql.DB so the pinned connection does not consume a + // slot in apl.db's small pool (mysql.MaxDBPoolConnections). + lockURI := apl.connectionConfig.GetDBUri(apl.migrationContext.DatabaseName) + lockDB, err := gosql.Open("mysql", lockURI) + if err != nil { + return fmt.Errorf("failed to open migration lock DB: %w", err) + } + lockDB.SetMaxOpenConns(1) + lockDB.SetMaxIdleConns(1) + + conn, err := lockDB.Conn(ctx) + if err != nil { + lockDB.Close() + return fmt.Errorf("failed to obtain pinned connection for migration lock: %w", err) + } + + var lockResult gosql.NullInt64 + if err := conn.QueryRowContext(ctx, `select /* gh-ost */ get_lock(?, 0)`, lockName).Scan(&lockResult); err != nil { + conn.Close() + lockDB.Close() + return fmt.Errorf("failed to execute GET_LOCK for migration lock %s: %w", lockName, err) + } + + if !lockResult.Valid { + conn.Close() + lockDB.Close() + return fmt.Errorf("GET_LOCK returned NULL while acquiring migration lock %s", lockName) + } + + if lockResult.Int64 != 1 { + var holderID gosql.NullInt64 + _ = conn.QueryRowContext(ctx, `select /* gh-ost */ is_used_lock(?)`, lockName).Scan(&holderID) + conn.Close() + lockDB.Close() + if holderID.Valid { + return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s held by connection id %d", + apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName, holderID.Int64) + } + return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s is held", + apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName) + } + + apl.migrationLockConn = conn + apl.migrationLockDB = lockDB + apl.migrationLockName = lockName + apl.migrationLockStop = make(chan struct{}) + apl.migrationLockDone = make(chan struct{}) + go apl.keepMigrationLockAlive(ctx) + apl.migrationContext.Log.Infof("Acquired migration lock %s", lockName) + return nil +} + +// keepMigrationLockAlive pings the pinned migration-lock connection. If the +// ping fails the lock is considered lost and the migration is aborted via +// PanicAbort. +func (apl *Applier) keepMigrationLockAlive(ctx context.Context) { + defer close(apl.migrationLockDone) + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-apl.migrationLockStop: + return + case <-ticker.C: + } + if err := apl.pingMigrationLockConn(ctx); err != nil { + // Shutdown may have started mid-ping; don't abort if so. + select { + case <-apl.migrationLockStop: + return + default: + } + if ctx.Err() != nil { + return + } + _ = base.SendWithContext(ctx, apl.migrationContext.PanicAbort, + fmt.Errorf("migration lock %s connection lost: %w", apl.migrationLockName, err)) + return + } + } +} + +// pingMigrationLockConn pings the pinned connection with a bounded timeout +// and propagates migrationLockStop as an early cancel so a teardown can +// interrupt a stuck ping. +func (apl *Applier) pingMigrationLockConn(parent context.Context) error { + pingCtx, cancel := context.WithTimeout(parent, 10*time.Second) + defer cancel() + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-apl.migrationLockStop: + cancel() + case <-done: + } + }() + return apl.migrationLockConn.PingContext(pingCtx) +} + +// releaseMigrationLock stops the keepalive goroutine, releases the user-level +// lock and closes the dedicated lock DB. Safe to call when no lock is held. +func (apl *Applier) releaseMigrationLock() { + if apl.migrationLockConn == nil { + return + } + // Stop keepalive before touching the pinned connection. + close(apl.migrationLockStop) + <-apl.migrationLockDone + releaseCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if _, err := apl.migrationLockConn.ExecContext(releaseCtx, `select /* gh-ost */ release_lock(?)`, apl.migrationLockName); err != nil { + apl.migrationContext.Log.Warningf("failed to release migration lock %s: %v", apl.migrationLockName, err) + } + if err := apl.migrationLockConn.Close(); err != nil { + apl.migrationContext.Log.Warningf("failed to close migration lock connection: %v", err) + } + if apl.migrationLockDB != nil { + apl.migrationLockDB.Close() + apl.migrationLockDB = nil + } + apl.migrationLockConn = nil +} + func (apl *Applier) prepareQueries() (err error) { if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder( apl.migrationContext.DatabaseName, @@ -1746,6 +1897,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e func (apl *Applier) Teardown() { apl.migrationContext.Log.Debugf("Tearing down...") + apl.releaseMigrationLock() apl.db.Close() apl.singletonDB.Close() atomic.StoreInt64(&apl.finishedMigrating, 1) diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 6d7ba42f4..85a5a01d3 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -507,6 +507,83 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi suite.Require().Equal(gosql.ErrNoRows, err) } +func (suite *ApplierTestSuite) TestAcquireMigrationLockSucceedsWhenFree() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + suite.Require().NoError(applier.InitDBConnections()) + suite.Require().NoError(applier.AcquireMigrationLock(ctx)) + suite.Require().NotNil(applier.migrationLockConn) + suite.Require().Equal(buildMigrationLockName(testMysqlDatabase, testMysqlTableName), applier.migrationLockName) +} + +func (suite *ApplierTestSuite) TestAcquireMigrationLockFailsWhenHeld() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContextA := newTestMigrationContext() + migrationContextA.ApplierConnectionConfig = connectionConfig + migrationContextA.SetConnectionConfig("innodb") + + applierA := NewApplier(migrationContextA) + defer applierA.Teardown() + suite.Require().NoError(applierA.InitDBConnections()) + suite.Require().NoError(applierA.AcquireMigrationLock(ctx)) + + connectionConfigB, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContextB := newTestMigrationContext() + migrationContextB.ApplierConnectionConfig = connectionConfigB + migrationContextB.SetConnectionConfig("innodb") + + applierB := NewApplier(migrationContextB) + defer applierB.Teardown() + suite.Require().NoError(applierB.InitDBConnections()) + + err = applierB.AcquireMigrationLock(ctx) + suite.Require().Error(err) + suite.Require().Contains(err.Error(), "already migrating") + suite.Require().Nil(applierB.migrationLockConn) +} + +func TestBuildMigrationLockName(t *testing.T) { + t.Run("short name is returned verbatim", func(t *testing.T) { + name := buildMigrationLockName("mydb", "mytable") + require.Equal(t, "gh-ost::mydb.mytable", name) + require.LessOrEqual(t, len(name), 64) + }) + + t.Run("long name is hashed and within MySQL limit", func(t *testing.T) { + longDB := strings.Repeat("d", 40) + longTable := strings.Repeat("t", 40) + name := buildMigrationLockName(longDB, longTable) + require.LessOrEqual(t, len(name), 64) + require.True(t, strings.HasPrefix(name, "gh-ost::")) + // deterministic + require.Equal(t, name, buildMigrationLockName(longDB, longTable)) + // distinct inputs produce distinct hashes + require.NotEqual(t, name, buildMigrationLockName(longDB, longTable+"x")) + }) +} + func (suite *ApplierTestSuite) TestCreateGhostTable() { ctx := context.Background() diff --git a/go/logic/migrator.go b/go/logic/migrator.go index da1e20f2f..1acfe9093 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1570,6 +1570,9 @@ func (mgtr *Migrator) initiateApplier() error { if err := mgtr.applier.InitDBConnections(); err != nil { return err } + if err := mgtr.applier.AcquireMigrationLock(mgtr.migrationContext.GetContext()); err != nil { + return err + } if mgtr.migrationContext.Revert { if err := mgtr.applier.CreateChangelogTable(); err != nil { mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index a8c555220..fa8c14aee 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -875,6 +875,7 @@ func (suite *MigratorTestSuite) TestCopierIntPK() { migrator := NewMigrator(migrationContext, "0.0.0") suite.Require().NoError(migrator.initiateApplier()) + defer migrator.applier.Teardown() suite.Require().NoError(migrator.applier.prepareQueries()) suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) @@ -946,6 +947,7 @@ func (suite *MigratorTestSuite) TestCopierCompositePK() { migrator := NewMigrator(migrationContext, "0.0.0") suite.Require().NoError(migrator.initiateApplier()) + defer migrator.applier.Teardown() suite.Require().NoError(migrator.applier.prepareQueries()) suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) diff --git a/go/logic/test_utils.go b/go/logic/test_utils.go index f552cfc76..cdcfcee84 100644 --- a/go/logic/test_utils.go +++ b/go/logic/test_utils.go @@ -4,8 +4,8 @@ import ( "context" "fmt" + "os" "path/filepath" - "runtime" "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/mysql" @@ -68,9 +68,7 @@ func newTestMigrationContext() *base.MigrationContext { migrationContext.PanicOnWarnings = true migrationContext.AllowedRunningOnMaster = true - //nolint:dogsled - _, filename, _, _ := runtime.Caller(0) - migrationContext.ServeSocketFile = filepath.Join(filepath.Dir(filename), "../../tmp/gh-ost.sock") + migrationContext.ServeSocketFile = filepath.Join(os.TempDir(), "gh-ost.sock") return migrationContext } diff --git a/go/sql/builder.go b/go/sql/builder.go index bc9d99c09..7d0864601 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -335,6 +335,9 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin } func sameFirstColumnValue(rangeStartArgs, rangeEndArgs []interface{}) bool { + if len(rangeStartArgs) == 0 || len(rangeEndArgs) == 0 { + return false + } return fmt.Sprintf("%v", rangeStartArgs[0]) == fmt.Sprintf("%v", rangeEndArgs[0]) } @@ -621,6 +624,9 @@ func buildUniqueKeyRangeEndTwoColumnViaOffset( hint string, ) (result string, explodedArgs []interface{}, err error) { m := newTwoColumnRangeMeta(uniqueKeyColumns) + if len(rangeStartArgs) != 2 || len(rangeEndArgs) != 2 { + return "", nil, fmt.Errorf("expected 2 range args in buildUniqueKeyRangeEndTwoColumnViaOffset, got %d start and %d end", len(rangeStartArgs), len(rangeEndArgs)) + } col2StartOp := string(startRangeComparisonSign) selectClause := m.col1Name + ", " + m.col2Name fromClause := databaseName + "." + tableName @@ -684,6 +690,9 @@ func buildUniqueKeyRangeEndTwoColumnViaTemptable( hint string, ) (result string, explodedArgs []interface{}, err error) { m := newTwoColumnRangeMeta(uniqueKeyColumns) + if len(rangeStartArgs) != 2 || len(rangeEndArgs) != 2 { + return "", nil, fmt.Errorf("expected 2 range args in buildUniqueKeyRangeEndTwoColumnViaTemptable, got %d start and %d end", len(rangeStartArgs), len(rangeEndArgs)) + } col2StartOp := string(startRangeComparisonSign) selectClause := m.col1Name + ", " + m.col2Name fromClause := databaseName + "." + tableName