Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ type ControlServerConfig struct {
ServerBindAddr string

// Path to `web` base dir
WebBasedir string
WebBasedir string

// TODO: refactor control server config out of the base ferry at some point
// This adds optional buttons in the web ui that runs a script located at the
Expand Down Expand Up @@ -319,7 +319,8 @@ func (c *ControlServerConfig) Validate() error {

// SchemaName => TableName => ColumnName => CompressionAlgorithm
// Example: blog1 => articles => body => snappy
// (SELECT body FROM blog1.articles => returns compressed blob)
//
// (SELECT body FROM blog1.articles => returns compressed blob)
type ColumnCompressionConfig map[string]map[string]map[string]string

func (c ColumnCompressionConfig) CompressedColumnsFor(schemaName, tableName string) map[string]string {
Expand Down Expand Up @@ -835,7 +836,7 @@ func (c *Config) ValidateConfig() error {
return fmt.Errorf("control_server: %s", err)
}

if c.DBWriteRetries == 0 {
if c.DBWriteRetries == 0 {
c.DBWriteRetries = 5
}

Expand All @@ -856,7 +857,7 @@ func (c *Config) ValidateConfig() error {
}

if c.DBReadRetries == 0 {
c.DBReadRetries = 5
c.DBReadRetries = 60
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implies 60 * 1s retry schedule. I thought it's a reasonable starting point, similarly to how gh-ost does it (60 and 1s)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And just double-checked that the config is currently @ default in core (components/shop_mover/app/utils/podding/shop_mover/ghostferry/config.rb)

}

if c.MaxCutoverRetries == 0 {
Expand All @@ -870,7 +871,7 @@ func (c *Config) ValidateConfig() error {
return nil
}

func (c *Config) checkForDeprecatedConfig() {
func (c *Config) checkForDeprecatedConfig() {
if c.DataIterationBatchSize != 0 {
c.logDeprecated("DataIterationBatchSize", "UpdatableConfig.DataIterationBatchSize")
c.UpdatableConfig.DataIterationBatchSize = c.DataIterationBatchSize
Expand All @@ -888,11 +889,11 @@ func (c *Config) checkForDeprecatedConfig() {

if len(c.ControlServerCustomScripts) != 0 {
c.logDeprecated("ControlServerCustomScripts", "ControlServerConfig.CustomScripts")
c.ControlServerConfig.CustomScripts= c.ControlServerCustomScripts
c.ControlServerConfig.CustomScripts = c.ControlServerCustomScripts
}
}

func (c *Config) logDeprecated(deprecatedConfig string, newConfig string) {
func (c *Config) logDeprecated(deprecatedConfig string, newConfig string) {
logrus.Warnf("Config.%s is deprecated in favour of Config.%s", deprecatedConfig, newConfig)
}

Expand Down
20 changes: 15 additions & 5 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package ghostferry
import (
sqlorig "database/sql"
"fmt"
sql "github.com/Shopify/ghostferry/sqlwrapper"
"strings"
"time"

sql "github.com/Shopify/ghostferry/sqlwrapper"

"github.com/Masterminds/squirrel"
"github.com/go-mysql-org/go-mysql/schema"
Expand Down Expand Up @@ -35,8 +37,8 @@ type CursorConfig struct {
DB *sql.DB
Throttler Throttler

ColumnsToSelect []string
BuildSelect func([]string, *TableSchema, uint64, uint64) (squirrel.SelectBuilder, error)
ColumnsToSelect []string
BuildSelect func([]string, *TableSchema, uint64, uint64) (squirrel.SelectBuilder, error)
// BatchSize is a pointer to the BatchSize in Config.UpdatableConfig which can be independently updated from this code.
// Having it as a pointer allows the updated value to be read without needing additional code to copy the batch size value into the cursor config for each cursor we create.
BatchSize *uint64
Expand Down Expand Up @@ -99,7 +101,7 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
var batch *RowBatch
var paginationKeypos uint64

err := WithRetries(c.ReadRetries, 0, c.logger, "fetch rows", func() (err error) {
err := WithRetries(c.ReadRetries, 1*time.Second, c.logger, "fetch rows", func() (err error) {
if c.Throttler != nil {
WaitForThrottle(c.Throttler)
}
Expand Down Expand Up @@ -172,7 +174,15 @@ func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos uint64
}

if c.RowLock {
selectBuilder = selectBuilder.Suffix("FOR UPDATE")
mySqlVersion, err := c.DB.QueryMySQLVersion()
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought it's best to just ask the DB 🙃

if err != nil {
return nil, 0, err
}
if strings.HasPrefix(mySqlVersion, "8.") {
selectBuilder = selectBuilder.Suffix("FOR SHARE NOWAIT")
} else {
selectBuilder = selectBuilder.Suffix("LOCK IN SHARE MODE")
}
}

query, args, err := selectBuilder.ToSql()
Expand Down
9 changes: 9 additions & 0 deletions sqlwrapper/ghostferry_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ func (db DB) Begin() (*Tx, error) {
return &Tx{tx, db.Marginalia}, err
}

func (db DB) QueryMySQLVersion() (string, error) {
var version string
err := db.QueryRow("SELECT @@version").Scan(&version)
if err != nil {
return "", fmt.Errorf("failed to get MySQL version: %v", err)
}
return version, nil
}

func (tx Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (sqlorig.Result, error) {
return tx.Tx.ExecContext(ctx, AnnotateStmt(query, tx.marginalia), args...)
}
Expand Down
2 changes: 1 addition & 1 deletion test/go/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (this *ConfigTestSuite) TestDefaultValues() {
this.Require().Equal(5, this.config.DBWriteRetries)
this.Require().Equal(uint64(200), this.config.UpdatableConfig.DataIterationBatchSize)
this.Require().Equal(4, this.config.DataIterationConcurrency)
this.Require().Equal(5, this.config.DBReadRetries)
this.Require().Equal(60, this.config.DBReadRetries)
this.Require().Equal("0.0.0.0:8000", this.config.ControlServerConfig.ServerBindAddr)
this.Require().Equal(".", this.config.ControlServerConfig.WebBasedir)
}
Expand Down
3 changes: 3 additions & 0 deletions testhelpers/data_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math/rand"
"sync"
"time"

sq "github.com/Masterminds/squirrel"
sql "github.com/Shopify/ghostferry/sqlwrapper"
Expand Down Expand Up @@ -175,6 +176,8 @@ func (this *MixedActionDataWriter) WriteData(i int) {
if err != nil {
panic(err)
}

time.Sleep(10 * time.Millisecond)
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this to make the DataWriter more realistic, some integration tests (example) start it with NumberOfWriters: 4 and that's too much for NOWAIT to work without sometimes taking way too long to obtain a lock.

I'm wondering if this is a signal to consider just a FOR SHARE lock 🤔

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FOR SHARE is equivalent to what we're doing right now, so it's not a step down, right?

I mean, we haven't seen problems with ghostferry holding on to locks so far, AFAIK, so it wouldn't be the worst thing to keep doing it. However, if the goal is the minimize production impact than not holding on to locks would be better.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also start with FOR SHARE to unblock super_read_only and then investigate NOWAIT later.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I agree that FOR SHARE is a safer option, but also don't have new ideas on how to de-risk FOR SHARE NOWAIT further. I think it's safe enough in a way that if the cursor fails to obtain a lock (which I doubt, the logic is equal to gh-ost's iterator now, which isn't even configurable), we can guarantee that any data will remain unaffected and ghostferry may just fail to run.

Do you have any objections against shipping with NOWAIT (or how we could test it further while shipping FOR SHARE first)?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to go with NOWAIT. 👍

}
}

Expand Down
Loading