Skip to content
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ function build {
echo "Building ${osname}-${GOARCH} binary"
export GOOS
export GOARCH
go build -ldflags "$ldflags" -o $buildpath/$target go/cmd/gh-ost/main.go
CGO_ENABLED="${CGO_ENABLED:-0}" go build -ldflags "$ldflags" -o "$buildpath/$target" go/cmd/gh-ost/main.go

if [ $? -ne 0 ]; then
echo "Build failed for ${osname} ${GOARCH}."
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,13 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -224,6 +226,7 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
152 changes: 152 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package logic

import (
"crypto/sha1"
gosql "database/sql"
"encoding/hex"
"fmt"
"reflect"
"regexp"
Expand Down Expand Up @@ -84,6 +86,12 @@ type Applier struct {
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder

migrationLockConn *gosql.Conn
migrationLockDB *gosql.DB
migrationLockName string
migrationLockStop chan struct{}
migrationLockDone chan struct{}
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
Expand Down Expand Up @@ -146,6 +154,149 @@ func (apl *Applier) InitDBConnections() (err error) {
return nil
}

// buildMigrationLockName returns a deterministic MySQL user-level lock name
// for the given database and table, hashed if longer than MySQL's 64-char limit.
func buildMigrationLockName(db, table string) string {
name := fmt.Sprintf("gh-ost::%s.%s", db, table)
if len(name) <= 64 {
return name
}
sum := sha1.Sum([]byte(name))
return "gh-ost::" + hex.EncodeToString(sum[:])
}

// AcquireMigrationLock takes a user-level lock on a pinned connection,
// preventing two gh-ost processes from migrating the same table concurrently
// on the same MySQL server.
func (apl *Applier) AcquireMigrationLock(ctx context.Context) error {
lockName := buildMigrationLockName(apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName)

// Use a dedicated *sql.DB so the pinned connection does not consume a
// slot in apl.db's small pool (mysql.MaxDBPoolConnections).
lockURI := apl.connectionConfig.GetDBUri(apl.migrationContext.DatabaseName)
lockDB, err := gosql.Open("mysql", lockURI)
if err != nil {
return fmt.Errorf("failed to open migration lock DB: %w", err)
}
lockDB.SetMaxOpenConns(1)
lockDB.SetMaxIdleConns(1)

conn, err := lockDB.Conn(ctx)
if err != nil {
lockDB.Close()
return fmt.Errorf("failed to obtain pinned connection for migration lock: %w", err)
}

var lockResult gosql.NullInt64
if err := conn.QueryRowContext(ctx, `select /* gh-ost */ get_lock(?, 0)`, lockName).Scan(&lockResult); err != nil {
conn.Close()
lockDB.Close()
return fmt.Errorf("failed to execute GET_LOCK for migration lock %s: %w", lockName, err)
}

if !lockResult.Valid {
conn.Close()
lockDB.Close()
return fmt.Errorf("GET_LOCK returned NULL while acquiring migration lock %s", lockName)
}

if lockResult.Int64 != 1 {
var holderID gosql.NullInt64
_ = conn.QueryRowContext(ctx, `select /* gh-ost */ is_used_lock(?)`, lockName).Scan(&holderID)
conn.Close()
lockDB.Close()
if holderID.Valid {
return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s held by connection id %d",
apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName, holderID.Int64)
}
return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s is held",
apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName)
}

apl.migrationLockConn = conn
apl.migrationLockDB = lockDB
apl.migrationLockName = lockName
apl.migrationLockStop = make(chan struct{})
apl.migrationLockDone = make(chan struct{})
go apl.keepMigrationLockAlive(ctx)
apl.migrationContext.Log.Infof("Acquired migration lock %s", lockName)
return nil
}

// keepMigrationLockAlive pings the pinned migration-lock connection. If the
// ping fails the lock is considered lost and the migration is aborted via
// PanicAbort.
func (apl *Applier) keepMigrationLockAlive(ctx context.Context) {
defer close(apl.migrationLockDone)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-apl.migrationLockStop:
return
case <-ticker.C:
}
if err := apl.pingMigrationLockConn(ctx); err != nil {
// Shutdown may have started mid-ping; don't abort if so.
select {
case <-apl.migrationLockStop:
return
default:
}
if ctx.Err() != nil {
return
}
_ = base.SendWithContext(ctx, apl.migrationContext.PanicAbort,
fmt.Errorf("migration lock %s connection lost: %w", apl.migrationLockName, err))
return
}
}
}

// pingMigrationLockConn pings the pinned connection with a bounded timeout
// and propagates migrationLockStop as an early cancel so a teardown can
// interrupt a stuck ping.
func (apl *Applier) pingMigrationLockConn(parent context.Context) error {
pingCtx, cancel := context.WithTimeout(parent, 10*time.Second)
defer cancel()
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-apl.migrationLockStop:
cancel()
case <-done:
}
}()
return apl.migrationLockConn.PingContext(pingCtx)
}

// releaseMigrationLock stops the keepalive goroutine, releases the user-level
// lock and closes the dedicated lock DB. Safe to call when no lock is held.
func (apl *Applier) releaseMigrationLock() {
if apl.migrationLockConn == nil {
return
}
// Stop keepalive before touching the pinned connection.
close(apl.migrationLockStop)
<-apl.migrationLockDone
releaseCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := apl.migrationLockConn.ExecContext(releaseCtx, `select /* gh-ost */ release_lock(?)`, apl.migrationLockName); err != nil {
apl.migrationContext.Log.Warningf("failed to release migration lock %s: %v", apl.migrationLockName, err)
}
if err := apl.migrationLockConn.Close(); err != nil {
apl.migrationContext.Log.Warningf("failed to close migration lock connection: %v", err)
}
if apl.migrationLockDB != nil {
apl.migrationLockDB.Close()
apl.migrationLockDB = nil
}
apl.migrationLockConn = nil
}

func (apl *Applier) prepareQueries() (err error) {
if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
apl.migrationContext.DatabaseName,
Expand Down Expand Up @@ -1746,6 +1897,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e

func (apl *Applier) Teardown() {
apl.migrationContext.Log.Debugf("Tearing down...")
apl.releaseMigrationLock()
apl.db.Close()
apl.singletonDB.Close()
atomic.StoreInt64(&apl.finishedMigrating, 1)
Expand Down
77 changes: 77 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,83 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi
suite.Require().Equal(gosql.ErrNoRows, err)
}

func (suite *ApplierTestSuite) TestAcquireMigrationLockSucceedsWhenFree() {
ctx := context.Background()

_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName()))
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")

applier := NewApplier(migrationContext)
defer applier.Teardown()

suite.Require().NoError(applier.InitDBConnections())
suite.Require().NoError(applier.AcquireMigrationLock(ctx))
suite.Require().NotNil(applier.migrationLockConn)
suite.Require().Equal(buildMigrationLockName(testMysqlDatabase, testMysqlTableName), applier.migrationLockName)
}

func (suite *ApplierTestSuite) TestAcquireMigrationLockFailsWhenHeld() {
ctx := context.Background()

_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName()))
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContextA := newTestMigrationContext()
migrationContextA.ApplierConnectionConfig = connectionConfig
migrationContextA.SetConnectionConfig("innodb")

applierA := NewApplier(migrationContextA)
defer applierA.Teardown()
suite.Require().NoError(applierA.InitDBConnections())
suite.Require().NoError(applierA.AcquireMigrationLock(ctx))

connectionConfigB, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContextB := newTestMigrationContext()
migrationContextB.ApplierConnectionConfig = connectionConfigB
migrationContextB.SetConnectionConfig("innodb")

applierB := NewApplier(migrationContextB)
defer applierB.Teardown()
suite.Require().NoError(applierB.InitDBConnections())

err = applierB.AcquireMigrationLock(ctx)
suite.Require().Error(err)
suite.Require().Contains(err.Error(), "already migrating")
suite.Require().Nil(applierB.migrationLockConn)
}

func TestBuildMigrationLockName(t *testing.T) {
t.Run("short name is returned verbatim", func(t *testing.T) {
name := buildMigrationLockName("mydb", "mytable")
require.Equal(t, "gh-ost::mydb.mytable", name)
require.LessOrEqual(t, len(name), 64)
})

t.Run("long name is hashed and within MySQL limit", func(t *testing.T) {
longDB := strings.Repeat("d", 40)
longTable := strings.Repeat("t", 40)
name := buildMigrationLockName(longDB, longTable)
require.LessOrEqual(t, len(name), 64)
require.True(t, strings.HasPrefix(name, "gh-ost::"))
// deterministic
require.Equal(t, name, buildMigrationLockName(longDB, longTable))
// distinct inputs produce distinct hashes
require.NotEqual(t, name, buildMigrationLockName(longDB, longTable+"x"))
})
}

func (suite *ApplierTestSuite) TestCreateGhostTable() {
ctx := context.Background()

Expand Down
3 changes: 3 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,6 +1570,9 @@ func (mgtr *Migrator) initiateApplier() error {
if err := mgtr.applier.InitDBConnections(); err != nil {
return err
}
if err := mgtr.applier.AcquireMigrationLock(mgtr.migrationContext.GetContext()); err != nil {
return err
}
if mgtr.migrationContext.Revert {
if err := mgtr.applier.CreateChangelogTable(); err != nil {
mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
Expand Down
2 changes: 2 additions & 0 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ func (suite *MigratorTestSuite) TestCopierIntPK() {

migrator := NewMigrator(migrationContext, "0.0.0")
suite.Require().NoError(migrator.initiateApplier())
defer migrator.applier.Teardown()
suite.Require().NoError(migrator.applier.prepareQueries())
suite.Require().NoError(migrator.applier.ReadMigrationRangeValues())

Expand Down Expand Up @@ -946,6 +947,7 @@ func (suite *MigratorTestSuite) TestCopierCompositePK() {

migrator := NewMigrator(migrationContext, "0.0.0")
suite.Require().NoError(migrator.initiateApplier())
defer migrator.applier.Teardown()
suite.Require().NoError(migrator.applier.prepareQueries())
suite.Require().NoError(migrator.applier.ReadMigrationRangeValues())

Expand Down
6 changes: 2 additions & 4 deletions go/logic/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"

"fmt"
"os"
"path/filepath"
"runtime"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
Expand Down Expand Up @@ -68,9 +68,7 @@ func newTestMigrationContext() *base.MigrationContext {
migrationContext.PanicOnWarnings = true
migrationContext.AllowedRunningOnMaster = true

//nolint:dogsled
_, filename, _, _ := runtime.Caller(0)
migrationContext.ServeSocketFile = filepath.Join(filepath.Dir(filename), "../../tmp/gh-ost.sock")
migrationContext.ServeSocketFile = filepath.Join(os.TempDir(), "gh-ost.sock")

return migrationContext
}
9 changes: 9 additions & 0 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
}

func sameFirstColumnValue(rangeStartArgs, rangeEndArgs []interface{}) bool {
if len(rangeStartArgs) == 0 || len(rangeEndArgs) == 0 {
return false
}
return fmt.Sprintf("%v", rangeStartArgs[0]) == fmt.Sprintf("%v", rangeEndArgs[0])
}

Expand Down Expand Up @@ -621,6 +624,9 @@ func buildUniqueKeyRangeEndTwoColumnViaOffset(
hint string,
) (result string, explodedArgs []interface{}, err error) {
m := newTwoColumnRangeMeta(uniqueKeyColumns)
if len(rangeStartArgs) != 2 || len(rangeEndArgs) != 2 {
return "", nil, fmt.Errorf("expected 2 range args in buildUniqueKeyRangeEndTwoColumnViaOffset, got %d start and %d end", len(rangeStartArgs), len(rangeEndArgs))
}
col2StartOp := string(startRangeComparisonSign)
selectClause := m.col1Name + ", " + m.col2Name
fromClause := databaseName + "." + tableName
Expand Down Expand Up @@ -684,6 +690,9 @@ func buildUniqueKeyRangeEndTwoColumnViaTemptable(
hint string,
) (result string, explodedArgs []interface{}, err error) {
m := newTwoColumnRangeMeta(uniqueKeyColumns)
if len(rangeStartArgs) != 2 || len(rangeEndArgs) != 2 {
return "", nil, fmt.Errorf("expected 2 range args in buildUniqueKeyRangeEndTwoColumnViaTemptable, got %d start and %d end", len(rangeStartArgs), len(rangeEndArgs))
}
col2StartOp := string(startRangeComparisonSign)
selectClause := m.col1Name + ", " + m.col2Name
fromClause := databaseName + "." + tableName
Expand Down
Loading