Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions downstreamadapter/sink/mysql/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func TestMysqlSinkBasicFunctionality(t *testing.T) {
// Step 2: execDDLWithMaxRetries - Execute the actual DDL
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("create table t (id int primary key, name varchar(32));").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

Expand Down Expand Up @@ -261,6 +262,7 @@ func TestMysqlSinkMeetsDDLError(t *testing.T) {
// Step 2: execDDLWithMaxRetries - Execute the actual DDL (will fail)
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("create table t (id int primary key, name varchar(32));").WillReturnError(errors.New("connect: connection refused"))
mock.ExpectRollback()

Expand Down
4 changes: 4 additions & 0 deletions pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ func getMockDB(t *testing.T) *sql.DB {

mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("create table checkpoint(id int)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

Expand Down Expand Up @@ -658,6 +659,7 @@ func getMockDB(t *testing.T) *sql.DB {
// Then, apply ddl which commitTs equal to resolvedTs
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("create table resolved(id int not null unique key)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

Expand All @@ -672,6 +674,7 @@ func getMockDBForBigTxn(t *testing.T) *sql.DB {

mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("create table checkpoint(id int)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

Expand Down Expand Up @@ -713,6 +716,7 @@ func getMockDBForBigTxn(t *testing.T) *sql.DB {
// Then, apply ddl which commitTs equal to resolvedTs
mock.ExpectBegin()
mock.ExpectExec("USE `test`;").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("SET TIMESTAMP = DEFAULT").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("create table resolved(id int not null unique key)").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectCommit()

Expand Down
5 changes: 5 additions & 0 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
dmysql "github.com/go-sql-driver/mysql"
lru "github.com/hashicorp/golang-lru"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/config"
Expand Down Expand Up @@ -325,6 +326,10 @@ func NewMysqlConfigAndDB(
// Adding an extra connection to the connection pool solves the connection exhaustion issue.
db.SetMaxIdleConns(cfg.WorkerCount + 1)
db.SetMaxOpenConns(cfg.WorkerCount + 1)
failpoint.Inject("MySQLSinkForceSingleConnection", func() {
db.SetMaxIdleConns(1)
db.SetMaxOpenConns(1)
})

// Inherit the default value of the prepared statement cache from the SinkURI Options
cachePrepStmts := cfg.CachePrepStmts
Expand Down
174 changes: 174 additions & 0 deletions pkg/sink/mysql/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ import (
"github.com/coreos/go-semver/semver"
dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/dumpling/export"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
"go.uber.org/zap"
Expand Down Expand Up @@ -593,3 +596,174 @@ func GetSQLModeStrBySQLMode(sqlMode mysql.SQLMode) string {
}
return strings.Join(sqlModeStr, ",")
}

func setSessionTimestamp(ctx context.Context, tx *sql.Tx, unixTimestamp float64) error {
_, err := tx.ExecContext(ctx, fmt.Sprintf("SET TIMESTAMP = %s", formatUnixTimestamp(unixTimestamp)))
return err
}

func resetSessionTimestamp(ctx context.Context, tx *sql.Tx) error {
// Reset @@timestamp to prevent stale values from leaking across DDLs.
_, err := tx.ExecContext(ctx, "SET TIMESTAMP = DEFAULT")
return err
}

func formatUnixTimestamp(unixTimestamp float64) string {
return strconv.FormatFloat(unixTimestamp, 'f', 6, 64)
}

func ddlSessionTimestampFromOriginDefault(event *commonEvent.DDLEvent, timezone string) (float64, bool) {
if event == nil || event.TableInfo == nil {
return 0, false
}
targetColumns, err := extractCurrentTimestampDefaultColumns(event.GetDDLQuery())
if err != nil || len(targetColumns) == 0 {
return 0, false
}

for _, col := range event.TableInfo.GetColumns() {
if _, ok := targetColumns[col.Name.L]; !ok {
continue
}
val := col.GetOriginDefaultValue()
valStr, ok := val.(string)
if !ok || valStr == "" {
continue
}
ts, err := parseOriginDefaultTimestamp(valStr, col, timezone)
if err != nil {
log.Warn("Failed to parse OriginDefaultValue for DDL timestamp",
zap.String("column", col.Name.O),
zap.String("originDefault", valStr),
zap.Error(err))
continue
}
log.Info("Using OriginDefaultValue for DDL timestamp",
zap.String("column", col.Name.O),
zap.String("originDefault", valStr),
zap.Float64("timestamp", ts),
zap.String("timezone", timezone))
return ts, true
}

return 0, false
}

func extractCurrentTimestampDefaultColumns(query string) (map[string]struct{}, error) {
p := parser.New()
stmt, err := p.ParseOneStmt(query, "", "")
if err != nil {
return nil, err
}

cols := make(map[string]struct{})
switch s := stmt.(type) {
case *ast.CreateTableStmt:
for _, col := range s.Cols {
if hasCurrentTimestampDefault(col) {
cols[col.Name.Name.L] = struct{}{}
}
}
case *ast.AlterTableStmt:
for _, spec := range s.Specs {
switch spec.Tp {
case ast.AlterTableAddColumns, ast.AlterTableModifyColumn, ast.AlterTableChangeColumn, ast.AlterTableAlterColumn:
for _, col := range spec.NewColumns {
if hasCurrentTimestampDefault(col) {
cols[col.Name.Name.L] = struct{}{}
}
}
}
}
}

return cols, nil
}

func hasCurrentTimestampDefault(col *ast.ColumnDef) bool {
if col == nil {
return false
}
for _, opt := range col.Options {
if opt.Tp != ast.ColumnOptionDefaultValue {
continue
}
if isCurrentTimestampExpr(opt.Expr) {
return true
}
}
return false
}

func isCurrentTimestampExpr(expr ast.ExprNode) bool {
if expr == nil {
return false
}
switch v := expr.(type) {
case *ast.FuncCallExpr:
return isCurrentTimestampFuncName(v.FnName.L)
case ast.ValueExpr:
return isCurrentTimestampFuncName(strings.ToLower(v.GetString()))
default:
return false
}
}

func isCurrentTimestampFuncName(name string) bool {
switch name {
case ast.CurrentTimestamp, ast.Now, ast.LocalTime, ast.LocalTimestamp:
return true
default:
return false
}
}

func parseOriginDefaultTimestamp(val string, col *timodel.ColumnInfo, timezone string) (float64, error) {
loc, err := resolveOriginDefaultLocation(col, timezone)
if err != nil {
return 0, err
}
return parseTimestampInLocation(val, loc)
}

func resolveOriginDefaultLocation(col *timodel.ColumnInfo, timezone string) (*time.Location, error) {
if col != nil && col.GetType() == mysql.TypeTimestamp && col.Version >= timodel.ColumnInfoVersion1 {
return time.UTC, nil
}
if timezone == "" {
return time.UTC, nil
}
tz := strings.Trim(timezone, "\"")
return time.LoadLocation(tz)
}

func parseTimestampInLocation(val string, loc *time.Location) (float64, error) {
formats := []string{
"2006-01-02 15:04:05",
"2006-01-02 15:04:05.999999",
}
for _, f := range formats {
t, err := time.ParseInLocation(f, val, loc)
if err == nil {
return float64(t.UnixNano()) / float64(time.Second), nil
}
}
return 0, fmt.Errorf("failed to parse timestamp: %s", val)
}

func matchFailpointValue(val failpoint.Value, ddlQuery string) bool {
if val == nil {
return true
}
switch v := val.(type) {
case bool:
return v
case string:
if v == "" {
return true
}
return strings.Contains(strings.ToLower(ddlQuery), strings.ToLower(v))
default:
return true
}
}
44 changes: 44 additions & 0 deletions pkg/sink/mysql/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMatchFailpointValue(t *testing.T) {
ddl := "ALTER TABLE t ADD COLUMN c2 int"
tests := []struct {
name string
val any
want bool
}{
{name: "nil", val: nil, want: true},
{name: "bool-true", val: true, want: true},
{name: "bool-false", val: false, want: false},
{name: "empty-string", val: "", want: true},
{name: "match-string", val: "c2", want: true},
{name: "match-string-case", val: "C2", want: true},
{name: "no-match", val: "d2", want: false},
{name: "unknown-type", val: 123, want: true},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
require.Equal(t, tc.want, matchFailpointValue(tc.val, ddl))
})
}
}
77 changes: 76 additions & 1 deletion pkg/sink/mysql/mysql_writer_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,95 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error {
}
}

// Reset session timestamp before DDL to avoid leaking from pooled connections.
if err := resetSessionTimestamp(ctx, tx); err != nil {
log.Error("Failed to reset session timestamp before DDL execution",
zap.String("changefeed", w.ChangefeedID.String()),
zap.String("query", event.GetDDLQuery()),
zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("changefeed", w.ChangefeedID.String()), zap.Error(rbErr))
}
return err
}

ddlTimestamp, useSessionTimestamp := ddlSessionTimestampFromOriginDefault(event, w.cfg.Timezone)
skipSetTimestamp := false
failpoint.Inject("MySQLSinkSkipSetSessionTimestamp", func(val failpoint.Value) {
skipSetTimestamp = matchFailpointValue(val, event.GetDDLQuery())
})
skipResetAfterDDL := false
failpoint.Inject("MySQLSinkSkipResetSessionTimestampAfterDDL", func(val failpoint.Value) {
skipResetAfterDDL = matchFailpointValue(val, event.GetDDLQuery())
})

if useSessionTimestamp && skipSetTimestamp {
log.Warn("Skip setting session timestamp due to failpoint",
zap.String("changefeed", w.ChangefeedID.String()),
zap.String("query", event.GetDDLQuery()))
}
if useSessionTimestamp && !skipSetTimestamp {
// set the session timestamp to match upstream DDL execution time
if err := setSessionTimestamp(ctx, tx, ddlTimestamp); err != nil {
log.Error("Fail to set session timestamp for DDL",
zap.Float64("timestamp", ddlTimestamp),
zap.String("query", event.GetDDLQuery()),
zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("changefeed", w.ChangefeedID.String()), zap.Error(rbErr))
}
return err
}
}

query := event.GetDDLQuery()
_, err = tx.ExecContext(ctx, query)
if err != nil {
log.Error("Fail to ExecContext", zap.Any("err", err), zap.Any("query", query))
if useSessionTimestamp {
if skipResetAfterDDL {
log.Warn("Skip resetting session timestamp after DDL execution failure due to failpoint",
zap.String("changefeed", w.ChangefeedID.String()),
zap.String("query", event.GetDDLQuery()))
} else if tsErr := resetSessionTimestamp(ctx, tx); tsErr != nil {
log.Warn("Failed to reset session timestamp after DDL execution failure", zap.Error(tsErr))
}
}
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", event.GetDDLQuery()), zap.Error(err))
log.Error("Failed to rollback", zap.String("sql", event.GetDDLQuery()), zap.Error(rbErr))
}
return err
}

if useSessionTimestamp {
// reset session timestamp after DDL execution to avoid affecting subsequent operations
if skipResetAfterDDL {
log.Warn("Skip resetting session timestamp after DDL execution due to failpoint",
zap.String("changefeed", w.ChangefeedID.String()),
zap.String("query", event.GetDDLQuery()))
} else if err := resetSessionTimestamp(ctx, tx); err != nil {
log.Error("Failed to reset session timestamp after DDL execution", zap.Error(err))
if rbErr := tx.Rollback(); rbErr != nil {
log.Error("Failed to rollback", zap.String("sql", event.GetDDLQuery()), zap.Error(rbErr))
}
return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", event.GetDDLQuery())))
}
}

if err = tx.Commit(); err != nil {
return errors.WrapError(errors.ErrMySQLTxnError, errors.WithMessage(err, fmt.Sprintf("Query info: %s; ", event.GetDDLQuery())))
}

logFields := []zap.Field{
zap.String("query", event.GetDDLQuery()),
}

if useSessionTimestamp {
logFields = append(logFields, zap.Float64("sessionTimestamp", ddlTimestamp))
}

log.Info("Exec DDL succeeded", logFields...)

return nil
}

Expand Down
Loading