2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,8 @@ All notable changes to this project will be documented in this file.
### Added

- Changelog.
+ - Pagination keys beyond UINT64 @milanatshopify #417
+ - Pagination keys other than UINT64 have to have binary collation @grodowski #422

## [1.1.0]

2 changes: 1 addition & 1 deletion Makefile
@@ -1,5 +1,5 @@
# Variables to be built into the binary
- VERSION := 1.1.0
+ VERSION := 1.2.0

# This variable can be overwritten by the caller
DATETIME ?= $(shell date -u +%Y%m%d%H%M%S)
17 changes: 9 additions & 8 deletions batch_writer.go
@@ -56,12 +56,13 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {
return nil
}

- startPaginationKeypos, err := values[0].GetUint64(batch.PaginationKeyIndex())
+ paginationColumn := batch.TableSchema().GetPaginationColumn()
+
+ startPaginationKeypos, err := NewPaginationKeyFromRow(values[0], batch.PaginationKeyIndex(), paginationColumn)
if err != nil {
return err
}

- endPaginationKeypos, err := values[len(values)-1].GetUint64(batch.PaginationKeyIndex())
+ endPaginationKeypos, err := NewPaginationKeyFromRow(values[len(values)-1], batch.PaginationKeyIndex(), paginationColumn)
if err != nil {
return err
}
@@ -78,12 +79,12 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {

query, args, err := batch.AsSQLQuery(db, table)
if err != nil {
- return fmt.Errorf("during generating sql query at paginationKey %v -> %v: %v", startPaginationKeypos, endPaginationKeypos, err)
+ return fmt.Errorf("during generating sql query at paginationKey %s -> %s: %v", startPaginationKeypos.String(), endPaginationKeypos.String(), err)
}

stmt, err := w.stmtCache.StmtFor(w.DB, query)
if err != nil {
- return fmt.Errorf("during prepare query near paginationKey %v -> %v (%s): %v", startPaginationKeypos, endPaginationKeypos, query, err)
+ return fmt.Errorf("during prepare query near paginationKey %s -> %s (%s): %v", startPaginationKeypos.String(), endPaginationKeypos.String(), query, err)
}

tx, err := w.DB.Begin()
@@ -94,14 +95,14 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {
_, err = tx.Stmt(stmt).Exec(args...)
if err != nil {
tx.Rollback()
- return fmt.Errorf("during exec query near paginationKey %v -> %v (%s): %v", startPaginationKeypos, endPaginationKeypos, query, err)
+ return fmt.Errorf("during exec query near paginationKey %s -> %s (%s): %v", startPaginationKeypos.String(), endPaginationKeypos.String(), query, err)
}

if w.InlineVerifier != nil {
mismatches, err := w.InlineVerifier.CheckFingerprintInline(tx, db, table, batch, w.EnforceInlineVerification)
if err != nil {
tx.Rollback()
- return fmt.Errorf("during fingerprint checking for paginationKey %v -> %v (%s): %v", startPaginationKeypos, endPaginationKeypos, query, err)
+ return fmt.Errorf("during fingerprint checking for paginationKey %s -> %s (%s): %v", startPaginationKeypos.String(), endPaginationKeypos.String(), query, err)
}

if w.EnforceInlineVerification {
@@ -119,7 +120,7 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error {
err = tx.Commit()
if err != nil {
tx.Rollback()
- return fmt.Errorf("during commit near paginationKey %v -> %v (%s): %v", startPaginationKeypos, endPaginationKeypos, query, err)
+ return fmt.Errorf("during commit near paginationKey %s -> %s (%s): %v", startPaginationKeypos.String(), endPaginationKeypos.String(), query, err)
}

// Note that the state tracker expects us the track based on the original
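Both endpoints of each written batch now go through NewPaginationKeyFromRow and are formatted with String() in error messages. The PaginationKey type itself is defined elsewhere in this PR; below is a minimal sketch of the contract implied by the call sites above. The method names follow the usages in the diff, while every method body (and the hex rendering of binary keys) is an assumption, not the PR's actual code.

```go
package ghostferry

import (
	"bytes"
	"encoding/hex"
	"strconv"
)

// PaginationKey abstracts over uint64 and binary pagination columns.
// Compare/String/SQLValue are the methods exercised by this diff.
type PaginationKey interface {
	Compare(other PaginationKey) int // <0, 0, >0; mixing key types is a bug
	String() string                  // stable text form, also usable as a map key
	SQLValue() interface{}           // driver-ready value for WHERE / ORDER BY
}

// Uint64Key covers the classic integral pagination keys.
type Uint64Key uint64

func NewUint64Key(v uint64) Uint64Key { return Uint64Key(v) }

func (k Uint64Key) Compare(other PaginationKey) int {
	o := other.(Uint64Key)
	switch {
	case k < o:
		return -1
	case k > o:
		return 1
	default:
		return 0
	}
}

func (k Uint64Key) String() string        { return strconv.FormatUint(uint64(k), 10) }
func (k Uint64Key) SQLValue() interface{} { return uint64(k) }

// BinaryKey covers BINARY / VARBINARY (and binary-collated string) columns.
type BinaryKey []byte

func NewBinaryKey(b []byte) BinaryKey { return BinaryKey(b) }

// bytes.Compare is a byte-wise memcmp, which only matches the server's sort
// order when the column uses binary collation -- hence the changelog entry
// "Pagination keys other than UINT64 have to have binary collation".
func (k BinaryKey) Compare(other PaginationKey) int { return bytes.Compare(k, other.(BinaryKey)) }
func (k BinaryKey) String() string                  { return "0x" + hex.EncodeToString(k) }
func (k BinaryKey) SQLValue() interface{}           { return []byte(k) }
```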
52 changes: 37 additions & 15 deletions compression_verifier.go
@@ -49,6 +49,7 @@ func (e UnsupportedCompressionError) Error() string {
type CompressionVerifier struct {
logger *logrus.Entry

+ TableSchemaCache TableSchemaCache
supportedAlgorithms map[string]struct{}
tableColumnCompressions TableColumnCompressionConfig
}
@@ -59,32 +60,52 @@ type CompressionVerifier struct {
// The GetCompressedHashes method checks if the existing table contains compressed data
// and will apply the decompression algorithm to the applicable columns if necessary.
// After the columns are decompressed, the hashes of the data are used to verify equality
- func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (map[uint64][]byte, error) {
+ func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schemaName, tableName, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []interface{}) (map[string][]byte, error) {
c.logger.WithFields(logrus.Fields{
"tag": "compression_verifier",
- "table": table,
+ "table": tableName,
}).Info("decompressing table data before verification")

- tableCompression := c.tableColumnCompressions[table]
+ tableCompression := c.tableColumnCompressions[tableName]

// Extract the raw rows using SQL to be decompressed
- rows, err := getRows(db, schema, table, paginationKeyColumn, columns, paginationKeys)
+ rows, err := getRows(db, schemaName, tableName, paginationKeyColumn, columns, paginationKeys)
if err != nil {
return nil, err
}
defer rows.Close()

// Decompress applicable columns and hash the resulting column values for comparison
- resultSet := make(map[uint64][]byte)
+ table := c.TableSchemaCache.Get(schemaName, tableName)
+ if table == nil {
+ return nil, fmt.Errorf("table %s.%s not found in schema cache", schemaName, tableName)
+ }
+ paginationColumn := table.GetPaginationColumn()
+ resultSet := make(map[string][]byte)

for rows.Next() {
rowData, err := ScanByteRow(rows, len(columns)+1)
if err != nil {
return nil, err
}

- paginationKey, err := strconv.ParseUint(string(rowData[0]), 10, 64)
- if err != nil {
- return nil, err
+ var paginationKeyStr string
+ switch paginationColumn.Type {
+ case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT:
+ paginationKeyUint, err := strconv.ParseUint(string(rowData[0]), 10, 64)
+ if err != nil {
+ return nil, err
+ }
+ paginationKeyStr = NewUint64Key(paginationKeyUint).String()
+
+ case schema.TYPE_BINARY, schema.TYPE_STRING:
+ paginationKeyStr = NewBinaryKey(rowData[0]).String()
+
+ default:
+ paginationKeyUint, err := strconv.ParseUint(string(rowData[0]), 10, 64)
+ if err != nil {
+ return nil, err
+ }
+ paginationKeyStr = NewUint64Key(paginationKeyUint).String()
}

// Decompress the applicable columns and then hash them together
@@ -95,7 +116,7 @@ func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, pag
for idx, column := range columns {
if algorithm, ok := tableCompression[column.Name]; ok {
// rowData contains the result of "SELECT paginationKeyColumn, * FROM ...", so idx+1 to get each column
- decompressedColData, err := c.Decompress(table, column.Name, algorithm, rowData[idx+1])
+ decompressedColData, err := c.Decompress(tableName, column.Name, algorithm, rowData[idx+1])
if err != nil {
return nil, err
}
Expand All @@ -111,20 +132,20 @@ func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, pag
return nil, err
}

- resultSet[paginationKey] = decompressedRowHash
+ resultSet[paginationKeyStr] = decompressedRowHash
}

metrics.Gauge(
"compression_verifier_decompress_rows",
float64(len(resultSet)),
- []MetricTag{{"table", table}},
+ []MetricTag{{"table", tableName}},
1.0,
)

logrus.WithFields(logrus.Fields{
"tag": "compression_verifier",
"rows": len(resultSet),
- "table": table,
+ "table": tableName,
}).Debug("decompressed rows will be compared")

return resultSet, nil
@@ -192,12 +213,13 @@ func (c *CompressionVerifier) verifyConfiguredCompression(tableColumnCompression

// NewCompressionVerifier first checks the map for supported compression algorithms before
// initializing and returning the initialized instance.
- func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig) (*CompressionVerifier, error) {
+ func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig, tableSchemaCache TableSchemaCache) (*CompressionVerifier, error) {
supportedAlgorithms := make(map[string]struct{})
supportedAlgorithms[CompressionSnappy] = struct{}{}

compressionVerifier := &CompressionVerifier{
logger: logrus.WithField("tag", "compression_verifier"),
+ TableSchemaCache: tableSchemaCache,
supportedAlgorithms: supportedAlgorithms,
tableColumnCompressions: tableColumnCompressions,
}
Expand All @@ -209,7 +231,7 @@ func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig
return compressionVerifier, nil
}

- func getRows(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (*sqlorig.Rows, error) {
+ func getRows(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []interface{}) (*sqlorig.Rows, error) {
quotedPaginationKey := QuoteField(paginationKeyColumn)
sql, args, err := rowSelector(columns, paginationKeyColumn).
From(QuotedTableNameFromString(schema, table)).
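GetCompressedHashes is now keyed by the pagination key's String() form rather than a raw uint64, so integral and binary keys share one map type. A hypothetical helper (not part of this PR) shows why that matters downstream: source and target hash sets stay comparable without caring which kind of key produced them.

```go
package ghostferry

import "bytes"

// mismatchedKeys is a hypothetical illustration: with map[string][]byte on
// both sides, comparing decompressed row hashes needs no knowledge of whether
// the pagination key was a uint64 or a binary blob.
func mismatchedKeys(source, target map[string][]byte) []string {
	var mismatches []string
	for key, srcHash := range source {
		tgtHash, ok := target[key]
		if !ok || !bytes.Equal(srcHash, tgtHash) {
			mismatches = append(mismatches, key)
		}
	}
	return mismatches
}
```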
16 changes: 13 additions & 3 deletions config.go
@@ -376,12 +376,17 @@ func (c ForceIndexConfig) IndexFor(schemaName, tableName string) string {
// CascadingPaginationColumnConfig to configure pagination columns to be
// used. The term `Cascading` to denote that greater specificity takes
// precedence.
+ //
+ // IMPORTANT: All configured pagination columns must contain unique values.
+ // When specifying a FallbackColumn for tables with composite primary keys,
+ // ensure the column has a unique constraint to prevent data loss during migration.
type CascadingPaginationColumnConfig struct {
// PerTable has greatest specificity and takes precedence over the other options
PerTable map[string]map[string]string // SchemaName => TableName => ColumnName

// FallbackColumn is a global default to fallback to and is less specific than the
- // default, which is the Primary Key
+ // default, which is the Primary Key.
+ // This column MUST have unique values (ideally a unique constraint) for data integrity.
FallbackColumn string
}
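As a concrete illustration of the cascade (all schema, table, and column names below are invented), a configuration might look like this sketch:

```go
package ghostferry

// examplePaginationConfig is a hypothetical example of the cascade described
// above; all names are invented for illustration.
func examplePaginationConfig() *CascadingPaginationColumnConfig {
	return &CascadingPaginationColumnConfig{
		// Most specific: blog.users paginates on account_id.
		PerTable: map[string]map[string]string{
			"blog": {"users": "account_id"},
		},
		// Least specific: used when the primary key cannot be paginated on
		// (e.g. composite keys); it must hold unique values.
		FallbackColumn: "id",
	}
}
```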

@@ -727,10 +732,15 @@ type Config struct {
//
ForceIndexForVerification ForceIndexConfig

- // Ghostferry requires a single numeric column to paginate over tables. Inferring that column is done in the following exact order:
+ // Ghostferry requires a single numeric or binary column to paginate over tables. Inferring that column is done in the following exact order:
// 1. Use the PerTable pagination column, if configured for a table. Fail if we cannot find this column in the table.
- // 2. Use the table's primary key column as the pagination column. Fail if the primary key is not numeric or is a composite key without a FallbackColumn specified.
+ // 2. Use the table's primary key column as the pagination column. Fail if the primary key is not numeric/binary or is a composite key without a FallbackColumn specified.
// 3. Use the FallbackColumn pagination column, if configured. Fail if we cannot find this column in the table.
+ //
+ // IMPORTANT: The pagination column MUST contain unique values for data integrity.
+ // When using a FallbackColumn (typically "id") for tables with composite primary keys, this column must have a unique constraint.
+ // The pagination algorithm uses WHERE pagination_key > last_key ORDER BY pagination_key LIMIT batch_size.
+ // If duplicate values exist, rows may be skipped during iteration, resulting in data loss during the migration.
CascadingPaginationColumnConfig *CascadingPaginationColumnConfig

// SkipTargetVerification is used to enable or disable target verification during moves.
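The data-loss scenario described in the comment above is easy to reproduce in miniature. This toy program (an assumed illustration, not Ghostferry code) mimics the keyset loop WHERE pagination_key > last_key ORDER BY pagination_key LIMIT batch_size over a column with duplicates; resuming from the last key of a batch silently jumps over the remaining rows that share it.

```go
package main

import "fmt"

func main() {
	keys := []int{1, 2, 2, 2, 3} // sorted pagination column with duplicate 2s
	last, batchSize, copied := 0, 2, 0

	for {
		// Mimics: SELECT ... WHERE k > last ORDER BY k LIMIT batchSize
		var page []int
		for _, k := range keys {
			if k > last && len(page) < batchSize {
				page = append(page, k)
			}
		}
		if len(page) == 0 {
			break
		}
		copied += len(page)
		last = page[len(page)-1]
	}

	// The first batch copies [1 2]; resuming with "k > 2" skips the other
	// two rows whose key is also 2.
	fmt.Printf("copied %d of %d rows\n", copied, len(keys)) // copied 3 of 5 rows
}
```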
31 changes: 16 additions & 15 deletions cursor.go
@@ -38,7 +38,7 @@ type CursorConfig struct {
Throttler Throttler

ColumnsToSelect []string
- BuildSelect func([]string, *TableSchema, uint64, uint64) (squirrel.SelectBuilder, error)
+ BuildSelect func([]string, *TableSchema, PaginationKey, uint64) (squirrel.SelectBuilder, error)
// BatchSize is a pointer to the BatchSize in Config.UpdatableConfig which can be independently updated from this code.
// Having it as a pointer allows the updated value to be read without needing additional code to copy the batch size value into the cursor config for each cursor we create.
BatchSize *uint64
@@ -47,7 +47,7 @@
}

// returns a new Cursor with an embedded copy of itself
- func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor {
+ func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPaginationKey PaginationKey) *Cursor {
return &Cursor{
CursorConfig: *c,
Table: table,
@@ -58,7 +58,7 @@
}

// returns a new Cursor with an embedded copy of itself
- func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor {
+ func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey PaginationKey) *Cursor {
cursor := c.NewCursor(table, startPaginationKey, maxPaginationKey)
cursor.RowLock = false
return cursor
@@ -77,11 +77,11 @@ type Cursor struct {
CursorConfig

Table *TableSchema
- MaxPaginationKey uint64
+ MaxPaginationKey PaginationKey
RowLock bool

paginationKeyColumn *schema.TableColumn
- lastSuccessfulPaginationKey uint64
+ lastSuccessfulPaginationKey PaginationKey
logger *logrus.Entry
}

@@ -96,10 +96,10 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
c.ColumnsToSelect = []string{"*"}
}

- for c.lastSuccessfulPaginationKey < c.MaxPaginationKey {
+ for c.lastSuccessfulPaginationKey.Compare(c.MaxPaginationKey) < 0 {
var tx SqlPreparerAndRollbacker
var batch *RowBatch
- var paginationKeypos uint64
+ var paginationKeypos PaginationKey

err := WithRetries(c.ReadRetries, 1*time.Second, c.logger, "fetch rows", func() (err error) {
if c.Throttler != nil {
@@ -137,9 +137,9 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
break
}

- if paginationKeypos <= c.lastSuccessfulPaginationKey {
+ if paginationKeypos.Compare(c.lastSuccessfulPaginationKey) <= 0 {
tx.Rollback()
- err = fmt.Errorf("new paginationKeypos %d <= lastSuccessfulPaginationKey %d", paginationKeypos, c.lastSuccessfulPaginationKey)
+ err = fmt.Errorf("new paginationKeypos %s <= lastSuccessfulPaginationKey %s", paginationKeypos.String(), c.lastSuccessfulPaginationKey.String())
c.logger.WithError(err).Errorf("last successful paginationKey position did not advance")
return err
}
@@ -159,7 +159,7 @@ func (c *Cursor) Each(f func(*RowBatch) error) error {
return nil
}

- func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos uint64, err error) {
+ func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos PaginationKey, err error) {
var selectBuilder squirrel.SelectBuilder
batchSize := c.CursorConfig.GetBatchSize(c.Table.Schema, c.Table.Name)

@@ -176,7 +176,7 @@ func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos uint64
if c.RowLock {
mySqlVersion, err := c.DB.QueryMySQLVersion()
if err != nil {
- return nil, 0, err
+ return nil, NewUint64Key(0), err
}
if strings.HasPrefix(mySqlVersion, "8.") {
selectBuilder = selectBuilder.Suffix("FOR SHARE NOWAIT")
@@ -261,9 +261,10 @@
}

if len(batchData) > 0 {
- paginationKeypos, err = batchData[len(batchData)-1].GetUint64(paginationKeyIndex)
+ lastRowData := batchData[len(batchData)-1]
+ paginationKeypos, err = NewPaginationKeyFromRow(lastRowData, paginationKeyIndex, c.paginationKeyColumn)
if err != nil {
- logger.WithError(err).Error("failed to get uint64 paginationKey value")
+ logger.WithError(err).Error("failed to get paginationKey value")
return
}
}
@@ -304,12 +305,12 @@ func ScanByteRow(rows *sqlorig.Rows, columnCount int) ([][]byte, error) {
return values, err
}

- func DefaultBuildSelect(columns []string, table *TableSchema, lastPaginationKey, batchSize uint64) squirrel.SelectBuilder {
+ func DefaultBuildSelect(columns []string, table *TableSchema, lastPaginationKey PaginationKey, batchSize uint64) squirrel.SelectBuilder {
quotedPaginationKey := QuoteField(table.GetPaginationColumn().Name)

return squirrel.Select(columns...).
From(QuotedTableName(table)).
- Where(squirrel.Gt{quotedPaginationKey: lastPaginationKey}).
+ Where(squirrel.Gt{quotedPaginationKey: lastPaginationKey.SQLValue()}).
Limit(batchSize).
OrderBy(quotedPaginationKey)
}
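Tying the cursor changes together: the Each loop now advances on Compare, and DefaultBuildSelect defers to SQLValue() for the bind argument. A hypothetical usage sketch (table and column names invented, building on the PaginationKey sketch earlier) of the query this produces:

```go
package ghostferry

import "fmt"

// ExampleBinaryKeysetQuery is a hypothetical sketch; `table` is assumed to
// describe a table `db`.`t` paginated on a BINARY-collated column `token`.
func ExampleBinaryKeysetQuery(table *TableSchema) {
	builder := DefaultBuildSelect(
		[]string{"*"},
		table,
		NewBinaryKey([]byte{0x61, 0x62}), // last successful pagination key
		200,                              // batch size
	)

	sql, args, err := builder.ToSql()
	if err != nil {
		panic(err)
	}

	// Roughly: SELECT * FROM `db`.`t` WHERE `token` > ? ORDER BY `token` LIMIT 200
	// with args = [[]byte{0x61, 0x62}]. Binary collation keeps the server's
	// ORDER BY consistent with BinaryKey.Compare, so paginationKeypos can
	// never move backwards past lastSuccessfulPaginationKey.
	fmt.Println(sql, args)
}
```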