Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,33 @@

All notable changes to this project will be documented in this file.

## [Unreleased]
## [1.2.1 - ?]

### Added

- `PaginationKey` now includes the column name.

### Changed

- Use `PaginationKey` instead of a raw `uint64` in the progress report. The table progress report now
includes the whole `PaginationKey` object rather than just the raw value, e.g.
```json
{
"type": "uint64",
"column": "id",
"value": 999
}
```
which matches the format of the state dump. @driv3r #426

## [1.2.0 - 2026-02-06]

### Added

- Changelog.
- Pagination keys beyond UINT64 @milanatshopify #417
- Pagination keys other than UINT64 have to have binary collation @grodowski #422
- UUID as ID: validate collation by @grodowski in #422
- NewPaginationKeyFromRow refactor by @grodowski in #424
- Pagination beyond uint64 by @milanatshopify in #417

## [1.1.0]

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Variables to be built into the binary
VERSION := 1.2.0
VERSION := 1.2.1

# This variable can be overwritten by the caller
DATETIME ?= $(shell date -u +%Y%m%d%H%M%S)
Expand Down
8 changes: 4 additions & 4 deletions control_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ type ControlServerTableStatus struct {
TableName string
PaginationKeyName string
Status string
LastSuccessfulPaginationKey uint64
TargetPaginationKey uint64
LastSuccessfulPaginationKey PaginationKey
TargetPaginationKey PaginationKey

BatchSize uint64
}
Expand Down Expand Up @@ -286,11 +286,11 @@ func (this *ControlServer) fetchStatus() *ControlServerStatus {

lastSuccessfulPaginationKey := tableProgress.LastSuccessfulPaginationKey
if tableProgress.CurrentAction == TableActionWaiting {
lastSuccessfulPaginationKey = 0
lastSuccessfulPaginationKey = MinPaginationKey(this.F.Tables[name].GetPaginationColumn())
}
controlStatus := &ControlServerTableStatus{
TableName: name,
PaginationKeyName: this.F.Tables[name].GetPaginationColumn().Name,
PaginationKeyName: lastSuccessfulPaginationKey.ColumnName(),
Status: tableProgress.CurrentAction,
LastSuccessfulPaginationKey: lastSuccessfulPaginationKey,
TargetPaginationKey: tableProgress.TargetPaginationKey,
Expand Down
12 changes: 6 additions & 6 deletions ferry.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (f *Ferry) NewDataIterator() *DataIterator {
BatchSizePerTableOverride: f.Config.DataIterationBatchSizePerTableOverride,
ReadRetries: f.Config.DBReadRetries,
},
StateTracker: f.StateTracker,
StateTracker: f.StateTracker,
TargetPaginationKeys: &sync.Map{},
}

Expand Down Expand Up @@ -993,9 +993,9 @@ func (f *Ferry) Progress() *Progress {
rowStatsWrittenPerTable := f.StateTracker.RowStatsWrittenPerTable()

s.Tables = make(map[string]TableProgress)
targetPaginationKeys := make(map[string]uint64)
targetPaginationKeys := make(map[string]PaginationKey)
f.DataIterator.TargetPaginationKeys.Range(func(k, v interface{}) bool {
targetPaginationKeys[k.(string)] = uint64(v.(PaginationKey).NumericPosition())
targetPaginationKeys[k.(string)] = v.(PaginationKey)
return true
})

Expand All @@ -1022,9 +1022,9 @@ func (f *Ferry) Progress() *Progress {

rowWrittenStats, _ := rowStatsWrittenPerTable[tableName]

var lastSuccessfulPaginationKey uint64
var lastSuccessfulPaginationKey PaginationKey
if lastSuccessfulPaginationKeyInterface != nil {
lastSuccessfulPaginationKey = uint64(lastSuccessfulPaginationKeyInterface.NumericPosition())
lastSuccessfulPaginationKey = lastSuccessfulPaginationKeyInterface
}

s.Tables[tableName] = TableProgress{
Expand All @@ -1042,7 +1042,7 @@ func (f *Ferry) Progress() *Progress {
var completedPaginationKeys uint64 = 0
estimatedPaginationKeysPerSecond := f.StateTracker.EstimatedPaginationKeysPerSecond()
for _, targetPaginationKey := range targetPaginationKeys {
totalPaginationKeysToCopy += targetPaginationKey
totalPaginationKeysToCopy += uint64(targetPaginationKey.NumericPosition())
}

for _, completedPaginationKey := range serializedState.LastSuccessfulPaginationKeys {
Expand Down
133 changes: 79 additions & 54 deletions pagination_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
type PaginationKey interface {
// SQLValue returns the value to use in SQL WHERE clauses (e.g., WHERE id > ?).
SQLValue() interface{}
// ColumnName returns the column name this key belongs to, if known.
ColumnName() string
// Compare returns -1, 0, or 1 if this key is less than, equal to, or greater than other.
Compare(other PaginationKey) int
// NumericPosition returns a float64 approximation for progress tracking and estimation.
Expand All @@ -29,14 +31,31 @@ type PaginationKey interface {
IsMax() bool
}

type Uint64Key uint64
// Uint64Key is a pagination key that carries an unsigned 64-bit value
// together with the name of the column it belongs to.
type Uint64Key struct {
// Column is the column name; it is empty when the key is built with NewUint64Key.
Column string
// Value is the numeric key value used for SQL parameters and comparisons.
Value uint64
}

// encodedKey is the JSON envelope that pagination keys are serialized into.
type encodedKey struct {
// Type names the concrete key kind ("uint64" or "binary").
Type string `json:"type"`
// Value holds the already-encoded JSON for the key's value; it is decoded
// according to Type.
Value json.RawMessage `json:"value"`
// Column is the column name the key belongs to; it is left out of the JSON
// output when empty.
Column string `json:"column,omitempty"`
}

func NewUint64Key(value uint64) Uint64Key {
return Uint64Key(value)
return Uint64Key{Value: value}
}

// NewUint64KeyWithColumn builds a Uint64Key holding value and tagged with the
// given column name.
func NewUint64KeyWithColumn(column string, value uint64) Uint64Key {
	var key Uint64Key
	key.Column = column
	key.Value = value
	return key
}

func (k Uint64Key) SQLValue() interface{} {
return uint64(k)
return k.Value
}

// ColumnName returns the column name stored on the key. It is the empty
// string for keys created without a column (for example via NewUint64Key).
func (k Uint64Key) ColumnName() string {
	name := k.Column
	return name
}

func (k Uint64Key) Compare(other PaginationKey) int {
Expand All @@ -45,48 +64,71 @@ func (k Uint64Key) Compare(other PaginationKey) int {
panic(fmt.Sprintf("cannot compare Uint64Key with %T", other))
}

if k < otherKey {
if k.Value < otherKey.Value {
return -1
} else if k > otherKey {
} else if k.Value > otherKey.Value {
return 1
}
return 0
}

func (k Uint64Key) NumericPosition() float64 {
return float64(k)
return float64(k.Value)
}

func (k Uint64Key) String() string {
return fmt.Sprintf("%d", uint64(k))
return fmt.Sprintf("%d", k.Value)
}

func (k Uint64Key) IsMax() bool {
return k == Uint64Key(math.MaxUint64)
return k.Value == math.MaxUint64
}

func (k Uint64Key) MarshalJSON() ([]byte, error) {
return json.Marshal(uint64(k))
valBytes, err := json.Marshal(k.Value)

if err != nil {
return nil, err
}

return json.Marshal(encodedKey{
Type: "uint64",
Value: valBytes,
Column: k.Column,
})
}

type BinaryKey []byte
// BinaryKey is a pagination key that carries a raw byte-slice value together
// with the name of the column it belongs to.
type BinaryKey struct {
// Column is the column name; it is empty when the key is built with NewBinaryKey.
Column string
// Value holds the key bytes used for SQL parameters and byte-wise comparisons.
Value []byte
}

func NewBinaryKey(value []byte) BinaryKey {
clone := make([]byte, len(value))
copy(clone, value)
return BinaryKey(clone)
return BinaryKey{Value: clone}
}

// NewBinaryKeyWithColumn builds a BinaryKey tagged with the given column name.
// The bytes are copied, so later changes to the caller's slice do not affect
// the returned key.
func NewBinaryKeyWithColumn(column string, value []byte) BinaryKey {
	// Appending into a fresh, exactly-sized buffer yields a non-nil copy of
	// value, even when value is nil or empty.
	owned := append(make([]byte, 0, len(value)), value...)
	return BinaryKey{
		Column: column,
		Value:  owned,
	}
}

func (k BinaryKey) SQLValue() interface{} {
return []byte(k)
return k.Value
}

// ColumnName returns the column name stored on the key. It is the empty
// string for keys created without a column (for example via NewBinaryKey).
func (k BinaryKey) ColumnName() string {
	name := k.Column
	return name
}

func (k BinaryKey) Compare(other PaginationKey) int {
otherKey, ok := other.(BinaryKey)
if !ok {
panic(fmt.Sprintf("type mismatch: cannot compare BinaryKey with %T", other))
}
return bytes.Compare(k, otherKey)
return bytes.Compare(k.Value, otherKey.Value)
}

// NumericPosition calculates a rough float position for progress tracking.
Expand All @@ -98,29 +140,29 @@ func (k BinaryKey) Compare(other PaginationKey) int {
//
// The core pagination algorithm (using Compare()) is unaffected and works correctly with any binary data.
func (k BinaryKey) NumericPosition() float64 {
if len(k) == 0 {
if len(k.Value) == 0 {
return 0.0
}

// Take up to the first 8 bytes to form a uint64 for estimation
var buf [8]byte
copy(buf[:], k)
copy(buf[:], k.Value)

val := binary.BigEndian.Uint64(buf[:])
return float64(val)
}

func (k BinaryKey) String() string {
return hex.EncodeToString(k)
return hex.EncodeToString(k.Value)
}

func (k BinaryKey) IsMax() bool {
// We cannot know the true "Max" of a VARBINARY without knowing the length.
// However, for UUID(16), we can check for FF...
if len(k) == 0 {
if len(k.Value) == 0 {
return false
}
for _, b := range k {
for _, b := range k.Value {
if b != 0xFF {
return false
}
Expand All @@ -129,37 +171,16 @@ func (k BinaryKey) IsMax() bool {
}

func (k BinaryKey) MarshalJSON() ([]byte, error) {
return json.Marshal(hex.EncodeToString(k))
}

type encodedKey struct {
Type string `json:"type"`
Value json.RawMessage `json:"value"`
}

func MarshalPaginationKey(k PaginationKey) ([]byte, error) {
var typeName string
var valBytes []byte
var err error

switch t := k.(type) {
case Uint64Key:
typeName = "uint64"
valBytes, err = t.MarshalJSON()
case BinaryKey:
typeName = "binary"
valBytes, err = t.MarshalJSON()
default:
return nil, fmt.Errorf("unknown pagination key type: %T", k)
}
valBytes, err := json.Marshal(hex.EncodeToString(k.Value))

if err != nil {
return nil, err
}

return json.Marshal(encodedKey{
Type: typeName,
Value: valBytes,
Type: "binary",
Value: valBytes,
Column: k.Column,
})
}

Expand All @@ -175,7 +196,9 @@ func UnmarshalPaginationKey(data []byte) (PaginationKey, error) {
if err := json.Unmarshal(wrapper.Value, &i); err != nil {
return nil, err
}
return NewUint64Key(i), nil
key := NewUint64Key(i)
key.Column = wrapper.Column
return key, nil
case "binary":
var s string
if err := json.Unmarshal(wrapper.Value, &s); err != nil {
Expand All @@ -185,7 +208,9 @@ func UnmarshalPaginationKey(data []byte) (PaginationKey, error) {
if err != nil {
return nil, err
}
return NewBinaryKey(b), nil
key := NewBinaryKey(b)
key.Column = wrapper.Column
return key, nil
default:
return nil, fmt.Errorf("unknown key type: %s", wrapper.Type)
}
Expand All @@ -194,21 +219,21 @@ func UnmarshalPaginationKey(data []byte) (PaginationKey, error) {
func MinPaginationKey(column *schema.TableColumn) PaginationKey {
switch column.Type {
case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT:
return NewUint64Key(0)
return NewUint64KeyWithColumn(column.Name, 0)
// Handle all potential binary/string types
case schema.TYPE_BINARY, schema.TYPE_STRING:
// The smallest value for any binary/string type is an empty slice.
// Even for fixed BINARY(N), starting at empty ensures we catch [0x00, ...]
return NewBinaryKey([]byte{})
return NewBinaryKeyWithColumn(column.Name, []byte{})
default:
return NewUint64Key(0)
return NewUint64KeyWithColumn(column.Name, 0)
}
}

func MaxPaginationKey(column *schema.TableColumn) PaginationKey {
switch column.Type {
case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT:
return NewUint64Key(math.MaxUint64)
return NewUint64KeyWithColumn(column.Name, math.MaxUint64)
case schema.TYPE_BINARY, schema.TYPE_STRING:
// SAFETY: Cap the size to prevent OOM on LONGBLOB (4GB).
// InnoDB index limit is 3072 bytes. 4KB is a safe upper bound for a PK.
Expand All @@ -221,9 +246,9 @@ func MaxPaginationKey(column *schema.TableColumn) PaginationKey {
for i := range maxBytes {
maxBytes[i] = 0xFF
}
return NewBinaryKey(maxBytes)
return NewBinaryKeyWithColumn(column.Name, maxBytes)
default:
return NewUint64Key(math.MaxUint64)
return NewUint64KeyWithColumn(column.Name, math.MaxUint64)
}
}

Expand All @@ -236,7 +261,7 @@ func NewPaginationKeyFromRow(rowData RowData, index int, column *schema.TableCol
if err != nil {
return nil, fmt.Errorf("failed to get uint64 pagination key: %w", err)
}
return NewUint64Key(value), nil
return NewUint64KeyWithColumn(column.Name, value), nil

case schema.TYPE_BINARY, schema.TYPE_STRING:
valueInterface := rowData[index]
Expand All @@ -249,14 +274,14 @@ func NewPaginationKeyFromRow(rowData RowData, index int, column *schema.TableCol
default:
return nil, fmt.Errorf("expected binary pagination key to be []byte or string, got %T", valueInterface)
}
return NewBinaryKey(valueBytes), nil
return NewBinaryKeyWithColumn(column.Name, valueBytes), nil

default:
// Fallback for other integer types
value, err := rowData.GetUint64(index)
if err != nil {
return nil, fmt.Errorf("failed to get pagination key: %w", err)
}
return NewUint64Key(value), nil
return NewUint64KeyWithColumn(column.Name, value), nil
}
}
Loading