From bb58c7df0e55a4ff8427aeb3db327bac11160e1c Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Thu, 15 Jan 2026 20:53:54 +0100 Subject: [PATCH 1/4] Allow for creating canary releases by tagging commits --- .github/workflows/release.yml | 11 ++++++++++- CHANGELOG.md | 13 +++++++++++++ README.md | 20 ++++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) create mode 100644 CHANGELOG.md diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 897d327a6..b502fd8f9 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -4,6 +4,8 @@ on: push: branches: - main + tags: + - canary/* jobs: build-debs: @@ -46,6 +48,13 @@ jobs: cat ghostferry-$GITHUB_SHA.sha256sum - name: Releasing Ghostferry to Github - run: gh release create --target ${GITHUB_REF#refs/heads/} -p release-${GITHUB_SHA::7} ghostferry* + if: ${{ github.ref_type == "branch" && github.ref_name == "main" }} + run: gh release create release-${GITHUB_SHA::7} --target ${{ github.ref_name }} --prerelease ghostferry* + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Releasing Ghostferry to Github + if: ${{ github.ref_type == "tag" && startsWith(github.ref_name, "canary/") }} + run: gh release create ${{github.ref_name}} --prerelease --verify-tag --notes-from-tag --latest=false ghostferry* env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 000000000..3b023c7bd --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,13 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [Unreleased] + +### Added + +- Changelog. + +## [1.1.0] + +Past releases. 
diff --git a/README.md b/README.md index 74258aa83..15f0a8765 100644 --- a/README.md +++ b/README.md @@ -82,3 +82,23 @@ or Run a specific test `DEBUG=1 ruby -Itest test/integration/trivial_test.rb -n "TrivialIntegrationTest#test_logged_query_omits_columns"` + +Releasing new version +--------------------- + +### Canary + +Tag your commit with a `canary/*` tag and push it, e.g. + +```bash +git tag --sign --message="Initial support for UUIDs as pagination keys" canary/v1.1.2-uuid-pagination-keys-alpha-1 +git push origin --tags +``` + +This will create a release named after the tag. + +### Production + +Final releases are created automatically on merge to the `main` branch; they are named `release-SHA`. + +Remember to update the version in `Makefile` and to update `CHANGELOG.md` before bigger releases. From e46c6a358ed094e93ca977fdb300fc1bf652e45c Mon Sep 17 00:00:00 2001 From: Leszek Zalewski Date: Fri, 16 Jan 2026 11:02:03 +0100 Subject: [PATCH 2/4] Update steps names --- .github/workflows/release.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b502fd8f9..5d366342c 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -47,13 +47,13 @@ jobs: echo "sha256sum:" cat ghostferry-$GITHUB_SHA.sha256sum - - name: Releasing Ghostferry to Github + - name: Releasing production Ghostferry if: ${{ github.ref_type == "branch" && github.ref_name == "main" }} run: gh release create release-${GITHUB_SHA::7} --target ${{ github.ref_name }} --prerelease ghostferry* env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Releasing Ghostferry to Github + - name: Releasing canary Ghostferry if: ${{ github.ref_type == "tag" && startsWith(github.ref_name, "canary/") }} run: gh release create ${{github.ref_name}} --prerelease --verify-tag --notes-from-tag --latest=false ghostferry* env: From 92878325fccf09b76289beddeea77c68aa2e9a20 Mon Sep 17 00:00:00 2001 From: Leszek 
Zalewski Date: Fri, 16 Jan 2026 11:26:04 +0100 Subject: [PATCH 3/4] Use single quotes in actions expressions --- .github/workflows/release.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 5d366342c..7c6020dc2 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -48,13 +48,13 @@ jobs: cat ghostferry-$GITHUB_SHA.sha256sum - name: Releasing production Ghostferry - if: ${{ github.ref_type == "branch" && github.ref_name == "main" }} + if: ${{ github.ref_type == 'branch' && github.ref_name == 'main' }} run: gh release create release-${GITHUB_SHA::7} --target ${{ github.ref_name }} --prerelease ghostferry* env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - name: Releasing canary Ghostferry - if: ${{ github.ref_type == "tag" && startsWith(github.ref_name, "canary/") }} + if: ${{ github.ref_type == 'tag' && startsWith(github.ref_name, 'canary/') }} run: gh release create ${{github.ref_name}} --prerelease --verify-tag --notes-from-tag --latest=false ghostferry* env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} From d80cfb6646da10ffde66806f162c2e35153ae0d9 Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Tue, 13 Jan 2026 16:16:55 +0100 Subject: [PATCH 4/4] extract NewPaginationKeyFromRow --- batch_writer.go | 65 +++++-------------------------------------- cursor.go | 41 +++------------------------ data_iterator.go | 38 ++++--------------------- dml_events.go | 34 +++------------------- inline_verifier.go | 64 ++++++------------------------------------ iterative_verifier.go | 58 ++++++-------------------------------- pagination_key.go | 45 +++++++++++++++++++++++++++++- 7 files changed, 80 insertions(+), 265 deletions(-) diff --git a/batch_writer.go b/batch_writer.go index 550f16566..28dfb4bd6 100644 --- a/batch_writer.go +++ b/batch_writer.go @@ -7,7 +7,6 @@ import ( sql "github.com/Shopify/ghostferry/sqlwrapper" - "github.com/go-mysql-org/go-mysql/schema" 
"github.com/sirupsen/logrus" ) @@ -57,65 +56,15 @@ func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error { return nil } - var startPaginationKeypos, endPaginationKeypos PaginationKey - var err error - paginationColumn := batch.TableSchema().GetPaginationColumn() - switch paginationColumn.Type { - case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT: - var startValue, endValue uint64 - startValue, err = values[0].GetUint64(batch.PaginationKeyIndex()) - if err != nil { - return err - } - endValue, err = values[len(values)-1].GetUint64(batch.PaginationKeyIndex()) - if err != nil { - return err - } - startPaginationKeypos = NewUint64Key(startValue) - endPaginationKeypos = NewUint64Key(endValue) - - case schema.TYPE_BINARY, schema.TYPE_STRING: - startValueInterface := values[0][batch.PaginationKeyIndex()] - endValueInterface := values[len(values)-1][batch.PaginationKeyIndex()] - - getBytes := func(val interface{}) ([]byte, error) { - switch v := val.(type) { - case []byte: - return v, nil - case string: - return []byte(v), nil - default: - return nil, fmt.Errorf("expected binary/string pagination key, got %T", val) - } - } - - startValue, err := getBytes(startValueInterface) - if err != nil { - return err - } - - endValue, err := getBytes(endValueInterface) - if err != nil { - return err - } - - startPaginationKeypos = NewBinaryKey(startValue) - endPaginationKeypos = NewBinaryKey(endValue) - - default: - var startValue, endValue uint64 - startValue, err = values[0].GetUint64(batch.PaginationKeyIndex()) - if err != nil { - return err - } - endValue, err = values[len(values)-1].GetUint64(batch.PaginationKeyIndex()) - if err != nil { - return err - } - startPaginationKeypos = NewUint64Key(startValue) - endPaginationKeypos = NewUint64Key(endValue) + startPaginationKeypos, err := NewPaginationKeyFromRow(values[0], batch.PaginationKeyIndex(), paginationColumn) + if err != nil { + return err + } + endPaginationKeypos, err := NewPaginationKeyFromRow(values[len(values)-1], 
batch.PaginationKeyIndex(), paginationColumn) + if err != nil { + return err } db := batch.TableSchema().Schema diff --git a/cursor.go b/cursor.go index 08f5a8929..c685f270c 100644 --- a/cursor.go +++ b/cursor.go @@ -262,43 +262,10 @@ func (c *Cursor) Fetch(db SqlPreparer) (batch *RowBatch, paginationKeypos Pagina if len(batchData) > 0 { lastRowData := batchData[len(batchData)-1] - - switch c.paginationKeyColumn.Type { - case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT: - var value uint64 - value, err = lastRowData.GetUint64(paginationKeyIndex) - if err != nil { - logger.WithError(err).Error("failed to get uint64 paginationKey value") - return - } - paginationKeypos = NewUint64Key(value) - - case schema.TYPE_BINARY, schema.TYPE_STRING: - valueInterface := lastRowData[paginationKeyIndex] - - var valueBytes []byte - switch v := valueInterface.(type) { - case []byte: - valueBytes = v - case string: - valueBytes = []byte(v) - default: - err = fmt.Errorf("expected binary pagination key to be []byte or string, got %T", valueInterface) - logger.WithError(err).Error("failed to get binary paginationKey value") - return - } - - paginationKeypos = NewBinaryKey(valueBytes) - - default: - // Fallback for other integer types - var value uint64 - value, err = lastRowData.GetUint64(paginationKeyIndex) - if err != nil { - logger.WithError(err).Error("failed to get uint64 paginationKey value") - return - } - paginationKeypos = NewUint64Key(value) + paginationKeypos, err = NewPaginationKeyFromRow(lastRowData, paginationKeyIndex, c.paginationKeyColumn) + if err != nil { + logger.WithError(err).Error("failed to get paginationKey value") + return } } diff --git a/data_iterator.go b/data_iterator.go index efe7d1b75..dee62cfd3 100644 --- a/data_iterator.go +++ b/data_iterator.go @@ -6,7 +6,6 @@ import ( sql "github.com/Shopify/ghostferry/sqlwrapper" - "github.com/go-mysql-org/go-mysql/schema" "github.com/sirupsen/logrus" ) @@ -115,40 +114,13 @@ func (d *DataIterator) Run(tables 
[]*TableSchema) { paginationColumn := table.GetPaginationColumn() for i, rowData := range batch.Values() { - var paginationKeyStr string - - switch paginationColumn.Type { - case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT: - paginationKeyUint, err := rowData.GetUint64(batch.PaginationKeyIndex()) - if err != nil { - logger.WithError(err).Error("failed to get uint64 paginationKey data") - return err - } - paginationKeyStr = NewUint64Key(paginationKeyUint).String() - - case schema.TYPE_BINARY, schema.TYPE_STRING: - paginationKeyInterface := rowData[batch.PaginationKeyIndex()] - var paginationKeyBytes []byte - switch v := paginationKeyInterface.(type) { - case []byte: - paginationKeyBytes = v - case string: - paginationKeyBytes = []byte(v) - default: - return fmt.Errorf("expected binary/string pagination key, got %T", paginationKeyInterface) - } - paginationKeyStr = NewBinaryKey(paginationKeyBytes).String() - - default: - paginationKeyUint, err := rowData.GetUint64(batch.PaginationKeyIndex()) - if err != nil { - logger.WithError(err).Error("failed to get paginationKey data") - return err - } - paginationKeyStr = NewUint64Key(paginationKeyUint).String() + paginationKey, err := NewPaginationKeyFromRow(rowData, batch.PaginationKeyIndex(), paginationColumn) + if err != nil { + logger.WithError(err).Error("failed to get paginationKey data") + return err } - fingerprints[paginationKeyStr] = rowData[len(rowData)-1].([]byte) + fingerprints[paginationKey.String()] = rowData[len(rowData)-1].([]byte) rows[i] = rowData[:len(rowData)-1] } diff --git a/dml_events.go b/dml_events.go index 7d96c2509..7b8a00f1d 100644 --- a/dml_events.go +++ b/dml_events.go @@ -576,35 +576,9 @@ func paginationKeyFromEventData(table *TableSchema, rowData RowData) (string, er return "", err } - paginationColumn := table.GetPaginationColumn() - paginationKeyIndex := table.GetPaginationKeyIndex() - - switch paginationColumn.Type { - case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT: - paginationKeyUint, err 
:= rowData.GetUint64(paginationKeyIndex) - if err != nil { - return "", err - } - return NewUint64Key(paginationKeyUint).String(), nil - - case schema.TYPE_BINARY, schema.TYPE_STRING: - paginationKeyInterface := rowData[paginationKeyIndex] - var paginationKeyBytes []byte - switch v := paginationKeyInterface.(type) { - case []byte: - paginationKeyBytes = v - case string: - paginationKeyBytes = []byte(v) - default: - return "", fmt.Errorf("expected binary/string pagination key, got %T", paginationKeyInterface) - } - return NewBinaryKey(paginationKeyBytes).String(), nil - - default: - paginationKeyUint, err := rowData.GetUint64(paginationKeyIndex) - if err != nil { - return "", err - } - return NewUint64Key(paginationKeyUint).String(), nil + paginationKey, err := NewPaginationKeyFromRow(rowData, table.GetPaginationKeyIndex(), table.GetPaginationColumn()) + if err != nil { + return "", err } + return paginationKey.String(), nil } diff --git a/inline_verifier.go b/inline_verifier.go index 16c0fab16..df8e2055a 100644 --- a/inline_verifier.go +++ b/inline_verifier.go @@ -335,34 +335,11 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, target paginationKeys := make([]interface{}, len(sourceBatch.Values())) for i, row := range sourceBatch.Values() { - switch paginationColumn.Type { - case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT: - paginationKeyUint, err := row.GetUint64(sourceBatch.PaginationKeyIndex()) - if err != nil { - return nil, err - } - paginationKeys[i] = paginationKeyUint - - case schema.TYPE_BINARY, schema.TYPE_STRING: - paginationKeyInterface := row[sourceBatch.PaginationKeyIndex()] - var paginationKeyBytes []byte - switch v := paginationKeyInterface.(type) { - case []byte: - paginationKeyBytes = v - case string: - paginationKeyBytes = []byte(v) - default: - return nil, fmt.Errorf("expected binary/string pagination key, got %T", paginationKeyInterface) - } - paginationKeys[i] = paginationKeyBytes - - default: - paginationKeyUint, 
err := row.GetUint64(sourceBatch.PaginationKeyIndex()) - if err != nil { - return nil, err - } - paginationKeys[i] = paginationKeyUint + paginationKey, err := NewPaginationKeyFromRow(row, sourceBatch.PaginationKeyIndex(), paginationColumn) + if err != nil { + return nil, err } + paginationKeys[i] = paginationKey.SQLValue() } // Fetch target data @@ -376,36 +353,11 @@ func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, target sourceDecompressedData := make(map[string]map[string][]byte) for _, rowData := range sourceBatch.Values() { - var paginationKeyStr string - - switch paginationColumn.Type { - case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT: - paginationKeyUint, err := rowData.GetUint64(sourceBatch.PaginationKeyIndex()) - if err != nil { - return nil, err - } - paginationKeyStr = NewUint64Key(paginationKeyUint).String() - - case schema.TYPE_BINARY, schema.TYPE_STRING: - paginationKeyInterface := rowData[sourceBatch.PaginationKeyIndex()] - var paginationKeyBytes []byte - switch v := paginationKeyInterface.(type) { - case []byte: - paginationKeyBytes = v - case string: - paginationKeyBytes = []byte(v) - default: - return nil, fmt.Errorf("expected binary/string pagination key, got %T", paginationKeyInterface) - } - paginationKeyStr = NewBinaryKey(paginationKeyBytes).String() - - default: - paginationKeyUint, err := rowData.GetUint64(sourceBatch.PaginationKeyIndex()) - if err != nil { - return nil, err - } - paginationKeyStr = NewUint64Key(paginationKeyUint).String() + paginationKey, err := NewPaginationKeyFromRow(rowData, sourceBatch.PaginationKeyIndex(), paginationColumn) + if err != nil { + return nil, err } + paginationKeyStr := paginationKey.String() sourceDecompressedData[paginationKeyStr] = make(map[string][]byte) for idx, col := range table.Columns { diff --git a/iterative_verifier.go b/iterative_verifier.go index cc896179b..3476ebe60 100644 --- a/iterative_verifier.go +++ b/iterative_verifier.go @@ -320,31 +320,12 @@ func (v 
*IterativeVerifier) GetHashes(db *sql.DB, schemaName, tableName, paginat return nil, err } - var paginationKeyStr string - switch paginationColumn.Type { - case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT: - paginationKeyUint, err := rowData.GetUint64(0) - if err != nil { - return nil, err - } - paginationKeyStr = NewUint64Key(paginationKeyUint).String() - - case schema.TYPE_BINARY, schema.TYPE_STRING: - paginationKeyBytes, ok := rowData[0].([]byte) - if !ok { - return nil, fmt.Errorf("expected []byte for binary pagination key, got %T", rowData[0]) - } - paginationKeyStr = NewBinaryKey(paginationKeyBytes).String() - - default: - paginationKeyUint, err := rowData.GetUint64(0) - if err != nil { - return nil, err - } - paginationKeyStr = NewUint64Key(paginationKeyUint).String() + paginationKey, err := NewPaginationKeyFromRow(rowData, 0, paginationColumn) + if err != nil { + return nil, err } - resultSet[paginationKeyStr] = rowData[1].([]byte) + resultSet[paginationKey.String()] = rowData[1].([]byte) } return resultSet, nil } @@ -422,34 +403,11 @@ func (v *IterativeVerifier) iterateTableFingerprints(table *TableSchema, mismatc paginationColumn := table.GetPaginationColumn() for _, rowData := range batch.Values() { - switch paginationColumn.Type { - case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT: - paginationKeyUint, err := rowData.GetUint64(batch.PaginationKeyIndex()) - if err != nil { - return err - } - paginationKeys = append(paginationKeys, paginationKeyUint) - - case schema.TYPE_BINARY, schema.TYPE_STRING: - paginationKeyInterface := rowData[batch.PaginationKeyIndex()] - var paginationKeyBytes []byte - switch v := paginationKeyInterface.(type) { - case []byte: - paginationKeyBytes = v - case string: - paginationKeyBytes = []byte(v) - default: - return fmt.Errorf("expected binary/string pagination key, got %T", paginationKeyInterface) - } - paginationKeys = append(paginationKeys, paginationKeyBytes) - - default: - paginationKeyUint, err := 
rowData.GetUint64(batch.PaginationKeyIndex()) - if err != nil { - return err - } - paginationKeys = append(paginationKeys, paginationKeyUint) + paginationKey, err := NewPaginationKeyFromRow(rowData, batch.PaginationKeyIndex(), paginationColumn) + if err != nil { + return err } + paginationKeys = append(paginationKeys, paginationKey.SQLValue()) } mismatchedPaginationKeys, err := v.compareFingerprints(paginationKeys, batch.TableSchema()) diff --git a/pagination_key.go b/pagination_key.go index 421526483..4906632e0 100644 --- a/pagination_key.go +++ b/pagination_key.go @@ -11,12 +11,21 @@ import ( "github.com/go-mysql-org/go-mysql/schema" ) +// PaginationKey represents a cursor position for paginating through table rows. +// It abstracts over different primary key types (integers, UUIDs, binary data) +// to enable consistent batched iteration through tables. type PaginationKey interface { + // SQLValue returns the value to use in SQL WHERE clauses (e.g., WHERE id > ?). SQLValue() interface{} + // Compare returns -1, 0, or 1 if this key is less than, equal to, or greater than other. Compare(other PaginationKey) int + // NumericPosition returns a float64 approximation for progress tracking and estimation. NumericPosition() float64 + // String returns a human-readable representation for logging and debugging. String() string + // MarshalJSON serializes the key for state persistence and checkpointing. MarshalJSON() ([]byte, error) + // IsMax returns true if this key represents the maximum possible value for its type. IsMax() bool } @@ -207,7 +216,7 @@ func MaxPaginationKey(column *schema.TableColumn) PaginationKey { if size > 4096 { size = 4096 } - + maxBytes := make([]byte, size) for i := range maxBytes { maxBytes[i] = 0xFF @@ -217,3 +226,37 @@ func MaxPaginationKey(column *schema.TableColumn) PaginationKey { return NewUint64Key(math.MaxUint64) } } + +// NewPaginationKeyFromRow extracts a pagination key from a row at the given index. 
+// It selects the key type from column.Type: TYPE_NUMBER and TYPE_MEDIUM_INT are read with rowData.GetUint64 and wrapped in a Uint64Key; TYPE_BINARY and TYPE_STRING accept a []byte or string value and are wrapped in a BinaryKey; every other column type falls back to the uint64 path. It returns a nil key and an error when GetUint64 fails or the binary/string value has any other Go type. NOTE(review): the binary/string branch indexes rowData[index] directly with no bounds check - confirm callers always pass a valid index. +func NewPaginationKeyFromRow(rowData RowData, index int, column *schema.TableColumn) (PaginationKey, error) { + switch column.Type { + case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT: + value, err := rowData.GetUint64(index) + if err != nil { + return nil, fmt.Errorf("failed to get uint64 pagination key: %w", err) + } + return NewUint64Key(value), nil + + case schema.TYPE_BINARY, schema.TYPE_STRING: + valueInterface := rowData[index] + var valueBytes []byte + switch v := valueInterface.(type) { + case []byte: + valueBytes = v + case string: + valueBytes = []byte(v) + default: + return nil, fmt.Errorf("expected binary pagination key to be []byte or string, got %T", valueInterface) + } + return NewBinaryKey(valueBytes), nil + + default: + // Fallback: any column type not listed above is read as uint64 (presumably the remaining integer types - verify against the schema package) + value, err := rowData.GetUint64(index) + if err != nil { + return nil, fmt.Errorf("failed to get pagination key: %w", err) + } + return NewUint64Key(value), nil + } +}